[Kubernetes Data Platform][Part 5.1][Main Components]: Apache Spark with Kubernetes Spark Operator
To be honest, I don't like Apache Spark, not because of its features or performance, but because of its deployment, configuration, and dependencies. There are many ways to deploy it, but today I will show how to use the Spark Operator, which manages Spark applications through a Kubernetes custom resource.
The Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing status of Spark applications.
In simple terms, after installing the Spark Operator you get a new resource type, SparkApplication, in your Kubernetes cluster. You can then write a manifest just as you would for any other resource type and submit a Spark job with kubectl.
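Because SparkApplication is an ordinary custom resource, anything that can talk to the Kubernetes API can submit one, not just kubectl. The snippet below is only an illustrative sketch using the official kubernetes Python client: it assumes the operator and its CRDs are already installed (covered later in this post), it borrows the image, example script, and service account from the spark_pi manifest shown in the hands-on section, and the application name pyspark-pi-from-python is just a placeholder.
# submit_sparkapp.py -- illustrative sketch, not part of the reference repo.
# Assumes: `pip install kubernetes`, a working kubeconfig, and the Spark Operator
# (with its SparkApplication CRD) already installed in the cluster.
from kubernetes import client, config

config.load_kube_config()  # or config.load_incluster_config() inside a pod

spark_app = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {"name": "pyspark-pi-from-python", "namespace": "spark-operator"},
    "spec": {
        "type": "Python",
        "pythonVersion": "3",
        "mode": "cluster",
        "image": "viet1846/spark-py:v3.5.1",
        "mainApplicationFile": "local:///opt/spark/examples/src/main/python/pi.py",
        "sparkVersion": "3.5.1",
        "driver": {"cores": 1, "memory": "512m", "serviceAccount": "spark-operator-spark"},
        "executor": {"cores": 1, "instances": 1, "memory": "512m"},
    },
}

# Roughly equivalent to `kubectl apply -f` on a SparkApplication manifest
api = client.CustomObjectsApi()
api.create_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="spark-operator",
    plural="sparkapplications",
    body=spark_app,
)
The hands-on steps below stick to plain YAML manifests and kubectl, which is the more common workflow.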
The Kubernetes Operator for Apache Spark currently supports the following list of features:
- Supports Spark 2.3 and up.
- Enables declarative application specification and management of applications through custom resources.
- Automatically runs spark-submit on behalf of users for each SparkApplication eligible for submission.
- Provides native cron support for running scheduled applications.
- Supports customization of Spark pods beyond what Spark natively is able to do through the mutating admission webhook, e.g., mounting ConfigMaps and volumes, and setting pod affinity/anti-affinity.
- Supports automatic application re-submission for updated SparkApplication objects with updated specification.
- Supports automatic application restart with a configurable restart policy.
- Supports automatic retries of failed submissions with optional linear back-off.
- Supports mounting local Hadoop configuration as a Kubernetes ConfigMap automatically via sparkctl.
- Supports automatically staging local application dependencies to Google Cloud Storage (GCS) via sparkctl.
- Supports collecting and exporting application-level metrics and driver/executor metrics to Prometheus.
The architecture is as follows: the operator consists of a controller that watches SparkApplication objects, a submission runner that executes spark-submit for them, a Spark pod monitor that reports driver and executor pod status back to the controller, and a mutating admission webhook that customizes the Spark pods.
DEPLOYMENT STEPS
1. Initialize a Kubernetes cluster with Kind.
2. Install Nginx Ingress Controller, MinIO, Hive Metastore, Trino on Kubernetes.
- This step will be repeated in subsequent labs. Practicing multiple times will help you remember better.
3. Install Spark Operator and test with the spark_pi application.
4. Integrate Spark with Iceberg and Hive Metastore:
- Build the base image
- Build the application image
5. Submit the main_iceberg job.
6. Query the Iceberg table created by Spark using Trino.
7. Destroy the Kind cluster.
HANDS-ON STEPS
Reference Repository: https://github.com/viethqb/data-platform-notes/tree/main/spark
1. Initialize a Kubernetes cluster with Kind
> cd ~/Documents
> git clone https://github.com/viethqb/data-platform-notes.git
> cd data-platform-notes/spark
> kind create cluster --name dev --config deployment/kind/kind-config.yaml
> kubectl get no -owide
2. Install Nginx Ingress Controller, MinIO, Hive Metastore, Trino on Kubernetes
Install Nginx Ingress Controller
> helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
> helm repo update
> helm upgrade --install ingress-nginx ingress-nginx/ingress-nginx --set controller.hostNetwork=true,controller.service.type="",controller.kind=DaemonSet --namespace ingress-nginx --version 4.10.1 --create-namespace --debug
> kubectl -n ingress-nginx get po -owide
Install MinIO
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install minio -n minio -f deployment/minio/minio-values.yaml bitnami/minio --create-namespace --debug --version 14.6.0
> kubectl -n minio get po
> kubectl get no -owide
# NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
# dev-control-plane Ready control-plane 3m53s v1.30.0 172.18.0.2 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker Ready <none> 3m11s v1.30.0 172.18.0.5 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker2 Ready <none> 3m12s v1.30.0 172.18.0.3 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker3 Ready <none> 3m11s v1.30.0 172.18.0.4 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
Add the following line to the end of /etc/hosts (use the internal IP of one of your worker nodes, since the ingress controller runs as a hostNetwork DaemonSet):
172.18.0.4 minio.lakehouse.local
Download https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-01.parquet and upload it to s3a://lakehouse/raw/yellow_tripdata/y=2009/m=01/yellow_tripdata_2009-01.parquet, as shown in the sketch below.
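If you prefer to script this step instead of using the MinIO console or mc, here is a minimal sketch with requests and boto3. It assumes the ingress host minio.lakehouse.local added above fronts the MinIO S3 API and that the default admin/password credentials used elsewhere in this lab apply; adjust the endpoint and credentials to match deployment/minio/minio-values.yaml (or port-forward the MinIO service and point endpoint_url at localhost instead, after checking kubectl -n minio get svc), and create the lakehouse bucket first if your values file does not already provision it.
# upload_tripdata.py -- minimal sketch; adjust endpoint/credentials to your MinIO setup.
# Requires: pip install boto3 requests
import boto3
import requests

URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-01.parquet"
LOCAL = "/tmp/yellow_tripdata_2009-01.parquet"

# Download the public NYC taxi parquet file
with requests.get(URL, stream=True, timeout=60) as r:
    r.raise_for_status()
    with open(LOCAL, "wb") as f:
        for chunk in r.iter_content(chunk_size=1 << 20):
            f.write(chunk)

# Upload to MinIO under the partitioned raw path used later by the Spark job
s3 = boto3.client(
    "s3",
    endpoint_url="http://minio.lakehouse.local",  # assumed ingress endpoint for the S3 API
    aws_access_key_id="admin",
    aws_secret_access_key="password",
)
# If the bucket does not exist yet: s3.create_bucket(Bucket="lakehouse")
s3.upload_file(
    LOCAL, "lakehouse", "raw/yellow_tripdata/y=2009/m=01/yellow_tripdata_2009-01.parquet"
)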
Install Hive Metastore
# hive-metastore-postgresql
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install metastore-db -n metastore -f deployment/hive/hive-metastore-postgres-values.yaml bitnami/postgresql --create-namespace --debug --version 15.4.2
# Hive metastore
# docker pull rtdl/hive-metastore:3.1.2
# kind load docker-image rtdl/hive-metastore:3.1.2 --name dev
> helm upgrade --install hive-metastore -n metastore -f deployment/hive/hive-metastore-values.yaml ../charts/hive-metastore --create-namespace --debug
Install Trino
> helm repo add trino https://trinodb.github.io/charts
> helm upgrade --install trino -n trino -f deployment/trino/trino-values.yaml trino/trino --create-namespace --debug --version 0.21.0
> kubectl -n trino get po
3. Install Spark Operator and test with the spark_pi application
Install Spark Operator
> helm repo add spark-operator https://kubeflow.github.io/spark-operator
> helm repo update
> helm upgrade --install spark-operator spark-operator/spark-operator --namespace spark-operator --set webhook.enable=true --set image.tag=v1beta2-1.4.6-3.5.0 --create-namespace --debug --version 1.3.2
> kubectl -n spark-operator get po
Test with the spark_pi application
spark_pi.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
# name: pyspark-pi-{{ ts_nodash|lower }}-{{ task_instance.try_number }}
name: pyspark-pi
namespace: spark-operator
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "viet1846/spark-py:v3.5.1"
imagePullPolicy: Always
mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
sparkVersion: "3.5.1"
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.5.1
serviceAccount: spark-operator-spark
executor:
cores: 1
instances: 1
memory: "512m"
labels:
      version: 3.5.1
Submit the Spark job
# docker pull viet1846/spark-py:v3.5.1
# kind load docker-image viet1846/spark-py:v3.5.1 --name dev
> kubectl apply -f jobs/spark_pi.yaml
> kubectl -n spark-operator get po -w
> kubectl -n spark-operator logs -f pyspark-pi-driver
Get sparkapplications resource:
> kubectl -n spark-operator get sparkapplications
> kubectl -n spark-operator describe sparkapplications pyspark-pi
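The same status can be read programmatically, which is useful when a scheduler such as Airflow (hinted at by the commented-out name template in the manifest above) needs to poll for completion. Below is a small sketch using the official kubernetes Python client; it assumes a kubeconfig with access to the spark-operator namespace, and the status path shown is the one reported by the operator's v1beta2 API, so cross-check it against your kubectl describe output.
# check_sparkapp.py -- a sketch for polling SparkApplication status
# (assumes `pip install kubernetes` and read access to the spark-operator namespace).
from kubernetes import client, config

config.load_kube_config()
api = client.CustomObjectsApi()

app = api.get_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="spark-operator",
    plural="sparkapplications",
    name="pyspark-pi",
)

# The operator writes the driver state under .status.applicationState.state
# (e.g. SUBMITTED, RUNNING, COMPLETED, FAILED).
state = app.get("status", {}).get("applicationState", {}).get("state", "UNKNOWN")
print(f"pyspark-pi state: {state}")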
4. Integrate Spark with Iceberg and Hive Metastore
Environment config: src/config.py
import os
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "admin")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "password")
S3_ENDPOINT = os.getenv("S3_ENDPOINT", "minio.minio.svc.cluster.local:9000")
S3_SSL_ENABLE = os.getenv("S3_SSL_ENABLE", "false")
S3_PATH_STYLE_ACCESS = os.getenv("S3_PATH_STYLE_ACCESS", "true")
S3_ATTEMPTS_MAXIMUM = os.getenv("S3_ATTEMPTS_MAXIMUM", "1")
S3_CONNECTION_ESTABLISH_TIMEOUT = os.getenv("S3_CONNECTION_ESTABLISH_TIMEOUT", "5000")
S3_CONNECTION_TIMEOUT = os.getenv("S3_CONNECTION_TIMEOUT", "10000")
S3_WAREHOUSE = os.getenv("S3_WAREHOUSE", "s3a://lakehouse/")
S3_HTTP_ENDPOINT = (
    f"https://{S3_ENDPOINT}" if S3_SSL_ENABLE == "true" else f"http://{S3_ENDPOINT}"
)
METASTORE_URI = os.getenv(
    "METASTORE_URI", "thrift://hive-metastore.metastore.svc.cluster.local:9083"
)
Spark config with Hive Metastore, MinIO, and Iceberg: src/utils.py
import logging
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from config import *


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY
    )
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT)
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.connection.ssl.enabled", S3_SSL_ENABLE
    )
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.path.style.access", S3_PATH_STYLE_ACCESS
    )
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.attempts.maximum", S3_ATTEMPTS_MAXIMUM
    )
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.connection.establish.timeout", S3_CONNECTION_ESTABLISH_TIMEOUT
    )
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.connection.timeout", S3_CONNECTION_TIMEOUT
    )


def get_spark(
    job_name,
):
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    logger = logging.getLogger(job_name)
    # adding iceberg configs
    conf = (
        SparkConf()
        .set(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )  # Use Iceberg with Spark
        .set("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
        .set(
            "spark.sql.catalog.lakehouse.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"
        )
        .set("spark.sql.catalog.lakehouse.warehouse", S3_WAREHOUSE)
        .set("spark.sql.catalog.lakehouse.s3.path-style-access", S3_PATH_STYLE_ACCESS)
        .set(
            "spark.sql.catalog.lakehouse.s3.endpoint",
            S3_HTTP_ENDPOINT,
        )
        .set("spark.sql.defaultCatalog", "lakehouse")  # Name of the Iceberg catalog
        .set("spark.sql.catalogImplementation", "in-memory")
        .set("spark.sql.catalog.lakehouse.type", "hive")  # Iceberg catalog type
        .set("spark.sql.catalog.lakehouse.uri", METASTORE_URI)
        .set("spark.executor.heartbeatInterval", "300000")
        .set("spark.network.timeout", "400000")
    )
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    # Comment out the line below to see INFO logs
    spark.sparkContext.setLogLevel("ERROR")
    load_config(spark.sparkContext)
    return spark, logger
Demo code: etl/main_iceberg.py
import os
from utils import get_spark
spark, logger = get_spark(job_name="main_spark")
DATA_PATH = os.getenv("DATA_PATH", "s3a://lakehouse/raw/yellow_tripdata/")
df = spark.read.option("mergeSchema", "true").parquet(DATA_PATH)
create_schema_df = spark.sql("CREATE DATABASE IF NOT EXISTS raw ")
create_schema_df.show()
# Create the Iceberg table "raw.taxis_spark" from the DataFrame
# df.write.mode("overwrite").saveAsTable("raw.taxis_spark")
df.writeTo("raw.taxis_spark").tableProperty(
"write.format.default", "parquet"
).partitionedBy("y", "m").createOrReplace()
# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM raw.taxis_spark")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")Base image (viet1846/spark-iceberg:3.4.1) Dockerfile: Dockerfile.spark-iceberg
Base image (viet1846/spark-iceberg:3.4.1) Dockerfile: Dockerfile.spark-iceberg
FROM apache/spark:3.4.1-scala2.12-java11-python3-ubuntu
USER root
ENV SPARK_VERSION_SHORT=3.4
ENV SPARK_VERSION=3.4.2
ENV AWS_SDK_VERSION=1.12.262
ENV HADOOP_AWS_VERSION=3.3.4
# Configure SPARK
RUN apt-get update -y && apt-get install -y curl wget
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_AWS_VERSION}/hadoop-aws-${HADOOP_AWS_VERSION}.jar -o ${SPARK_HOME}/jars/hadoop-aws-${HADOOP_AWS_VERSION}.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/${SPARK_VERSION}/spark-hadoop-cloud_2.12-${SPARK_VERSION}.jar -o ${SPARK_HOME}/jars/spark-hadoop-cloud_2.12-${SPARK_VERSION}.jar
# Configure ICEBERG
ENV ICEBERG_VERSION=1.5.0
ENV AWS_SDK_BUNDLE_VERSION=2.20.18
RUN curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${SPARK_VERSION_SHORT}_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-${SPARK_VERSION_SHORT}_2.12-${ICEBERG_VERSION}.jar -o ${SPARK_HOME}/jars/iceberg-spark-runtime-${SPARK_VERSION_SHORT}_2.12-${ICEBERG_VERSION}.jar
RUN curl https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/${AWS_SDK_BUNDLE_VERSION}/bundle-${AWS_SDK_BUNDLE_VERSION}.jar -Lo /opt/spark/jars/aws-bundle-${AWS_SDK_BUNDLE_VERSION}.jar
RUN curl https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/${AWS_SDK_BUNDLE_VERSION}/url-connection-client-${AWS_SDK_BUNDLE_VERSION}.jar -Lo /opt/spark/jars/url-connection-client-${AWS_SDK_BUNDLE_VERSION}.jar
# Configure PYTHON
ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
Application image (viet1846/spark-lakehouse:v1) Dockerfile: Dockerfile
FROM viet1846/spark-iceberg:3.4.1
ENV PYTHONPATH=/app:$PYTHONPATH
WORKDIR /app
COPY src/ /app/
Spark Iceberg demo job: jobs/main_iceberg.yaml
---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-iceberg
namespace: spark-operator
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "viet1846/spark-lakehouse:v1"
imagePullPolicy: Always
mainApplicationFile: local:///app/etl/main_iceberg.py
sparkVersion: "3.4.1"
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
driver:
cores: 1
memory: "1024m"
labels:
version: 3.4.1
serviceAccount: spark-operator-spark
env:
- name: AWS_REGION
value: us-east-1
- name: AWS_ACCESS_KEY_ID
value: admin
- name: AWS_SECRET_ACCESS_KEY
value: password
- name: S3_ENDPOINT
value: minio.minio.svc.cluster.local:9000
- name: S3_SSL_ENABLE
value: "false"
- name: S3_PATH_STYLE_ACCESS
value: "true"
- name: S3_WAREHOUSE
value: "s3a://lakehouse/"
- name: METASTORE_URI
value: "thrift://hive-metastore.metastore.svc.cluster.local:9083"
- name: DATA_PATH
value: "s3a://lakehouse/raw/yellow_tripdata/"
executor:
cores: 1
instances: 3
memory: "2048m"
labels:
version: 3.4.1
env:
- name: AWS_REGION
value: us-east-1
- name: AWS_ACCESS_KEY_ID
value: admin
- name: AWS_SECRET_ACCESS_KEY
value: password
- name: S3_ENDPOINT
value: minio.minio.svc.cluster.local:9000
- name: S3_SSL_ENABLE
value: "false"
- name: S3_PATH_STYLE_ACCESS
value: "true"
- name: S3_WAREHOUSE
value: "s3a://lakehouse/"
- name: METASTORE_URI
value: "thrift://hive-metastore.metastore.svc.cluster.local:9083"
- name: DATA_PATH
value: "s3a://lakehouse/raw/yellow_tripdata/"Build Base images
> docker build -t viet1846/spark-iceberg:3.4.1 -f Dockerfile.spark-iceberg .
> docker push viet1846/spark-iceberg:3.4.1
Build the application image
> docker build -t viet1846/spark-lakehouse:v1 -f Dockerfile .
> docker push viet1846/spark-lakehouse:v1
5. Submit Spark Iceberg demo job
> docker pull viet1846/spark-lakehouse:v1
> kind load docker-image viet1846/spark-lakehouse:v1 --name dev
> kubectl apply -f jobs/main_iceberg.yaml
> kubectl -n spark-operator logs -f spark-iceberg-driver
6. Query the Iceberg table created by Spark using Trino
> kubectl -n trino exec -it deployments/trino-coordinator -- trino
trino> show schemas from lakehouse;
trino> show tables from lakehouse.raw;
trino> select * from lakehouse.raw.taxis_spark limit 10;
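If you would rather query from Python than from the Trino CLI, the trino client package provides a DBAPI interface. The sketch below assumes you first expose Trino locally, for example with kubectl -n trino port-forward svc/trino 8080:8080 (check kubectl -n trino get svc for the actual service name and port), and that the lakehouse catalog is configured in deployment/trino/trino-values.yaml as used above.
# query_trino.py -- a sketch using the `trino` Python client (pip install trino).
# Assumes Trino is reachable on localhost:8080 via port-forward and that the
# lakehouse catalog points at the Hive Metastore and MinIO set up in this lab.
import trino

conn = trino.dbapi.connect(
    host="localhost",
    port=8080,
    user="admin",  # Trino accepts any user name when authentication is disabled
    catalog="lakehouse",
    schema="raw",
)
cur = conn.cursor()

cur.execute("SELECT COUNT(*) FROM taxis_spark")
print("row count:", cur.fetchone()[0])

cur.execute("SELECT * FROM taxis_spark LIMIT 10")
for row in cur.fetchall():
    print(row)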
7. Destroy the Kind cluster
> kind delete cluster --name dev