Hi,
I have two containers, one running Spark and one running Airflow.
I want to know how I can talk to Spark from inside a DAG and write a task so that Spark is the one that actually executes it. I don't understand how the DAG code below does that.
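As far as I can tell, the only link between the DAG and Spark is the conn_id="spark-conn" that the SparkSubmitOperator tasks reference. Am I right that I have to create that Airflow connection myself (in the UI under Admin > Connections, or with the CLI) and point it at the Spark master from the compose file? Something like this is just my guess, not tested:

airflow connections add spark-conn --conn-type spark --conn-host spark://spark-master --conn-port 7077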
I took these files from the internet and have a few more questions:
Why do the scheduler and webserver containers get removed after a while?
And if I bring up several containers on a single server with a compose file like the one below and run things this way, is that actually how it's done in companies' production environments as well?
Thanks.
docker-compose.yml:
version: '3'

x-spark-common: &spark-common
  image: bitnami/spark:latest
  volumes:
    - ./jobs:/opt/bitnami/spark/jobs
  networks:
    - code-with-yu

x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  env_file:
    - airflow.env
  volumes:
    - ./jobs:/opt/airflow/jobs
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
  depends_on:
    - postgres
  networks:
    - code-with-yu

services:
  spark-master:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"

  spark-worker:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077

  postgres:
    image: postgres:14.0
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    networks:
      - code-with-yu

  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - scheduler

  scheduler:
    <<: *airflow-common
    command: bash -c "airflow db migrate && airflow users create --username admin --firstname Yusuf --lastname Ganiyu --role Admin --email airscholar@gmail.com --password admin && airflow scheduler"

networks:
  code-with-yu:
Dockerfile:
FROM apache/airflow:2.7.1-python3.11
USER root
RUN apt-get update && \
    apt-get install -y gcc python3-dev openjdk-11-jdk && \
    apt-get clean

# Set JAVA_HOME environment variable
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64
USER airflow
RUN pip install apache-airflow apache-airflow-providers-apache-spark pyspark
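A side question about the Dockerfile: I assume spark-submit ends up on the PATH through the pyspark pip package, and that SparkSubmitOperator needs it inside the Airflow containers. Is a check like this (my guess) enough once everything is up?

docker compose exec scheduler spark-submit --version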
airflow.env:
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__WEBSERVER__BASE_URL=http://localhost:8080
AIRFLOW__WEBSERVER__SECRET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
DAG:
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag = DAG(
    dag_id="sparking_flow",
    default_args={
        "owner": "Yusuf Ganiyu",
        "start_date": airflow.utils.dates.days_ago(1)
    },
    schedule_interval="@daily"
)

start = PythonOperator(
    task_id="start",
    python_callable=lambda: print("Jobs started"),
    dag=dag
)

python_job = SparkSubmitOperator(
    task_id="python_job",
    conn_id="spark-conn",
    application="jobs/python/wordcountjob.py",
    dag=dag
)

scala_job = SparkSubmitOperator(
    task_id="scala_job",
    conn_id="spark-conn",
    application="jobs/scala/target/scala-2.12/word-count_2.12-0.1.jar",
    dag=dag
)

java_job = SparkSubmitOperator(
    task_id="java_job",
    conn_id="spark-conn",
    application="jobs/java/spark-job/target/spark-job-1.0-SNAPSHOT.jar",
    java_class="com.airscholar.spark.WordCountJob",
    dag=dag
)

end = PythonOperator(
    task_id="end",
    python_callable=lambda: print("Jobs completed successfully"),
    dag=dag
)

start >> [python_job, scala_job, java_job] >> end
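For completeness, I don't fully understand the job scripts either. This is only my own minimal guess at what jobs/python/wordcountjob.py might roughly look like (the real file could be different, and the input path is made up for illustration):

# jobs/python/wordcountjob.py - my own minimal sketch, not the original script
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    # hypothetical input file under the shared ./jobs mount
    lines = spark.read.text("/opt/bitnami/spark/jobs/python/input.txt")

    # classic word count over the text column
    counts = (
        lines.rdd.flatMap(lambda row: row.value.split())
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )

    for word, count in counts.collect():
        print(word, count)

    spark.stop()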