[Kubernetes Data Platform][Part 6][Main Components]: Data Orchestration with Apache Airflow
In previous sections, we’ve explored how to utilize Trino and Spark for data processing on our data platform. Another crucial aspect of data management is Data Orchestration. Data orchestration is the automated process of gathering siloed data from various storage locations, combining and organizing it, and making it available for analysis. Numerous tools are available for this purpose, and in this article, we’ll delve into deploying Airflow on Kubernetes to manage ETL workflows using Spark Operator, Spark Connect Server, and dbt + Trino.
Airflow is a powerful tool with diverse deployment options. Here, we’ll present an approach that we consider particularly compelling.
Our Airflow setup will utilize Kubernetes with the following configurations:
Executor: The Kubernetes executor runs each task instance in its own pod on the Kubernetes cluster (see the per-task pod_override sketch after the GitSync overview below).
Logging: Before a task pod terminates, Airflow pushes its logs to MinIO, so we can still debug issues after the pod is gone.
GitSync: GitSync streamlines DAG development and deployment for Airflow environments by automatically syncing pipeline code from Git to Airflow’s DAG folder. This eliminates manual synchronization and simplifies the CI/CD process.
Key Benefits of GitSync:
- Automated DAG deployment
- Streamlined development workflow
- Simplified CI/CD
- Reduced development time
- Improved code quality
- Enhanced collaboration
- Simplified deployment
How GitSync Works:
- Develop DAGs in Git
- Commit and push changes
- GitSync automatically deploys
Embrace Automated DAG Deployment with GitSync
- GitSync simplifies DAG management and revolutionizes pipeline deployment.
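Coming back to the executor choice above: because every task runs in its own pod, an individual task can override its pod spec (resources, image, node placement) through executor_config. The snippet below is a minimal sketch, not part of this lab; the DAG id, resource values, and callable are illustrative only.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

with DAG("executor_config_example", start_date=datetime(2024, 1, 1), schedule_interval=None):
    heavy_task = PythonOperator(
        task_id="heavy_task",
        python_callable=lambda: print("running in a dedicated pod"),
        # The KubernetesExecutor merges this override into the pod it launches for the task;
        # the container must be named "base".
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": "500m", "memory": "1Gi"}
                            ),
                        )
                    ]
                )
            )
        },
    )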
DEPLOYMENT STEPS
- Initialize a Kubernetes cluster with Kind.
- Install Nginx Ingress Controller, MinIO, Hive Metastore, Trino, Spark Operator and Spark Connect Server on Kubernetes
- Install Apache Airflow
- Airflow with Spark Connect Server
- Airflow with Spark Operator
- Airflow with Trino + dbt
- Destroy the Kind cluster
HANDS-ON STEPS
Reference Repository: https://github.com/viethqb/data-platform-notes/tree/main/airflow
1. Initialize a Kubernetes cluster with Kind.
> cd ~/Documents
> git clone https://github.com/viethqb/data-platform-notes.git
> cd data-platform-notes/airflow
> kind create cluster --name dev --config deployment/kind/kind-config.yaml
2. Install Nginx Ingress Controller, MinIO, Hive Metastore, Trino, Spark Operator and Spark Connect Server on Kubernetes
Install Nginx Ingress Controller
> helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
> helm repo update
> helm upgrade --install ingress-nginx ingress-nginx/ingress-nginx --set controller.hostNetwork=true,controller.service.type="",controller.kind=DaemonSet --namespace ingress-nginx --version 4.10.1 --create-namespace --debug
> kubectl -n ingress-nginx get po -owide
Install MinIO
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install minio -n minio -f deployment/minio/minio-values.yaml bitnami/minio --create-namespace --debug --version 14.6.0
> kubectl -n minio get po
> kubectl get no -owide
# NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
# dev-control-plane Ready control-plane 3m53s v1.30.0 172.18.0.2 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker Ready <none> 3m11s v1.30.0 172.18.0.5 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker2 Ready <none> 3m12s v1.30.0 172.18.0.3 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker3 Ready <none> 3m11s v1.30.0 172.18.0.4 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# Add the following line to the end of /etc/hosts
# 172.18.0.4 minio.lakehouse.local airflow.lakehouse.local
Download https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-01.parquet and upload it to s3a://lakehouse/raw.db/yellow_tripdata/y=2009/m=01/yellow_tripdata_2009-01.parquet.
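One way to do this is with a small boto3 script. This is only a sketch: it assumes the MinIO API is reachable at http://minio.lakehouse.local through the ingress (otherwise use http://localhost:9000 after kubectl -n minio port-forward svc/minio 9000:9000), that the lakehouse bucket already exists, and that the admin/password credentials match the S3 connection configured for Airflow later on.
import urllib.request

import boto3

SRC = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-01.parquet"
LOCAL = "/tmp/yellow_tripdata_2009-01.parquet"
KEY = "raw.db/yellow_tripdata/y=2009/m=01/yellow_tripdata_2009-01.parquet"

urllib.request.urlretrieve(SRC, LOCAL)  # download the public taxi file

s3 = boto3.client(
    "s3",
    endpoint_url="http://minio.lakehouse.local",  # assumption: MinIO API exposed by the ingress
    aws_access_key_id="admin",
    aws_secret_access_key="password",
)
s3.upload_file(LOCAL, "lakehouse", KEY)  # lands at s3a://lakehouse/<KEY>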
Install Hive Metastore
# hive-metastore-postgresql
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install metastore-db -n metastore -f deployment/hive/hive-metastore-postgres-values.yaml bitnami/postgresql --create-namespace --debug --version 15.4.2
# Hive metastore
# docker pull rtdl/hive-metastore:3.1.2
# kind load docker-image rtdl/hive-metastore:3.1.2 --name dev
> helm upgrade --install hive-metastore -n metastore -f deployment/hive/hive-metastore-values.yaml ../../charts/hive-metastore --create-namespace --debug
Install Trino
> helm repo add trino https://trinodb.github.io/charts
> helm upgrade --install trino -n trino -f deployment/trino/trino-values.yaml trino/trino --create-namespace --debug --version 0.21.0
> kubectl -n trino get po
Install Spark Operator
> helm repo add spark-operator https://kubeflow.github.io/spark-operator
> helm repo update
> helm upgrade --install spark-operator spark-operator/spark-operator --namespace spark-operator --set webhook.enable=true --set image.tag=v1beta2-1.4.6-3.5.0 --create-namespace --debug --version 1.3.2
> kubectl -n spark-operator get po
Install Spark Connect Server
> kubectl create ns sparglim
> kubectl create clusterrolebinding serviceaccounts-cluster-admin --clusterrole=cluster-admin --group=system:serviceaccounts
> kubectl apply -f deployment/spark-connect-server/
> kubectl -n sparglim logs -f deployments/sparglim-server
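Optionally, you can sanity-check the Connect endpoint from your workstation before wiring it into Airflow. The snippet below is a sketch, assuming kubectl -n sparglim port-forward svc/sparglim-server-service 15002:15002 is running in another terminal and a matching client (for example pip install "pyspark[connect]==3.4.1") is installed locally.
from pyspark.sql import SparkSession

# Connect to the port-forwarded Spark Connect server and run a trivial query.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(5).show()
spark.stop()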
3. Install Apache Airflow
viet1846/airflow:2.8.3 Dockerfile
FROM apache/airflow:2.8.3
USER airflow
COPY requirements.txt .
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --no-cache-dir -r requirements.txt
requirements.txt
pandas==2.0.3
dbt-core==1.8.2
dbt-trino==1.8.0
astronomer-cosmos==1.4.3
apache-airflow-providers-trino==5.7.1
apache-airflow-providers-cncf-kubernetes==7.13.0
pyspark==3.4.1
grpcio-status
pyarrow
airflow-values.yaml
defaultAirflowRepository: viet1846/airflow
defaultAirflowTag: "2.8.3"
airflowVersion: "2.8.3"
images:
  airflow:
    pullPolicy: Always
# Select certain nodes for airflow pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []
schedulerName: ~
# Ingress configuration
ingress:
  enabled: ~
  web:
    # Enable web ingress resource
    enabled: true
    # Annotations for the web Ingress
    annotations: {}
    # The path for the web Ingress
    path: "/"
    # The pathType for the above path (used only with Kubernetes v1.19 and above)
    pathType: "ImplementationSpecific"
    # The hostname for the web Ingress (Deprecated - renamed to `ingress.web.hosts`)
    host: "airflow.lakehouse.local"
    # The hostnames or hosts configuration for the web Ingress
    hosts: []
    # # The hostname for the web Ingress (can be templated)
    # - name: ""
    #   # configs for web Ingress TLS
    #   tls:
    #     # Enable TLS termination for the web Ingress
    #     enabled: false
    #     # the name of a pre-created Secret containing a TLS private key and certificate
    #     secretName: ""
    # The Ingress Class for the web Ingress (used only with Kubernetes v1.19 and above)
    ingressClassName: "nginx"
    # configs for web Ingress TLS (Deprecated - renamed to `ingress.web.hosts[*].tls`)
    tls:
      # Enable TLS termination for the web Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""
    # HTTP paths to add to the web Ingress before the default path
    precedingPaths: []
    # Http paths to add to the web Ingress after the default path
    succeedingPaths: []
executor: "KubernetesExecutor"
extraEnv: |
  - name: AIRFLOW__CORE__DAGS_FOLDER
    value: '/opt/airflow/dags/repo/airflow/dags'
  - name: AIRFLOW__CORE__DEFAULT_TIMEZONE
    value: 'Asia/Ho_Chi_Minh'
  - name: AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT
    value: '300'
  - name: AIRFLOW__SCHEDULER__PARSING_CLEANUP_INTERVAL
    value: '30'
# Airflow database & redis config
data:
  # Otherwise pass connection values in
  metadataConnection:
    user: postgres
    pass: postgres
    protocol: postgresql
    host: ~
    port: 5432
    db: postgres
    sslmode: disable
webserverSecretKey: 2cbfd5b635391cae990a3b0e7553a68e
scheduler:
  enabled: true
  hostAliases: []
  # - ip: "127.0.0.1"
  #   hostnames:
  #   - "foo.local"
  # - ip: "10.1.2.3"
  #   hostnames:
  #   - "foo.remote"
  replicas: 1
  resources: {}
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi
  nodeSelector: {}
# Airflow database migration job settings
migrateDatabaseJob:
  enabled: true
# Airflow webserver settings
webserver:
  enabled: true
  replicas: 1
  resources: {}
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi
  defaultUser:
    enabled: true
    role: Admin
    username: admin
    email: [email protected]
    firstName: admin
    lastName: user
    password: admin
  nodeSelector: {}
  env: []
triggerer:
  enabled: true
  replicas: 1
  persistence:
    enabled: true
    size: 100Gi
  resources: {}
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi
  nodeSelector: {}
  affinity: {}
  env: []
# StatsD settings
statsd:
  enabled: true
  resources: {}
  #  limits:
  #    cpu: 100m
  #    memory: 128Mi
  #  requests:
  #    cpu: 100m
  #    memory: 128Mi
  nodeSelector: {}
  env: []
redis:
  enabled: false
# Not recommended for production
postgresql:
  enabled: true
  auth:
    enablePostgresUser: true
    postgresPassword: postgres
    username: "admin"
    password: "admin"
config:
  logging:
    remote_logging: "True"
    remote_base_log_folder: "s3://airflow/logs"
    remote_log_conn_id: "s3_default"
    encrypt_s3_logs: "False"
# Git sync
dags:
  gitSync:
    enabled: true
    repo: https://github.com/viethqb/data-platform-notes.git
    branch: main
    rev: HEAD
    depth: 1
    subPath: "airflow"
    # if your repo needs a user name password
    # you can load them to a k8s secret like the one below
    #   ---
    #   apiVersion: v1
    #   kind: Secret
    #   metadata:
    #     name: git-credentials
    #   data:
    #     # For git-sync v3
    #     GIT_SYNC_USERNAME: <base64_encoded_git_username>
    #     GIT_SYNC_PASSWORD: <base64_encoded_git_password>
    #     # For git-sync v4
    #     GITSYNC_USERNAME: <base64_encoded_git_username>
    #     GITSYNC_PASSWORD: <base64_encoded_git_password>
    # and specify the name of the secret below
    #
    # credentialsSecret: git-credentials
    #
    #
    # If you are using an ssh clone url, you can load
    # the ssh private key to a k8s secret like the one below
    #   ---
    #   apiVersion: v1
    #   kind: Secret
    #   metadata:
    #     name: airflow-ssh-secret
    #   data:
    #     # key needs to be gitSshKey
    #     gitSshKey: <base64_encoded_data>
    # and specify the name of the secret below
    # sshKeySecret: airflow-ssh-secret
    #
    # If you are using an ssh private key, you can additionally
    # specify the content of your known_hosts file, example:
    #
    # knownHosts: |
    #   <host1>,<ip1> <key1>
    #   <host2>,<ip2> <key2>
Install Airflow on Kubernetes
# docker build -t viet1846/airflow:2.8.3 .
# docker push viet1846/airflow:2.8.3
> helm repo add airflow https://airflow.apache.org/
> helm repo update
> helm upgrade --install airflow airflow/airflow -f deployment/airflow/airflow-values.yaml --namespace airflow --create-namespace --debug --version 1.13.1 --timeout 600s
> kubectl -n airflow get po
Access the Airflow UI at http://airflow.lakehouse.local with user: admin & password: admin
Configure the S3 and Kubernetes connections in the Airflow UI
Open http://airflow.lakehouse.local/connection/list/ ⇒ add a new record for each connection below
Connection Id: s3_default
Connection Type: Amazon Web Services
AWS Access Key ID: admin
AWS Secret Access Key: password
Extra: {"endpoint_url": "http://minio.minio.svc.cluster.local:9000"}Connection Id: kubernetes_default
Connection Type: Kubernetes Cluster Connection
In cluster configuration: yes
Disable SSL: yes
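If you prefer not to click through the UI, the same two connections can also be created programmatically from a Python shell inside an Airflow pod (for example via kubectl -n airflow exec). This is only a sketch of that alternative, assuming neither connection id exists yet; it is not a step in this lab.
from airflow import settings
from airflow.models import Connection

session = settings.Session()
# Mirrors the s3_default connection configured in the UI above.
session.add(
    Connection(
        conn_id="s3_default",
        conn_type="aws",
        login="admin",
        password="password",
        extra='{"endpoint_url": "http://minio.minio.svc.cluster.local:9000"}',
    )
)
# Mirrors kubernetes_default: in-cluster configuration with SSL verification disabled.
session.add(
    Connection(
        conn_id="kubernetes_default",
        conn_type="kubernetes",
        extra='{"in_cluster": true, "disable_verify_ssl": true}',
    )
)
session.commit()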
Grant Airflow permission to submit Spark jobs (Spark Operator):
> kubectl create role spark-operator-submitter --verb=create,get --resource=sparkapplications,pods/log --namespace=spark-operator
> kubectl create rolebinding airflow-worker-spark-submitter --role=spark-operator-submitter --serviceaccount=airflow:airflow-worker --namespace=spark-operator
4. Airflow with Spark Connect Server
from datetime import datetime, date, timedelta

from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from pyspark.sql import Row, SparkSession

seven_days_ago = datetime.combine(datetime.today() - timedelta(7), datetime.min.time())

args = {
    "owner": "airflow",
    "start_date": seven_days_ago,
}

dag = DAG(
    dag_id="spark_connect_server_example", default_args=args, schedule_interval=None
)


def spark_connect_server_example():
    SPARK_CONNECT_SERVER = (
        "sc://sparglim-server-service.sparglim.svc.cluster.local:15002"
    )
    spark = SparkSession.builder.remote(SPARK_CONNECT_SERVER).getOrCreate()
    df = spark.createDataFrame(
        [
            Row(
                a=1,
                b=2.0,
                c="string1",
                d=date(2000, 1, 1),
                e=datetime(2000, 1, 1, 12, 0),
            ),
            Row(
                a=2,
                b=3.0,
                c="string2",
                d=date(2000, 2, 1),
                e=datetime(2000, 1, 2, 12, 0),
            ),
            Row(
                a=4,
                b=5.0,
                c="string3",
                d=date(2000, 3, 1),
                e=datetime(2000, 1, 3, 12, 0),
            ),
        ]
    )
    df.show()
    spark.sql("CREATE DATABASE IF NOT EXISTS raw")
    df.writeTo("raw.demo").tableProperty(
        "write.format.default", "parquet"
    ).createOrReplace()


start = EmptyOperator(
    task_id="start",
    dag=dag,
)
end = EmptyOperator(
    task_id="end",
    dag=dag,
)
spark_connect_server_example = PythonOperator(
    task_id="spark_connect_server_example",
    python_callable=spark_connect_server_example,
    dag=dag,
)

start >> spark_connect_server_example >> end
Link DAG: http://airflow.lakehouse.local/dags/spark_connect_server_example/grid
Trigger DAG:
Use Trino to query the Iceberg table created by Spark Connect Server:
> kubectl -n trino exec -it deployments/trino-coordinator -- trino
trino> select * from lakehouse.raw.demo;
5. Airflow with Spark Operator
Code: https://github.com/viethqb/data-platform-notes/blob/main/airflow/dags/spark_main_iceberg.py
from datetime import timedelta, datetime

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import (
    SparkKubernetesOperator,
)
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import (
    SparkKubernetesSensor,
)
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": days_ago(1),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "max_active_runs": 1,
    "retries": 3,
}
# [END default_args]

dag = DAG(
    "spark_main_iceberg",
    default_args=default_args,
    schedule_interval=None,
    tags=["example", "spark"],
)

submit = SparkKubernetesOperator(
    task_id="main_iceberg_submit",
    namespace="spark-operator",
    application_file="spark_jobs/main_iceberg.yaml",
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,
    dag=dag,
)

sensor = SparkKubernetesSensor(
    task_id="main_iceberg_monitor",
    namespace="spark-operator",
    application_name="{{ task_instance.xcom_pull(task_ids='main_iceberg_submit')['metadata']['name'] }}",
    kubernetes_conn_id="kubernetes_default",
    dag=dag,
    attach_log=True,
)

submit >> sensor
Link DAG: http://airflow.lakehouse.local/dags/spark_main_iceberg/grid
Trigger DAG:
Running Pod:
> kubectl -n spark-operator get po
Use Trino to query the Iceberg table created by Spark Operator:
> kubectl -n trino exec -it deployments/trino-coordinator -- trino
trino> select * from lakehouse.raw.taxis_spark limit 10;
6. Airflow with Trino + dbt
The idea here is to demonstrate the power of two of the most successful open source data projects, dbt and Trino:
- dbt (data build tool) enables analytics engineers to transform data in their warehouses by simply writing SQL select statements. dbt handles turning these SQL select statements into tables and views.
- Trino, formerly PrestoSQL, is a fast distributed SQL query engine for big data analytics that helps you explore your data universe and mix multiple data sources.
Run dbt Projects Seamlessly with Airflow using Astronomer Cosmos
Astronomer Cosmos simplifies integrating dbt Core projects with Apache Airflow: with just a few lines of code, you can run dbt models as Airflow DAGs and Task Groups. Benefits include:
- Run dbt projects against Airflow connections instead of dbt profiles
- Native support for installing and running dbt in a virtual environment to avoid dependency conflicts with Airflow
- Run tests immediately after a model is done to catch issues early
- Utilize Airflow’s data-aware scheduling to run models immediately after upstream ingestion
- Turn each dbt model into a task/task group complete with retries, alerting, etc.
For this demonstration, I’ll be utilizing the jaffle-shop-classic project.
The profiles.yml file houses database connection details. However, due to security considerations, these credentials cannot be stored directly in the Git repository. Instead, they will be passed as Environment Variables.
We could also leverage Airflow secrets or Variables to manage and inject these sensitive parameters securely (not covered in this lab); a hedged sketch follows.
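For illustration only, here is one way to resolve those values from Airflow Variables (which can themselves be backed by a secrets backend) instead of hard-coding them in the DAG file. The variable names and defaults below are assumptions, not part of this lab.
from airflow.models import Variable

# Resolved at DAG-parse time; falls back to the in-cluster defaults used in this lab.
trino_env_vars = {
    "TRINO_HOST": Variable.get("trino_host", default_var="trino.trino.svc.cluster.local"),
    "TRINO_PORT": Variable.get("trino_port", default_var="8080"),
    "TRINO_USER": Variable.get("trino_user", default_var="dbt"),
}
# These could then be passed as ProjectConfig(env_vars=trino_env_vars) in the DAG below.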
profiles.yml
trino:
  target: dev
  outputs:
    dev:
      type: trino
      method: none
      user: "{{ env_var('TRINO_USER') }}"
      host: "{{ env_var('TRINO_HOST') }}"
      port: "{{ env_var('TRINO_PORT') | int }}"
      http_scheme: http
      database: lakehouse
      schema: jaffle_shop
Code: https://github.com/viethqb/data-platform-notes/blob/main/airflow/dags/dbt_jaffle-shop-classic.py
from pathlib import Path
from datetime import datetime

from cosmos import DbtDag, ProjectConfig, ProfileConfig
from cosmos import ExecutionConfig

jaffle_shop_path = Path("/opt/airflow/dags/repo/airflow/dbt/jaffle-shop-classic")
dbt_executable = Path("/home/airflow/.local/bin/dbt")

venv_execution_config = ExecutionConfig(
    dbt_executable_path=str(dbt_executable),
)

dbt_profile_example = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        dbt_project_path=jaffle_shop_path,
        env_vars={
            "TRINO_HOST": "trino.trino.svc.cluster.local",
            "TRINO_PORT": "8080",
            "TRINO_USER": "dbt",
        },
    ),
    profile_config=ProfileConfig(
        # these map to dbt/jaffle-shop-classic/profiles.yml
        profile_name="trino",
        target_name="dev",
        profiles_yml_filepath=jaffle_shop_path / "profiles.yml",
    ),
    execution_config=venv_execution_config,
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="dbt_jaffle-shop-classic_example",
    tags=["dbt"],
)
Link DAG: http://airflow.lakehouse.local/dags/dbt_jaffle-shop-classic_example/grid
Create jaffle_shop schema:
> kubectl -n trino exec -it deployments/trino-coordinator -- trino
trino> CREATE SCHEMA lakehouse.jaffle_shop WITH (location = 's3a://lakehouse/jaffle_shop.db/');
Trigger DAG:
Running Pod:
> kubectl -n airflow get po
Use Trino to query the Iceberg tables created by dbt + Trino:
> kubectl -n trino exec -it deployments/trino-coordinator -- trino
trino> show tables from lakehouse.jaffle_shop;
trino> select * from lakehouse.jaffle_shop.customers limit 5;
7. Destroy the Kind cluster
> kind delete cluster --name dev