💻 آخرین فرصت یادگیری برنامه‌نویسی با آفر ویژه قبل از افزایش قیمت در ۵ آذر ماه (🎁 به همراه یک هدیه ارزشمند )
۰ ثانیه
۰ دقیقه
۰ ساعت
۱ Mahdi Nematshahi
ارتباط برقرار کردن بین container ها
جامعه هوش مصنوعی ایجاد شده در ۰۶ خرداد ۱۴۰۳

با سلام

من دو تا کانتینر دارم یکی Spark و یکی Airflow 

می خواستم بدونم چطور می‌تونم داخل DAG ارتباط برقرار کنم با Spark و Task را طوری بنویسم که Spark اجرا کنه. کد DAG زیر رو نفهمیدم چطور این کار را انجام میده

من این فایل رو از اینترنت گرفتم و چند تا سوال داشتم:

چرا کانتینرهای schedular و webserver بعد از مدتی پاک میشن

و این روش که روی یک سرور به عنوان مثال اگر چند کانتینر مثل این کد بالا بیارم و کاری رو انجام بدیم در واقع توی شرکتها تو محیط production هم همین کار را انجام میدن.

 

با تشکر

 

version: '3'
x-spark-common: &spark-common
  image: bitnami/spark:latest
  volumes:
    - ./jobs:/opt/bitnami/spark/jobs
  networks:
    - code-with-yu
x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  env_file:
    - airflow.env
  volumes:
    - ./jobs:/opt/airflow/jobs
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
  depends_on:
    - postgres
  networks:
    - code-with-yu
services:
  spark-master:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"
  spark-worker:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077
  postgres:
    image: postgres:14.0
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    networks:
      - code-with-yu
  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - scheduler
  scheduler:
    <<: *airflow-common
    command: bash -c "airflow db migrate && airflow users create --username admin --firstname Yusuf --lastname Ganiyu --role Admin --email airscholar@gmail.com --password admin && airflow scheduler"
networks:
  code-with-yu:
  Dockerfile:
  FROM apache/airflow:2.7.1-python3.11
USER root
RUN apt-get update && \
    apt-get install -y gcc python3-dev openjdk-11-jdk && \
    apt-get clean
# Set JAVA_HOME environment variable
ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-arm64
USER airflow
RUN pip install apache-airflow apache-airflow-providers-apache-spark pyspark
Env:
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__WEBSERVER_BASE_URL=http://localhost:8080
AIRFLOW__WEBSERVER__SECRET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
DAG:
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
dag = DAG(
    dag_id = "sparking_flow",
    default_args = {
        "owner": "Yusuf Ganiyu",
        "start_date": airflow.utils.dates.days_ago(1)
    },
    schedule_interval = "@daily"
)
start = PythonOperator(
    task_id="start",
    python_callable = lambda: print("Jobs started"),
    dag=dag
)
python_job = SparkSubmitOperator(
    task_id="python_job",
    conn_id="spark-conn",
    application="jobs/python/wordcountjob.py",
    dag=dag
)
scala_job = SparkSubmitOperator(
    task_id="scala_job",
    conn_id="spark-conn",
    application="jobs/scala/target/scala-2.12/word-count_2.12-0.1.jar",
    dag=dag
)
java_job = SparkSubmitOperator(
    task_id="java_job",
    conn_id="spark-conn",
    application="jobs/java/spark-job/target/spark-job-1.0-SNAPSHOT.jar",
    java_class="com.airscholar.spark.WordCountJob",
    dag=dag
)
end = PythonOperator(
    task_id="end",
    python_callable = lambda: print("Jobs completed successfully"),
    dag=dag
)
start >> [python_job, scala_job, java_job] >> end

 

 

 

 

 

سلام وقتتون بخیر.

چندتا نکته رو باید مدنظرتون قرار بدین.

1. مورد اول اینکه ما از یک شبکه داریم توی فایل docker-compose مون استفاده میکنیم. سرویسهایی که دوست داریم به هم دیگه دسترسی داشته باشن رو توی این شبکه قرار میدیم. آخر فایل docker-compose.yml مون یه جایی هست که networks تعریف میکنیم و برای هرکدوم از سرویسها هم مشخص میکنیم که به کدوم network دسترسی داشته باشه.
2. اینکه کانتینترهای webserver و scheduler بعد یه مدت از کار میوفتن و حذف میشن دلیلش ممکنه این باشه که جایی به ارور برمیخورین. میتونین id کانتینرهاتون رو بردارین و از docker logs لاگهای اون کانتینر رو بخونین ببینین چه اتفاقی براشون افتاده.
3. توی محیط production ما غالبا از طریق UI یا از طریق API ای که خودمون میسازیم به یوزرها دسترسی میدیم و خود UI یا API به این سرویسها دسترسی میگیرن و اگر موردی باشه انجام میدن. منتها اگر سرویسی خیلی زیر لود میره و میزان مصرفش بالاست میتونین از یه سیستم Container Orchestration مثل Kubernetes یا Docker Swarm برای داشتن Instance‌های بیشتر از هر سرویس استفاده کنین و در کنارش مصرف منابع رو هم کنترل کنین.

توی Dockerfile ای که قرارداده شده Secret_Key ای که Airflow از اون استفاده میکنه رو قرار دادن مستقیم. این خیلی کار خوبی نیست و میتونه به یه شکلی در اختیار هکرها یا آدمهایی که قصد نفوذ به سیستم رو دارن قرار بگیره. به جاش ترجیح میدیم که اینها رو در یک فایل متنی مثلا یک فایل .env بریزیم و بعد اون  فایل رو در داخل ایمیج کپی کنیم و محتویاتش رو در Environment لود کنیم.

علی سوفالی ۲۲ خرداد ۱۴۰۳، ۱۰:۲۸