Airflow spark (#10)
* Airflow spark

* ignore logs folder

* update .env.template

* update airflow config

* setup java and spark in airflow dockerfile

* adjust spark and docker-compose

* Dag with SparkSubmitOperator

* fix spark Dockerfile
aliavni authored May 28, 2024
1 parent 26de4c8 commit 59b9a21
Showing 16 changed files with 219 additions and 568 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -2,6 +2,7 @@
__pycache__

data/
logs/

notebooks/.cache
notebooks/.conda
4 changes: 2 additions & 2 deletions airflow/config/airflow.cfg
@@ -639,7 +639,7 @@ encrypt_s3_logs = False
#
# Variable: AIRFLOW__LOGGING__LOGGING_LEVEL
#
-logging_level = INFO
+logging_level = DEBUG

# Logging level for celery. If not set, it uses the value of logging_level
#
@@ -1552,7 +1552,7 @@ cookie_samesite = Lax
#
# Variable: AIRFLOW__WEBSERVER__DEFAULT_WRAP
#
-default_wrap = False
+default_wrap = True

# Allow the UI to be rendered in a frame
#
13 changes: 13 additions & 0 deletions airflow/dags/airbnb.py
@@ -0,0 +1,13 @@
import os
from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag = DAG("airbnb", start_date=datetime(2024, 5, 20), schedule=None)

spark_task = SparkSubmitOperator(
    conn_id="spark_master",
    application=os.path.abspath("dags/airbnb_job.py"),
    task_id="run_spark_job",
    dag=dag,
)
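
Neither DAG in this commit creates the spark_master connection that conn_id refers to; it presumably comes from .env.template or is added by hand. As a hedged illustration only, a one-off helper along these lines could register it against the standalone master from docker-compose.yaml (the conn_type, host, and port here are assumptions, not values taken from the diff):

# Hypothetical helper, not part of this commit: create the "spark_master"
# connection the SparkSubmitOperator tasks expect, if it does not exist yet.
from airflow.models import Connection
from airflow.utils.session import create_session


def ensure_spark_master_connection() -> None:
    with create_session() as session:
        exists = (
            session.query(Connection)
            .filter(Connection.conn_id == "spark_master")
            .first()
        )
        if exists is None:
            session.add(
                Connection(
                    conn_id="spark_master",
                    conn_type="spark",
                    # Keeping the spark:// scheme in the host lets the hook build
                    # the master URL spark://spark-master:7077 (assumed values).
                    host="spark://spark-master",
                    port=7077,
                )
            )

In practice the same result is usually achieved with the airflow connections add CLI or an AIRFLOW_CONN_SPARK_MASTER environment variable rather than code.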
13 changes: 13 additions & 0 deletions airflow/dags/airbnb_job.py
@@ -0,0 +1,13 @@
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName("airbnb").getOrCreate()
sc = spark.sparkContext

url = "https://data.insideairbnb.com/united-states/ma/boston/2024-03-24/data/listings.csv.gz"

df = spark.createDataFrame(pd.read_csv(url))

print(f"# of rows: {df.count()}")

df.show(20)
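
The job above downloads the gzipped CSV with pandas on the driver and then converts it into a Spark DataFrame. A possible alternative, sketched below and not part of the commit, is to fetch the file onto the shared /opt/spark-data mount and let Spark parse it directly; the local path is an assumption based on the docker-compose volume mounts.

# Sketch of a Spark-native variant (assumes /opt/spark-data is the volume
# shared by master and workers, as wired up in docker-compose.yaml).
import urllib.request

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("airbnb").getOrCreate()

url = "https://data.insideairbnb.com/united-states/ma/boston/2024-03-24/data/listings.csv.gz"
local_path = "/opt/spark-data/listings.csv.gz"
urllib.request.urlretrieve(url, local_path)  # Spark cannot read https:// URLs directly

# Spark decompresses .gz transparently; multiLine/escape handle the quoted,
# multi-line text columns in the listings file.
df = spark.read.csv(local_path, header=True, inferSchema=True, multiLine=True, escape='"')

print(f"# of rows: {df.count()}")
df.show(20)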
13 changes: 13 additions & 0 deletions airflow/dags/spark.py
@@ -0,0 +1,13 @@
import os
from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag = DAG("spark_job_example", start_date=datetime(2024, 5, 20))

spark_task = SparkSubmitOperator(
    conn_id="spark_master",
    application=os.path.abspath("dags/spark_job.py"),
    task_id="run_spark_job",
    dag=dag,
)
15 changes: 15 additions & 0 deletions airflow/dags/spark_job.py
@@ -0,0 +1,15 @@
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demo").getOrCreate()

df = spark.createDataFrame(
    [
        ("sue", 32),
        ("li", 3),
        ("bob", 75),
        ("heo", 13),
    ],
    ["first_name", "age"],
)

df.show()
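
As a quick aside (not part of the commit), the same DataFrame logic can be sanity-checked outside the Docker cluster by pointing the session at a local master; this assumes pyspark is installed in whatever environment runs it.

# Local sanity check of the demo job, run without the standalone cluster.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("demo-local").getOrCreate()

df = spark.createDataFrame(
    [("sue", 32), ("li", 3), ("bob", 75), ("heo", 13)],
    ["first_name", "age"],
)
assert df.count() == 4

spark.stop()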
34 changes: 26 additions & 8 deletions docker-compose.yaml
@@ -20,7 +20,6 @@ x-airflow-common:
  networks:
    - backend


services:
  db:
    container_name: docker-postgres
@@ -69,12 +68,15 @@ services:
    container_name: spark-master
    build: ./docker/spark
    ports:
      - "9090:8080"
      - "4040:4040"
      - "6066:6066"
      - "7077:7077"
      - "9090:8080"
      - "9870:9870"
      - "10000:10000"
    volumes:
      - ./apps:/opt/spark-apps
      - ./data:/opt/spark-data
      - ./spark-apps:/opt/spark-apps
      - ./data/spark-data:/opt/spark-data
    environment:
      - SPARK_LOCAL_IP=spark-master
      - SPARK_WORKLOAD=master
@@ -83,6 +85,10 @@
  spark-worker-a:
    build: ./docker/spark
    container_name: spark-worker-a
    deploy:
      resources:
        limits:
          memory: 10GB
    env_file:
      - .env
    ports:
@@ -94,13 +100,17 @@
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-a
    volumes:
      - ./apps:/opt/spark-apps
      - ./data:/opt/spark-data
      - ./spark-apps:/opt/spark-apps
      - ./data/spark-data:/opt/spark-data
    networks:
      - backend
  spark-worker-b:
    build: ./docker/spark
    container_name: spark-worker-b
    deploy:
      resources:
        limits:
          memory: 10GB
    ports:
      - "10092:8080"
      - "7002:7000"
@@ -110,8 +120,8 @@
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-b
    volumes:
      - ./apps:/opt/spark-apps
      - ./data:/opt/spark-data
      - ./spark-apps:/opt/spark-apps
      - ./data/spark-data:/opt/spark-data
    networks:
      - backend

@@ -232,6 +242,12 @@ services:
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    ports:
      - "8793:8793"
    deploy:
      resources:
        limits:
          memory: 10GB
    healthcheck:
      # yamllint disable rule:line-length
      test:
@@ -254,6 +270,8 @@
  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    ports:
      - "8794:8794"
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
37 changes: 36 additions & 1 deletion docker/airflow/Dockerfile
@@ -1,4 +1,39 @@
-FROM apache/airflow:2.9.1-python3.12
+FROM apache/airflow:2.9.1-python3.11

USER root
RUN apt-get update && apt-get install -y curl wget vim

RUN wget --no-verbose -O openjdk-11.tar.gz https://builds.openlogic.com/downloadJDK/openlogic-openjdk/11.0.11%2B9/openlogic-openjdk-11.0.11%2B9-linux-x64.tar.gz
RUN tar -xzf openjdk-11.tar.gz --one-top-level=openjdk-11 --strip-components 1 -C /usr/local
ENV JAVA_HOME=/usr/local/openjdk-11
# ENV SPARK_WORKLOAD=submit

ENV SPARK_VERSION=3.5.1 \
HADOOP_VERSION=3 \
SPARK_HOME=/opt/spark \
PYTHONHASHSEED=1

RUN mkdir ${SPARK_HOME} && chown -R "${AIRFLOW_UID}:0" "${SPARK_HOME}"

USER airflow

# Download and uncompress spark from the apache archive
RUN wget --no-verbose -O apache-spark.tgz "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" \
&& mkdir -p ${SPARK_HOME} \
&& tar -xf apache-spark.tgz -C ${SPARK_HOME} --strip-components=1 \
&& rm apache-spark.tgz

COPY requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt

USER root

COPY requirements_spark.txt /tmp/requirements_spark.txt
RUN cd /usr/local \
&& python -m venv pyspark_venv \
&& . pyspark_venv/bin/activate \
&& pip install -r /tmp/requirements_spark.txt

USER airflow

ENV PYSPARK_PYTHON=/usr/local/pyspark_venv/bin/python
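
The image pins PYSPARK_PYTHON to a dedicated virtualenv so the driver side of spark-submit runs a known interpreter. As an illustration only, not something this commit does, the DAG task could also pass the interpreter paths through Spark conf so executors use a matching Python; the venv path mirrors the one created in these Dockerfiles, and spark.pyspark.python / spark.pyspark.driver.python are standard Spark settings.

# Hypothetical DAG variant pinning the Python interpreter via Spark conf.
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag = DAG("spark_job_pinned_python", start_date=datetime(2024, 5, 20), schedule=None)

spark_task = SparkSubmitOperator(
    task_id="run_spark_job",
    conn_id="spark_master",
    application=os.path.abspath("dags/spark_job.py"),
    conf={
        "spark.pyspark.python": "/usr/local/pyspark_venv/bin/python",
        "spark.pyspark.driver.python": "/usr/local/pyspark_venv/bin/python",
    },
    dag=dag,
)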
11 changes: 9 additions & 2 deletions docker/airflow/requirements.txt
@@ -1,3 +1,10 @@
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.1/constraints-3.12.txt
duckdb==0.10.2
polars==0.20.26
apache-airflow-providers-apache-spark==4.7.2
plyvel==1.5.1

# duckdb==0.10.2
# polars==0.20.26
# pyspark==3.5.1
# apache-airflow-providers-slack
# deltalake
# delta-spark
3 changes: 3 additions & 0 deletions docker/airflow/requirements_spark.txt
@@ -0,0 +1,3 @@
# requirements should be the same as `docker/spark/requirements.txt`
pandas==2.2.2
setuptools==65.5.0
60 changes: 57 additions & 3 deletions docker/spark/Dockerfile
@@ -9,12 +9,13 @@ RUN apt-get update && apt-get install -y curl vim wget software-properties-commo
# Fix the value of PYTHONHASHSEED
# Note: this is needed when you use Python 3.3 or greater
ENV SPARK_VERSION=3.5.1 \
-HADOOP_VERSION=3 \
+HADOOP_MAJOR_VERSION=3 \
+HADOOP_VERSION=3.4.0 \
SPARK_HOME=/opt/spark \
PYTHONHASHSEED=1

# Download and uncompress spark from the apache archive
-RUN wget --no-verbose -O apache-spark.tgz "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" \
+RUN wget --no-verbose -O apache-spark.tgz "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_MAJOR_VERSION}.tgz" \
&& mkdir -p /opt/spark \
&& tar -xf apache-spark.tgz -C /opt/spark --strip-components=1 \
&& rm apache-spark.tgz
@@ -28,6 +29,54 @@ RUN wget https://www.python.org/ftp/python/3.11.5/Python-3.11.5.tgz \
&& make -j$(nproc) \
&& make altinstall
RUN update-alternatives --install "/usr/bin/python" "python" "$(which python3.11)" 1
RUN update-alternatives --install "/usr/bin/python3" "python3" "$(which python3.11)" 1

RUN rm -rf /tmp/Python-3.11.5
RUN rm /tmp/Python-3.11.5.tgz

# Hadoop
ENV HADOOP_HOME=/opt/hadoop
ENV HADOOP_USER_HOME=/home/hadoop
ENV HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
RUN mkdir -p $HADOOP_USER_HOME
RUN mkdir -p $HADOOP_HOME
RUN groupadd hadoop
RUN useradd -d $HADOOP_USER_HOME -g hadoop -m hadoop --shell /bin/bash

RUN apt-get install -y pdsh openssh-client openssh-server

RUN mkdir $HADOOP_USER_HOME/.ssh
RUN ssh-keygen -t rsa -P '' -f $HADOOP_USER_HOME/.ssh/id_rsa
RUN cat $HADOOP_USER_HOME/.ssh/id_rsa.pub >> $HADOOP_USER_HOME/.ssh/authorized_keys
RUN chmod 0600 $HADOOP_USER_HOME/.ssh/authorized_keys

COPY ssh/config $HADOOP_USER_HOME/.ssh/config
RUN service ssh start

RUN chown -R hadoop:hadoop $HADOOP_HOME
RUN chown -R hadoop:hadoop $HADOOP_USER_HOME
RUN mkdir -p /opt/spark-apps && chown -R hadoop:hadoop /opt/spark-apps
RUN mkdir -p /opt/spark-data && chown -R hadoop:hadoop /opt/spark-data

USER hadoop
ENV JAVA_HOME=/usr/local/openjdk-11
USER root
COPY requirements.txt /tmp/requirements.txt

ENV PATH=/opt/spark/bin:$PATH

RUN chown -R hadoop:hadoop /opt/spark
RUN ln -sf /bin/bash /bin/sh

# Venv
RUN cd /usr/local \
&& python -m venv pyspark_venv \
&& . pyspark_venv/bin/activate \
&& pip install -r /tmp/requirements.txt

WORKDIR /opt/spark

USER hadoop

# Apache spark environment
FROM builder as apache-spark
@@ -41,7 +90,7 @@ SPARK_MASTER_LOG=/opt/spark/logs/spark-master.out \
SPARK_WORKER_LOG=/opt/spark/logs/spark-worker.out \
SPARK_WORKER_WEBUI_PORT=8080 \
SPARK_WORKER_PORT=7000 \
SPARK_MASTER="spark://spark-master:7077" \
SPARK_MASTER="spark-master:7077" \
SPARK_WORKLOAD="master"

EXPOSE 8080 7077 6066 10000
@@ -55,4 +104,9 @@ ln -sf /dev/stdout $SPARK_WORKER_LOG
COPY start-spark.sh /
RUN wget -P /opt/spark/jars https://jdbc.postgresql.org/download/postgresql-42.7.3.jar

RUN echo "alias ls='ls --color=auto'" >> $HADOOP_USER_HOME/.bashrc
RUN echo "alias l='ls -lah'" >> $HADOOP_USER_HOME/.bashrc
RUN echo "alias ll='ls -lh'" >> $HADOOP_USER_HOME/.bashrc
RUN echo "alias la='ls -lAh'" >> $HADOOP_USER_HOME/.bashrc

CMD ["/bin/bash", "/start-spark.sh"]
3 changes: 3 additions & 0 deletions docker/spark/requirements.txt
@@ -0,0 +1,3 @@
# requirements should be the same as `docker/airflow/requirements_spark.txt`
pandas==2.2.2
setuptools==65.5.0
2 changes: 2 additions & 0 deletions docker/spark/ssh/config
@@ -0,0 +1,2 @@
Host *
StrictHostKeyChecking no
4 changes: 3 additions & 1 deletion docker/spark/start-spark.sh
@@ -1,5 +1,7 @@
-#start-spark.sh
+#!/bin/bash

service ssh restart

. "/opt/spark/bin/load-spark-env.sh"
# When the spark work_load is master run class org.apache.spark.deploy.master.Master
if [ "$SPARK_WORKLOAD" == "master" ];
Empty file added logs/.gitkeep