Mongo (#14)
* Mongo

* minor

* minor
aliavni authored Jun 22, 2024
1 parent 2d4dfaa commit 3af5ccf
Showing 14 changed files with 843 additions and 56 deletions.
2 changes: 2 additions & 0 deletions .env.template
@@ -32,6 +32,8 @@ AIRFLOW_CONFIG='/opt/airflow/config/airflow.cfg'

# Airflow connections
AIRFLOW_CONN_SPARK_DEFAULT='spark://spark-master:7077?deploy_mode=client'
AIRFLOW_CONN_MONGO_DEFAULT='mongo://mongo:27017/%3FauthSource%3Dadmin'
AIRFLOW_CONN_HTTP_DEFAULT=''

# Airflow slack integration
AIRFLOW_CONN_SLACK_API_DEFAULT='slack://:<paste slack api key here. this value starts with xoxb->@/?timeout=42'
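For context, the Mongo connection is handed to Airflow as a percent-encoded URI. The sketch below is not part of the change; it only decodes that value to show what the `mongo_default` connection (used by the artic DAG further down) resolves to.

```python
# Sketch, not part of the change: decode the percent-encoded value of
# AIRFLOW_CONN_MONGO_DEFAULT. Airflow reads AIRFLOW_CONN_* environment
# variables as connection URIs, and the artic DAG resolves this one through
# MongoHook(mongo_conn_id="mongo_default").
from urllib.parse import unquote, urlsplit

uri = "mongo://mongo:27017/%3FauthSource%3Dadmin"
decoded = unquote(uri)
print(decoded)                      # mongo://mongo:27017/?authSource=admin
parts = urlsplit(decoded)
print(parts.hostname, parts.port)   # mongo 27017
print(parts.query)                  # authSource=admin
```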
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@ __pycache__
data/
logs/

notebooks/.bash_history
notebooks/.cache
notebooks/.conda
notebooks/.ipynb_checkpoints
3 changes: 3 additions & 0 deletions README.md
@@ -20,6 +20,7 @@
- [Kafka consumer](#kafka-consumer)
- [Airflow](#airflow)
- [Slack integration](#slack-integration)
- [Mongo](#mongo)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

@@ -157,3 +158,5 @@ update their values where necessary.
You need to create a Slack app and set up the `AIRFLOW_CONN_SLACK_API_DEFAULT`
env variable with the Slack API key. If you don't want to use this integration,
remove the `AIRFLOW_CONN_SLACK_API_DEFAULT` variable from your `.env` file.
## Mongo
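The `## Mongo` section is added here as a stub. A minimal smoke test for the new service (not part of the commit, assuming the compose stack is running with port 27017 published as in `docker-compose.yaml` below) might look like:

```python
# Hypothetical smoke test, not part of the commit: connect to the Mongo
# container from the host and inspect what the artic DAG has written.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/?authSource=admin")
db = client["artic"]                        # database used by the artic DAG
print(db.list_collection_names())           # expect ["art"] after a DAG run
print(db["art"].estimated_document_count())
client.close()
```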
2 changes: 1 addition & 1 deletion airflow/config/airflow.cfg
@@ -639,7 +639,7 @@ encrypt_s3_logs = False
#
# Variable: AIRFLOW__LOGGING__LOGGING_LEVEL
#
logging_level = DEBUG
logging_level = INFO

# Logging level for celery. If not set, it uses the value of logging_level
#
70 changes: 70 additions & 0 deletions airflow/dags/artic.py
@@ -0,0 +1,70 @@
import logging
from datetime import datetime

import pymongo
import requests
from airflow.decorators import dag

from airflow.operators.python import PythonOperator
from airflow.providers.mongo.hooks.mongo import MongoHook
from pymongo import MongoClient, UpdateOne
from pymongo.collection import Collection
from pymongo.database import Database


@dag(
dag_id="artic",
schedule=None,
start_date=datetime(2024, 6, 21),
catchup=False,
)
def artic() -> None:
pass


def get_art_data_and_write_to_mongo():
endpoint = "https://api.artic.edu/api/v1/artworks?limit=100"

hook = MongoHook(mongo_conn_id="mongo_default")
client: MongoClient = hook.get_conn()
db: Database = client.get_database("artic")
collection: Collection = db.get_collection("art")

collection.create_index([("id", pymongo.ASCENDING)], unique=True)

page = 1
pieces = 0
while True:
logging.info(f"working on page {page}")
resp = requests.get(endpoint, timeout=10)
resp_json = resp.json()

data = resp_json.get("data", [])
if len(data) == 0:
break

pieces += len(data)

# Upsert to mongo
ops = [
UpdateOne({"id": piece["id"]}, {"$set": piece}, upsert=True)
for piece in data
]
collection.bulk_write(ops)

pagination = resp_json.get("pagination", {})
endpoint = pagination.get("next_url")

if not endpoint:
break

page += 1

logging.info(f"Finished. Made {page} API calls. Upserted {pieces}.")


dag = artic()

get_art_data = PythonOperator(
task_id="get_art_data", dag=dag, python_callable=get_art_data_and_write_to_mongo
)
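Because the task creates a unique index on `id` and writes with `UpdateOne(..., upsert=True)`, re-running it should not duplicate documents. A standalone sketch of that idempotent upsert pattern (hypothetical collection name, assuming a Mongo instance reachable on localhost):

```python
# Sketch, not part of the DAG: the bulk-upsert pattern used above, run twice
# against a throwaway collection to show the document count stays stable.
from pymongo import MongoClient, UpdateOne

client = MongoClient("mongodb://localhost:27017")   # assumed host-side URI
coll = client["artic"]["upsert_demo"]                # hypothetical collection
coll.create_index("id", unique=True)

batch = [{"id": 1, "title": "A"}, {"id": 2, "title": "B"}]
for _ in range(2):                                   # simulate two task runs
    ops = [UpdateOne({"id": d["id"]}, {"$set": d}, upsert=True) for d in batch]
    coll.bulk_write(ops)

print(coll.count_documents({}))                      # 2, not 4
client.close()
```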
2 changes: 1 addition & 1 deletion airflow/dags/spark.py
@@ -3,7 +3,7 @@
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag = DAG("spark_job_example", start_date=datetime(2024, 5, 20))
dag = DAG("spark_job_example", start_date=datetime(2024, 5, 20), schedule="@once")

spark_task = SparkSubmitOperator(
conn_id="spark_master",
71 changes: 47 additions & 24 deletions docker-compose.yaml
@@ -1,5 +1,10 @@
name: docker

x-common:
&common
networks:
- backend

x-airflow-common:
&airflow-common
build: ./docker/airflow
@@ -22,6 +27,7 @@ x-airflow-common:

services:
db:
<<: *common
container_name: docker-postgres
image: postgres:16
ports:
@@ -31,27 +37,25 @@ services:
- .env
volumes:
- pgdata:/var/lib/postgresql/data
networks:
- backend
healthcheck:
test: ["CMD-SHELL", "pg_isready -d postgres -U postgres"]
interval: 10s
timeout: 5s
retries: 5

python:
<<: *common
container_name: python
build:
context: .
networks:
- backend
depends_on:
db:
condition: service_healthy
env_file:
- .env

trino:
<<: *common
container_name: trino
ports:
- "8080:8080"
@@ -61,10 +65,9 @@
- ./docker/trino/catalog:/etc/trino/catalog
environment:
- CATALOG_MANAGEMENT=dynamic
networks:
- backend

spark-master:
<<: *common
container_name: spark-master
build: ./docker/spark
ports:
@@ -80,9 +83,10 @@
environment:
- SPARK_LOCAL_IP=spark-master
- SPARK_WORKLOAD=master
networks:
- backend
profiles:
- spark
spark-worker-a:
<<: *common
build: ./docker/spark
container_name: spark-worker-a
deploy:
@@ -102,9 +106,10 @@
volumes:
- ./spark-apps:/opt/spark-apps
- ./data/spark-data:/opt/spark-data
networks:
- backend
profiles:
- spark
spark-worker-b:
<<: *common
build: ./docker/spark
container_name: spark-worker-b
deploy:
@@ -122,10 +127,11 @@
volumes:
- ./spark-apps:/opt/spark-apps
- ./data/spark-data:/opt/spark-data
networks:
- backend
profiles:
- spark

jupyterlab:
<<: *common
container_name: jupyterlab
build: ./docker/jupyter
environment:
@@ -137,10 +143,9 @@
- .env
ports:
- "8089:8089"
networks:
- backend

scylla-1:
<<: *common
build: ./docker/scylladb
container_name: scylla-1
restart: always
@@ -151,10 +156,11 @@
- ./docker/scylladb/cassandra-rackdc.properties.dc1:/etc/scylla/cassandra-rackdc.properties
ports:
- "19042:19042"
networks:
- backend
profiles:
- scylla

scylla-2:
<<: *common
build: ./docker/scylladb
container_name: scylla-2
restart: always
@@ -163,10 +169,11 @@
- ./docker/scylladb/scylla.yaml:/etc/scylla/scylla.yaml
- ./docker/scylladb/cassandra-rackdc.properties.dc1:/etc/scylla/cassandra-rackdc.properties
command: --seeds=scylla-1,scylla-2,scylla-3 --smp 1 --memory 750M --overprovisioned 1
networks:
- backend
profiles:
- scylla

scylla-3:
<<: *common
build: ./docker/scylladb
container_name: scylla-3
restart: always
@@ -175,22 +182,24 @@
- ./docker/scylladb/scylla.yaml:/etc/scylla/scylla.yaml
- ./docker/scylladb/cassandra-rackdc.properties.dc1:/etc/scylla/cassandra-rackdc.properties
command: --seeds=scylla-1,scylla-2,scylla-3 --smp 1 --memory 750M --overprovisioned 1
networks:
- backend
profiles:
- scylla

kafka:
<<: *common
container_name: kafka
build: ./docker/kafka
volumes:
- ./data/kafka:/bitnami/kafka
networks:
- backend
env_file:
- .env
ports:
- '9092:9092'
- '9093:9093'
- '9094:9094'

redis:
<<: *common
container_name: redis
# Redis is limited to 7.2-bookworm due to licencing change
# https://redis.io/blog/redis-adopts-dual-source-available-licensing/
@@ -204,8 +213,6 @@
retries: 50
start_period: 30s
restart: always
networks:
- backend

airflow-webserver:
<<: *airflow-common
@@ -381,6 +388,22 @@
airflow-init:
condition: service_completed_successfully

mongo:
<<: *common
container_name: mongo
build: ./docker/mongo
ports:
- "27017:27017"
volumes:
- ./data/mongo:/data/db
healthcheck:
test: ["CMD-SHELL", "mongo --eval 'db.adminCommand(\"ping\")'"]
interval: 10s
timeout: 5s
retries: 5
restart: always
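The healthcheck issues a `ping` admin command inside the container (note that MongoDB 7.x images ship `mongosh` rather than the legacy `mongo` shell, so the binary name in the check may need verifying). A host-side equivalent, sketched with pymongo over the published port and not part of the compose file:

```python
# Sketch, not part of the compose file: run the same "ping" admin command
# the healthcheck uses, but from the host over the published 27017 port.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017", serverSelectionTimeoutMS=5000)
print(client.admin.command("ping"))   # {'ok': 1.0} when the server is up
client.close()
```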


volumes:
pgdata:
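The compose refactor above replaces per-service `networks:` blocks with a shared `x-common` anchor merged in via `<<: *common`. A small sketch (using PyYAML, not part of the repo) of how that merge key expands:

```python
# Sketch, not part of the repo: YAML merge keys ("<<") pull the anchored
# x-common mapping into each service, which is how the networks list is shared.
import yaml

snippet = """
x-common: &common
  networks:
    - backend

services:
  mongo:
    <<: *common
    ports:
      - "27017:27017"
"""
doc = yaml.safe_load(snippet)
print(doc["services"]["mongo"]["networks"])   # ['backend'], merged from x-common
```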

2 changes: 1 addition & 1 deletion docker/airflow/requirements.txt
@@ -1,9 +1,9 @@
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.1/constraints-3.11.txt
apache-airflow-providers-amazon==8.20.0
apache-airflow-providers-apache-spark==4.7.2
apache-airflow-providers-mongo==4.0.0
apache-airflow-providers-slack==8.6.2
delta-spark==3.2.0
deltalake==0.17.3
duckdb==0.10.2
polars==0.20.31
pyspark==3.5.1
2 changes: 1 addition & 1 deletion docker/jupyter/Dockerfile
@@ -1,7 +1,7 @@
FROM jupyter/base-notebook:python-3.11.5

USER root
RUN apt-get update && apt-get install -y openjdk-8-jdk-headless wget
RUN apt-get update && apt-get install -y openjdk-8-jdk-headless wget telnet netcat

RUN mkdir -p /opt/spark/jars
RUN wget -P /opt/spark/jars https://jdbc.postgresql.org/download/postgresql-42.7.3.jar
2 changes: 2 additions & 0 deletions docker/jupyter/requirements.txt
@@ -1,5 +1,6 @@
delta-spark
deltalake
duckdb==1.0.0
# duckdb_engine==0.12.0
grpcio
grpcio-status
@@ -8,4 +9,5 @@ kafka-python==2.0.2
pandas==2.2.2
polars==0.20.31
protobuf
pymongo==4.7.3
pyspark==3.5.1
1 change: 1 addition & 0 deletions docker/mongo/Dockerfile
@@ -0,0 +1 @@
FROM mongodb/mongodb-community-server:7.0.9-ubuntu2204