[Kubernetes Data Platform][Part 5.1][Main Components]: Apache Spark with Kubernetes Spark Operator

Viet_1846


To be honest, I don’t like Apache Spark, not because of its features or performance, but because of its deployment, configuration, and dependencies. There are many ways to deploy it, but today I will show how to use the Spark Operator, which manages Spark applications through a Kubernetes custom resource.

The Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing status of Spark applications.

In simple terms, after installing Spark Operator, you will have a new resource type called SparkApplication on your Kubernetes cluster. You can then configure a manifest similar to other resource types and run kubectl to submit a Spark job.
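
In other words, once the operator is installed (step 3 below), submitting, inspecting, and cleaning up a job is plain kubectl against the new resource type. A minimal sketch, assuming a hypothetical manifest my-spark-app.yaml that defines a SparkApplication named my-spark-app:

> kubectl apply -f my-spark-app.yaml
> kubectl get sparkapplications
> kubectl describe sparkapplication my-spark-app
> kubectl delete sparkapplication my-spark-app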

The Kubernetes Operator for Apache Spark currently supports the following list of features:

  • Supports Spark 2.3 and up.
  • Enables declarative application specification and management of applications through custom resources.
  • Automatically runs spark-submit on behalf of users for each SparkApplication eligible for submission.
  • Provides native cron support for running scheduled applications.
  • Supports customization of Spark pods, via a mutating admission webhook, beyond what Spark natively supports, e.g., mounting ConfigMaps and volumes and setting pod affinity/anti-affinity.
  • Supports automatic application re-submission for updated SparkApplication objects with updated specification.
  • Supports automatic application restart with a configurable restart policy.
  • Supports automatic retries of failed submissions with optional linear back-off.
  • Supports mounting local Hadoop configuration as a Kubernetes ConfigMap automatically via sparkctl.
  • Supports automatically staging local application dependencies to Google Cloud Storage (GCS) via sparkctl.
  • Supports collecting and exporting application-level metrics and driver/executor metrics to Prometheus.

The architecture is as follows: the operator watches SparkApplication objects in the cluster, runs spark-submit on the user's behalf for each eligible application, and surfaces the resulting status back on the custom resource.

DEPLOYMENT STEPS

1. Initialize a Kubernetes cluster with Kind.

2. Install the Nginx Ingress Controller, MinIO, Hive Metastore, and Trino on Kubernetes.

  • This step will be repeated in subsequent labs. Practicing multiple times will help you remember better.

3. Install the Spark Operator and test it with the spark_pi application

4. Integrate Spark with Iceberg and Hive Metastore:

  • Build the base image
  • Build the application image

5. Submit the main_iceberg job

6. Use Trino to query the Iceberg table created by Spark

7. Destroy the Kind cluster.

HANDS-ON STEPS

Reference Repository: https://github.com/viethqb/data-platform-notes/tree/main/spark

1. Initialize a Kubernetes cluster with Kind

> cd ~/Documents
> git clone https://github.com/viethqb/data-platform-notes.git
> cd data-platform-notes/spark
> kind create cluster --name dev --config deployment/kind/kind-config.yaml
> kubectl get no -owide

2. Install the Nginx Ingress Controller, MinIO, Hive Metastore, and Trino on Kubernetes

Install Nginx Ingress Controller

> helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
> helm repo update
> helm upgrade --install ingress-nginx ingress-nginx/ingress-nginx --set controller.hostNetwork=true,controller.service.type="",controller.kind=DaemonSet --namespace ingress-nginx --version 4.10.1 --create-namespace --debug
> kubectl -n ingress-nginx get po -owide

Install MinIO

> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install minio -n minio -f deployment/minio/minio-values.yaml bitnami/minio --create-namespace --debug --version 14.6.0
> kubectl -n minio get po
> kubectl get no -owide
# NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
# dev-control-plane Ready control-plane 3m53s v1.30.0 172.18.0.2 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker Ready <none> 3m11s v1.30.0 172.18.0.5 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker2 Ready <none> 3m12s v1.30.0 172.18.0.3 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker3 Ready <none> 3m11s v1.30.0 172.18.0.4 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15

Add the following line to the end of /etc/hosts:

172.18.0.4 minio.lakehouse.local

Download the file https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-01.parquet and upload it to s3a://lakehouse/raw/yellow_tripdata/y=2009/m=01/yellow_tripdata_2009-01.parquet.
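
One way to do this without the MinIO console is the MinIO client mc. This is only a sketch and makes a few assumptions: mc is installed, the S3 API is reachable through the minio.lakehouse.local entry added above (adjust the URL and port to however your ingress/values file exposes MinIO), the default admin/password credentials are unchanged, and the lakehouse bucket may need to be created if the chart does not provision it:

> wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-01.parquet
> mc alias set lake http://minio.lakehouse.local admin password
> mc mb --ignore-existing lake/lakehouse
> mc cp yellow_tripdata_2009-01.parquet "lake/lakehouse/raw/yellow_tripdata/y=2009/m=01/yellow_tripdata_2009-01.parquet"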

Install Hive Metastore

# hive-metastore-postgresql
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install metastore-db -n metastore -f deployment/hive/hive-metastore-postgres-values.yaml bitnami/postgresql --create-namespace --debug --version 15.4.2

# Hive metastore
# docker pull rtdl/hive-metastore:3.1.2
# kind load docker-image rtdl/hive-metastore:3.1.2 --name dev
> helm upgrade --install hive-metastore -n metastore -f deployment/hive/hive-metastore-values.yaml ../charts/hive-metastore --create-namespace --debug
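
As with the other components, check that the metastore database and metastore pods come up before continuing:

> kubectl -n metastore get po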

Install Trino

> helm repo add trino https://trinodb.github.io/charts
> helm upgrade --install trino -n trino -f deployment/trino/trino-values.yaml trino/trino --create-namespace --debug --version 0.21.0

> kubectl -n trino get po

3. Install the Spark Operator and test it with the spark_pi application

Install Spark Operator

> helm repo add spark-operator https://kubeflow.github.io/spark-operator 
> helm repo update
> helm upgrade --install spark-operator spark-operator/spark-operator --namespace spark-operator --set webhook.enable=true --set image.tag=v1beta2-1.4.6-3.5.0 --create-namespace --debug --version 1.3.2
> kubectl -n spark-operator get po
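
You can also confirm that the operator registered its custom resource definitions; both SparkApplication and ScheduledSparkApplication (used for the cron-style scheduling mentioned above) should appear:

> kubectl get crd | grep sparkoperator.k8s.io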

Test with the spark_pi application

jobs/spark_pi.yaml

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  # name: pyspark-pi-{{ ts_nodash|lower }}-{{ task_instance.try_number }}
  name: pyspark-pi
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "viet1846/spark-py:v3.5.1"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  sparkVersion: "3.5.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.5.1
    serviceAccount: spark-operator-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.5.1

Submit the Spark job

# docker pull viet1846/spark-py:v3.5.1
# kind load docker-image viet1846/spark-py:v3.5.1 --name dev
> kubectl apply -f jobs/spark_pi.yaml
> kubectl -n spark-operator get po -w
> kubectl -n spark-operator logs -f pyspark-pi-driver

Get sparkapplications resource:

> kubectl -n spark-operator get sparkapplications
> kubectl -n spark-operator describe sparkapplications pyspark-pi
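
The same information is available on the object's status, which is convenient for scripting; a small sketch (the field path below matches the v1beta2 API but is worth verifying against your operator version), plus the cleanup needed before re-submitting under the same name:

> kubectl -n spark-operator get sparkapplication pyspark-pi -o jsonpath='{.status.applicationState.state}'
> kubectl -n spark-operator delete sparkapplication pyspark-pi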

4. Integrate Spark with Iceberg and Hive Metastore

Environment config: src/config.py

import os

AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "admin")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "password")
S3_ENDPOINT = os.getenv("S3_ENDPOINT", "minio.minio.svc.cluster.local:9000")
S3_SSL_ENABLE = os.getenv("S3_SSL_ENABLE", "false")
S3_PATH_STYLE_ACCESS = os.getenv("S3_PATH_STYLE_ACCESS", "true")
S3_ATTEMPTS_MAXIMUM = os.getenv("S3_ATTEMPTS_MAXIMUM", "1")
S3_CONNECTION_ESTABLISH_TIMEOUT = os.getenv("S3_CONNECTION_ESTABLISH_TIMEOUT", "5000")
S3_CONNECTION_TIMEOUT = os.getenv("S3_CONNECTION_TIMEOUT", "10000")

S3_WAREHOUSE = os.getenv("S3_WAREHOUSE", "s3a://lakehouse/")
S3_HTTP_ENDPOINT = (
    f"https://{S3_ENDPOINT}" if S3_SSL_ENABLE == "true" else f"http://{S3_ENDPOINT}"
)
METASTORE_URI = os.getenv(
    "METASTORE_URI", "thrift://hive-metastore.metastore.svc.cluster.local:9083"
)

Spark configuration for Hive Metastore, MinIO, and Iceberg: src/utils.py

import logging
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from config import *


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY
    )
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT)
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.connection.ssl.enabled", S3_SSL_ENABLE
    )
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.path.style.access", S3_PATH_STYLE_ACCESS
    )
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.attempts.maximum", S3_ATTEMPTS_MAXIMUM
    )
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.connection.establish.timeout", S3_CONNECTION_ESTABLISH_TIMEOUT
    )
    spark_context._jsc.hadoopConfiguration().set(
        "fs.s3a.connection.timeout", S3_CONNECTION_TIMEOUT
    )


def get_spark(job_name):
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    logger = logging.getLogger(job_name)

    # adding iceberg configs
    conf = (
        SparkConf()
        .set(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )  # Use Iceberg with Spark
        .set("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
        .set(
            "spark.sql.catalog.lakehouse.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"
        )
        .set("spark.sql.catalog.lakehouse.warehouse", S3_WAREHOUSE)
        .set("spark.sql.catalog.lakehouse.s3.path-style-access", S3_PATH_STYLE_ACCESS)
        .set(
            "spark.sql.catalog.lakehouse.s3.endpoint",
            S3_HTTP_ENDPOINT,
        )
        .set("spark.sql.defaultCatalog", "lakehouse")  # Name of the Iceberg catalog
        .set("spark.sql.catalogImplementation", "in-memory")
        .set("spark.sql.catalog.lakehouse.type", "hive")  # Iceberg catalog type
        .set("spark.sql.catalog.lakehouse.uri", METASTORE_URI)
        .set("spark.executor.heartbeatInterval", "300000")
        .set("spark.network.timeout", "400000")
    )

    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    # Disable below line to see INFO logs
    spark.sparkContext.setLogLevel("ERROR")
    load_config(spark.sparkContext)

    return spark, logger

Demo code: etl/main_iceberg.py

import os
from utils import get_spark

spark, logger = get_spark(job_name="main_spark")

DATA_PATH = os.getenv("DATA_PATH", "s3a://lakehouse/raw/yellow_tripdata/")
df = spark.read.option("mergeSchema", "true").parquet(DATA_PATH)

create_schema_df = spark.sql("CREATE DATABASE IF NOT EXISTS raw ")
create_schema_df.show()
# Create the Iceberg table raw.taxis_spark from the DataFrame
# (alternative: df.write.mode("overwrite").saveAsTable("raw.taxis_spark"))
df.writeTo("raw.taxis_spark").tableProperty(
    "write.format.default", "parquet"
).partitionedBy("y", "m").createOrReplace()
# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM raw.taxis_spark")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")

Base image (viet1846/spark-iceberg:3.4.1) Dockerfile: Dockerfile.spark-iceberg

FROM apache/spark:3.4.1-scala2.12-java11-python3-ubuntu

USER root

ENV SPARK_VERSION_SHORT=3.4
ENV SPARK_VERSION=3.4.2
ENV AWS_SDK_VERSION=1.12.262
ENV HADOOP_AWS_VERSION=3.3.4

# Configure SPARK
RUN apt-get update -y && apt-get install -y curl wget
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_AWS_VERSION}/hadoop-aws-${HADOOP_AWS_VERSION}.jar -o ${SPARK_HOME}/jars/hadoop-aws-${HADOOP_AWS_VERSION}.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/${SPARK_VERSION}/spark-hadoop-cloud_2.12-${SPARK_VERSION}.jar -o ${SPARK_HOME}/jars/spark-hadoop-cloud_2.12-${SPARK_VERSION}.jar

# Configure ICEBERG
ENV ICEBERG_VERSION=1.5.0
ENV AWS_SDK_BUNDLE_VERSION=2.20.18

RUN curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${SPARK_VERSION_SHORT}_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-${SPARK_VERSION_SHORT}_2.12-${ICEBERG_VERSION}.jar -o ${SPARK_HOME}/jars/iceberg-spark-runtime-${SPARK_VERSION_SHORT}_2.12-${ICEBERG_VERSION}.jar
RUN curl https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/${AWS_SDK_BUNDLE_VERSION}/bundle-${AWS_SDK_BUNDLE_VERSION}.jar -Lo /opt/spark/jars/aws-bundle-${AWS_SDK_BUNDLE_VERSION}.jar
RUN curl https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/${AWS_SDK_BUNDLE_VERSION}/url-connection-client-${AWS_SDK_BUNDLE_VERSION}.jar -Lo /opt/spark/jars/url-connection-client-${AWS_SDK_BUNDLE_VERSION}.jar

# Configure PYTHON
ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH

Application image (viet1846/spark-lakehouse:v1) Dockerfile: Dockerfile

FROM viet1846/spark-iceberg:3.4.1

ENV PYTHONPATH=/app:$PYTHONPATH


WORKDIR /app
COPY src/ /app/

Spark Iceberg demo job: jobs/main_iceberg.yaml

---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-iceberg
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "viet1846/spark-lakehouse:v1"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/etl/main_iceberg.py
  sparkVersion: "3.4.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    memory: "1024m"
    labels:
      version: 3.4.1
    serviceAccount: spark-operator-spark
    env:
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: admin
      - name: AWS_SECRET_ACCESS_KEY
        value: password
      - name: S3_ENDPOINT
        value: minio.minio.svc.cluster.local:9000
      - name: S3_SSL_ENABLE
        value: "false"
      - name: S3_PATH_STYLE_ACCESS
        value: "true"
      - name: S3_WAREHOUSE
        value: "s3a://lakehouse/"
      - name: METASTORE_URI
        value: "thrift://hive-metastore.metastore.svc.cluster.local:9083"
      - name: DATA_PATH
        value: "s3a://lakehouse/raw/yellow_tripdata/"

  executor:
    cores: 1
    instances: 3
    memory: "2048m"
    labels:
      version: 3.4.1
    env:
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: admin
      - name: AWS_SECRET_ACCESS_KEY
        value: password
      - name: S3_ENDPOINT
        value: minio.minio.svc.cluster.local:9000
      - name: S3_SSL_ENABLE
        value: "false"
      - name: S3_PATH_STYLE_ACCESS
        value: "true"
      - name: S3_WAREHOUSE
        value: "s3a://lakehouse/"
      - name: METASTORE_URI
        value: "thrift://hive-metastore.metastore.svc.cluster.local:9083"
      - name: DATA_PATH
        value: "s3a://lakehouse/raw/yellow_tripdata/"

Build the base image

> docker build -t viet1846/spark-iceberg:3.4.1 -f Dockerfile.spark-iceberg .
> docker push viet1846/spark-iceberg:3.4.1
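
Before moving on, it does not hurt to confirm that the extra jars actually landed in the base image; a quick sketch that overrides the entrypoint just to list the jars directory:

> docker run --rm --entrypoint ls viet1846/spark-iceberg:3.4.1 /opt/spark/jars | grep -E "iceberg|hadoop-aws|bundle"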

Build the application image

> docker build -t viet1846/spark-lakehouse:v1 -f Dockerfile .
> docker push viet1846/spark-lakehouse:v1

5. Submit the main_iceberg job

> docker pull viet1846/spark-lakehouse:v1
> kind load docker-image viet1846/spark-lakehouse:v1 --name dev
> kubectl apply -f jobs/main_iceberg.yaml
> kubectl -n spark-operator logs -f spark-iceberg-driver
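
Before moving on to Trino, you can optionally confirm that Iceberg wrote data and metadata files into the bucket. A sketch reusing the hypothetical mc alias from step 2 (the exact table location under the warehouse depends on the metastore configuration, so just browse recursively):

> mc ls -r lake/lakehouse/ | head -n 20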

6. Use Trino to query the Iceberg table created by Spark

> kubectl -n trino exec -it deployments/trino-coordinator -- trino
trino> show schemas from lakehouse;
trino> show tables from lakehouse.raw;
trino> select * from lakehouse.raw.taxis_spark limit 10;
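
If you prefer a non-interactive check, the Trino CLI also accepts --execute, which makes it easy to cross-check the row count that the Spark job logged:

> kubectl -n trino exec deployments/trino-coordinator -- trino --execute "select count(*) from lakehouse.raw.taxis_spark"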

7. Destroy the Kind cluster

> kind delete cluster --name dev