[Kubernetes Data Platform][Part 5.2][Main Components]: Apache Spark with Spark Connect Server
In Part 5.1, I discussed how to deploy SparkApplications on a Kubernetes environment using Spark Operator. This involved integrating Spark with Hive Metastore, MinIO, and Iceberg.
The development process for SparkApplications using Spark Operator involves the following steps:
- Develop SparkApplications according to business requirements.
- Write a Manifest file describing the SparkApplications.
- Build a dev Docker image.
- Submit SparkApplications through Spark Operator for testing.
- Repeat steps 1–4 until the application passes.
- Merge code and build a production image.
- Schedule SparkApplications to run (daily, hourly, etc.) using data orchestration tools.
To keep this workflow smooth, you also need CI/CD pipelines, and the whole multi-step process can become quite complex, which makes developing with Spark less appealing.
However, Spark 3.4 introduced a promising new feature called “Spark Connect,” which simplifies Spark development.
Spark Connect introduces a decoupled client-server architecture, enabling remote connectivity to Spark clusters using the DataFrame API and unresolved logical plans as the protocol. This separation allows Spark and its open ecosystem to be leveraged from anywhere, including modern data applications, IDEs, Notebooks, and programming languages.
High-level Spark Connect Architecture
Spark Connect defines a protocol for client applications to communicate with a remote Spark Server. Clients implementing this protocol can connect and make requests to remote Spark Servers, similar to how client applications connect to databases using a JDBC driver. A query like spark.table("some_table").limit(5) should simply return the result. This architecture provides an excellent developer experience.
- A connection is established between the Client and Spark Server.
- The Client converts a DataFrame query to an unresolved logical plan, describing the operation’s intent rather than its execution.
- The unresolved logical plan is encoded and sent to the Spark Server.
- The Spark Server optimizes and runs the query.
- The Spark Server sends the results back to the Client.
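To make this flow concrete, here is a minimal client-side sketch. The endpoint sc://localhost:15002 is a placeholder (15002 is Spark Connect's default port); any reachable Spark Connect server works, including the NodePort endpoint we deploy later in this post.
from pyspark.sql import SparkSession

# Connect to a remote Spark Connect server; the endpoint is a placeholder.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# The DataFrame call below is converted to an unresolved logical plan on the
# client, sent to the server over gRPC, optimized and executed there, and only
# the resulting rows are streamed back.
spark.table("some_table").limit(5).show()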
Combining Spark Connect and Spark Operator
A skilled engineer must carefully consider resource utilization; many real-world problems come down to using resources efficiently.
So, how do Spark Connect and Spark Operator manage resources?
Spark Operator: When submitting a SparkApplication, Spark Operator uses the Manifest file to initialize Driver and Executor pods. After execution, these pods are terminated to free up resources, leading to better resource management.
Spark Connect: A dedicated resource pool must be allocated on the server side to handle incoming client requests, and that pool stays reserved even when no queries are running.
Conclusion:
Spark Operator offers better resource management but a more complex development process. Spark Connect provides a great developer experience but suboptimal resource management.
I propose combining both approaches:
- Use Spark Connect for development and integration with tools like Jupyter Notebook (for data scientists).
- Employ Spark Operator for efficient resource management in production environments.
This approach simplifies SparkApplications development while maintaining efficient resource utilization.
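As a rough sketch of what this looks like in job code (the SPARK_CONNECT_SERVER environment variable is my own convention here, not something either tool requires), the same logic can target a Spark Connect endpoint during development and fall back to a regular session when it is packaged into an image and submitted through Spark Operator:
import os

from pyspark.sql import SparkSession


def get_spark() -> SparkSession:
    # Development: point SPARK_CONNECT_SERVER at the Spark Connect endpoint
    # (e.g. sc://<node-ip>:30052) and work interactively from a notebook or IDE.
    remote = os.getenv("SPARK_CONNECT_SERVER")
    if remote:
        return SparkSession.builder.remote(remote).getOrCreate()
    # Production: the same code is baked into an image and submitted as a
    # SparkApplication by Spark Operator, which creates a normal driver-side
    # session and frees all pods when the job finishes.
    return SparkSession.builder.getOrCreate()


spark = get_spark()
spark.sql("SELECT 1 AS ok").show()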
DEPLOYMENT STEPS
To deploy Spark Connect on Kubernetes, follow these steps:
- Build sparglim-server (viet1846/sparglim-server) images
- Sparglim is an open-source tool on GitHub that makes it easier to deploy Spark Connect Server on Kubernetes!
- However, by default, Sparglim ships without the configuration needed for Hive Metastore, MinIO, and Iceberg, so we have to add those settings ourselves (a sketch of the extra settings appears right after this list). For details, see https://github.com/viethqb/data-platform-notes/blob/main/spark-connect-server/sparglim/config/configer.py
- Build sparglim-server images using Dockerfile: https://github.com/viethqb/data-platform-notes/blob/main/spark-connect-server/Dockerfile.sparglim-server
- Initialize a Kubernetes cluster with Kind.
- Install Nginx Ingress Controller, MinIO, Hive Metastore, and Trino on Kubernetes. (This scene is familiar. We’ve seen it somewhere before.)
- Install Spark Connect Server
- Test Spark Connect Server
- Use Trino to query the Iceberg table created by Spark.
- Destroy the Kind cluster.
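For orientation, the settings added to configer.py boil down to three things: registering an Iceberg catalog, pointing Spark at the Hive Metastore, and wiring S3A to MinIO. The sketch below illustrates the idea with the SparkSession builder; in Sparglim these options are applied when the Connect server starts, and the service names, credentials, and catalog name here are placeholders for the real values in the linked configer.py and the Helm values files.
from pyspark.sql import SparkSession

# Placeholder endpoints and credentials; adjust to your cluster.
spark = (
    SparkSession.builder
    # Iceberg catalog backed by the Hive Metastore
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.lakehouse.type", "hive")
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore.metastore:9083")
    # MinIO as the S3-compatible object store
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "<access-key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<secret-key>")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)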
HANDS-ON STEP
Reference Repository: https://github.com/viethqb/data-platform-notes/tree/main/spark-connect-server
1. Build sparglim-server (viet1846/sparglim-server) images
Dockerfile.sparglim-server
FROM python:3.10.12-slim-bookworm as builder
RUN pip install build twine hatch
COPY . /source
WORKDIR /source
RUN python -m build
FROM viet1846/spark-lakehouse:v1
COPY --from=builder --chown=executor:executor /source/dist/*.whl /tmp/
RUN for f in $(echo /tmp/*.whl); do pip install --no-cache-dir $f; done
ENTRYPOINT [ "tini" ,"--"]
CMD [ "sparglim-server", "start" ]
# docker buildx build --platform linux/amd64,linux/arm64/v8 -t wh1isper/sparglim-server:latest -f docker/Dockerfile.sparglim-server --push .
Build sparglim-server images
> cd ~/Documents
> git clone https://github.com/viethqb/data-platform-notes.git
> cd data-platform-notes/spark-connect-server
> docker build -t viet1846/sparglim-server -f Dockerfile.sparglim-server .
> docker push viet1846/sparglim-server
2. Initialize a Kubernetes cluster with Kind
> cd deployment
> kind create cluster --name dev --config ./kind-config.yaml
> kubectl get no -owide
3. Install Nginx Ingress Controller, MinIO, Hive Metastore, Trino on Kubernetes
Install Nginx Ingress Controller
> helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
> helm repo update
> helm upgrade --install ingress-nginx ingress-nginx/ingress-nginx --set controller.hostNetwork=true,controller.service.type="",controller.kind=DaemonSet --namespace ingress-nginx --version 4.10.1 --create-namespace --debug
> kubectl -n ingress-nginx get po -owide
Install MinIO
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install minio -n minio -f ./minio/minio-values.yaml bitnami/minio --create-namespace --debug --version 14.6.0
> kubectl -n minio get po
> kubectl get no -owide
# NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
# dev-control-plane Ready control-plane 3m53s v1.30.0 172.18.0.2 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker Ready <none> 3m11s v1.30.0 172.18.0.5 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker2 Ready <none> 3m12s v1.30.0 172.18.0.3 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker3 Ready <none> 3m11s v1.30.0 172.18.0.4 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# Add the following line to the end of /etc/hosts:
# 172.18.0.4 minio.lakehouse.local
Install Hive Metastore
# hive-metastore-postgresql
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install metastore-db -n metastore -f ./hive/hive-metastore-postgres-values.yaml bitnami/postgresql --create-namespace --debug --version 15.4.2
# Hive metastore
# docker pull rtdl/hive-metastore:3.1.2
# kind load docker-image rtdl/hive-metastore:3.1.2 --name dev
> helm upgrade --install hive-metastore -n metastore -f ./hive/hive-metastore-values.yaml ../../charts/hive-metastore --create-namespace --debug
Install Trino
> helm repo add trino https://trinodb.github.io/charts
> helm upgrade --install trino -n trino -f ./trino/trino-values.yaml trino/trino --create-namespace --debug --version 0.21.0
> kubectl -n trino get po
4. Install Spark Connect Server
# docker pull viet1846/sparglim-server
# kind load docker-image viet1846/sparglim-server --name dev
> kubectl create ns sparglim
> kubectl create clusterrolebinding serviceaccounts-cluster-admin --clusterrole=cluster-admin --group=system:serviceaccounts
> kubectl apply -f spark-connect-server/
> kubectl -n sparglim logs -f deployments/sparglim-server
To change the configuration, see the file spark-connect-server/deployment.yaml.
5. Test Spark Connect Server
demo.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
import os
SPARK_CONNECT_SERVER = os.getenv("SPARK_CONNECT_SERVER", "sc://172.25.0.2:30052")
spark = SparkSession.builder.remote(SPARK_CONNECT_SERVER).getOrCreate()
df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="string1", d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3.0, c="string2", d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5.0, c="string3", d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
    ]
)
df.show()
spark.sql("CREATE DATABASE IF NOT EXISTS raw")
df.writeTo("raw.demo").tableProperty(
    "write.format.default", "parquet"
).createOrReplace()
> python3 -m venv .venv
> source .venv/bin/activate
(.venv)> pip3 install pyspark==3.4.1 grpcio-status pandas pyarrow
(.venv)> export SPARK_CONNECT_SERVER="sc://172.18.0.4:30052"
(.venv)> python3 demo.py
(.venv)> deactivate
6. Use Trino to query the Iceberg table created by Spark
> kubectl -n trino exec -it deployments/trino-coordinator -- trino
trino> show schemas from lakehouse;
trino> show tables from lakehouse.raw;
trino> select * from lakehouse.raw.demo;
7. Destroy the Kind cluster
> kind delete cluster --name dev