[Kubernetes Data Platform][Part 9][Real-time Components]: Apache Kafka, Kafka connect and Debezium
These are well-known technologies, so I’ll provide a brief overview:
- Apache Kafka is an open-source distributed event streaming platform used by thousands of organizations for high-performance data pipelines, streaming analytics, data integration, and critical applications.
- Kafka Connect specializes in streaming data to and from Kafka, simplifying the creation of high-quality, reliable, and efficient connector plugins. Kafka Connect also offers guarantees that are challenging to achieve with other frameworks.
- Debezium is an open-source distributed platform for change data capture (CDC). Once it is set up and pointed at your databases, your applications can react to every insert, update, and delete made in those databases. Debezium is robust and fast, so your applications can respond promptly without missing events, even in the case of failures.
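To make the "react to every insert, update, and delete" part concrete, here is a simplified sketch of the value of a Debezium change event, written as a Python dict (illustrative only; real events carry more source metadata, and the exact fields depend on the connector and its configuration):
# A simplified Debezium change event for an UPDATE on inventory.customers (illustrative)
change_event = {
    "before": {"id": 1001, "first_name": "Sally", "last_name": "Thomas"},
    "after": {"id": 1001, "first_name": "Sally updated", "last_name": "Thomas"},
    "source": {"connector": "mysql", "db": "inventory", "table": "customers"},
    "op": "u",  # c = create, u = update, d = delete, r = snapshot read
    "ts_ms": 1720000000000,
}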
The architecture is as follows:
In this article, I will use Strimzi to deploy a Kafka cluster and Kafka Connect on a Kubernetes environment. Strimzi provides a method to run an Apache Kafka cluster on Kubernetes or OpenShift in various deployment configurations.
DEPLOYMENT STEPS
1. Initialize a Kubernetes cluster with Kind.
2. Install Nginx Ingress Controller, MinIO, Hive Metastore, Trino on Kubernetes
3. Install data sources:
- Install MySQL on Kubernetes.
- Install PostgreSQL on Kubernetes.
4. Install Kafka
- Install Strimzi Operator
- Create Kafka Cluster
- Create Kafka topic
5. Produce messages to the users topic using Python
6. Install Kafka Connect
- Build the Kafka Connect image
- Create the Kafka Connect cluster
7. Configure the Amazon S3 sink connector for the users topic
8. Configure the Debezium MySQL connector and the Amazon S3 sink connector for the MySQL topics
9. Configure the Debezium PostgreSQL connector and the Amazon S3 sink connector for the PostgreSQL topics
10. Destroy the Kind cluster
HANDS-ON STEPS
Reference Repository: https://github.com/viethqb/data-platform-notes/tree/main/kafka
1. Initialize a Kubernetes cluster with Kind.
> cd ~/Documents
> git clone https://github.com/viethqb/data-platform-notes.git
> cd data-platform-notes/kafka
> kind create cluster --name dev --config deployment/kind/kind-config.yaml
2. Install Nginx Ingress Controller, MinIO, Hive Metastore, Trino on Kubernetes
Install Nginx Ingress Controller
> helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
> helm repo update
> helm upgrade --install ingress-nginx ingress-nginx/ingress-nginx --set controller.hostNetwork=true,controller.service.type="",controller.kind=DaemonSet --namespace ingress-nginx --version 4.10.1 --create-namespace --debug
> kubectl -n ingress-nginx get po -owide
Install MinIO
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install minio -n minio -f deployment/minio/minio-values.yaml bitnami/minio --create-namespace --debug --version 14.6.0
> kubectl -n minio get po
> kubectl get no -owide
# NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
# dev-control-plane Ready control-plane 3m53s v1.30.0 172.18.0.2 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker Ready <none> 3m11s v1.30.0 172.18.0.5 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker2 Ready <none> 3m12s v1.30.0 172.18.0.3 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker3 Ready <none> 3m11s v1.30.0 172.18.0.4 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# Add the following line to the end of /etc/hosts:
# 172.18.0.4 minio.lakehouse.local
Install Hive Metastore
# hive-metastore-postgresql
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install metastore-db -n metastore -f deployment/hive/hive-metastore-postgres-values.yaml bitnami/postgresql --create-namespace --debug --version 15.4.2
# Hive metastore
# docker pull rtdl/hive-metastore:3.1.2
# kind load docker-image rtdl/hive-metastore:3.1.2 --name dev
> helm upgrade --install hive-metastore -n metastore -f deployment/hive/hive-metastore-values.yaml ../../charts/hive-metastore --create-namespace --debug
Install Trino
> helm repo add trino https://trinodb.github.io/charts
> helm upgrade --install trino -n trino -f deployment/trino/trino-values.yaml trino/trino --create-namespace --debug --version 0.21.0
> kubectl -n trino get po
3. Install data sources
Install MySQL on Kubernetes
> kubectl apply -f deployment/mysql/namespace.yaml
> kubectl apply -f deployment/mysql/deployment.yaml
> kubectl apply -f deployment/mysql/service.yaml
# user: root / password: debezium
Check MySQL data
> kubectl -n data-source exec -it deployments/mysql bash
bash-4.4# mysql -uroot -P3306 -hmysql.data-source -p
mysql> show databases;
mysql> show tables in inventory;
mysql> select * from inventory.customers;
Install PostgreSQL on Kubernetes
> kubectl apply -f deployment/postgres/postgres.yaml
# user/password: data_engineer/password
> kubectl apply -f deployment/postgres/postgresql-client.yml
> kubectl -n data-source exec -it postgresql-client sh
~ $ psql -h postgres.data-source -p 5432 -U data_engineer -d data_engineer
data_engineer=# \dt inventory.*
data_engineer=# select * from inventory.customers;
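One prerequisite worth checking while you are still in this psql session: the Debezium PostgreSQL connector used in step 9 relies on logical decoding (plugin.name: pgoutput), which requires wal_level=logical on the source database. The bundled postgres.yaml is expected to set this already; if the following does not return logical, the connector will not be able to stream changes:
data_engineer=# SHOW wal_level;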
4. Install Kafka
Install Strimzi Operator
> helm repo add strimzi https://strimzi.io/charts/
> helm install kafka-operator strimzi/strimzi-kafka-operator --namespace=kafka --create-namespace --debug --version 0.41.0
kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.7.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: nodeport
        tls: false
        configuration:
          bootstrap:
            nodePort: 32100
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.7"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
Create Kafka Cluster
> kubectl apply -f deployment/kafka/kafka-cluster.yaml
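The brokers can take a few minutes to start. Since Strimzi sets a Ready condition on the Kafka resource, you should be able to block until the cluster is up before moving on:
> kubectl -n kafka wait kafka/my-kafka-cluster --for=condition=Ready --timeout=300s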
Create Kafka Topic
Create topics for Kafka Connect
> kubectl apply -f deployment/kafka/connect-configs-topic.yaml
> kubectl apply -f deployment/kafka/connect-offsets-topic.yaml
> kubectl apply -f deployment/kafka/connect-status-topic.yaml
List all topics
> kubectl -n kafka exec -it my-kafka-cluster-kafka-0 bash
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Using NodePort
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-topics.sh --list --bootstrap-server 172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100
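The same listing also works from outside the cluster in Python through the NodePort listener; a small sketch using kafka-python's KafkaAdminClient (substitute the node IPs reported by kubectl get no -owide):
from kafka import KafkaAdminClient

# Node IPs from `kubectl get no -owide`, NodePort 32100 from the plain listener above
admin = KafkaAdminClient(
    bootstrap_servers="172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100"
)
print(sorted(admin.list_topics()))
admin.close()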
5. Produce messages to the users topic using Python
src/producer.py
import json
import os
import time

from faker import Faker
from kafka import KafkaProducer

BOOTSTRAP_SERVERS = os.getenv(
    "BOOTSTRAP_SERVERS", "172.25.0.2:32100,172.25.0.3:32100,172.25.0.4:32100"
)

faker = Faker()


def get_register():
    # Generate a fake user record
    return {"name": faker.name(), "add": faker.year()}


def get_partitioner(key_bytes, all_partitions, available_partitions):
    # Always return partition 0 >>> only partition 0 receives messages
    return 0


def json_serializer(data):
    return json.dumps(data).encode("utf-8")


producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,   # broker addresses
    value_serializer=json_serializer,      # callable used to serialize message values
    # partitioner=get_partitioner,         # uncomment to route all messages to partition 0
)

while True:
    user = get_register()
    # print(user)
    producer.send("users", user)
    time.sleep(1)
# Python 3.10.14
> python3.10 -m venv .venv
> source .venv/bin/activate
> pip install -r src/requirements.txt
> export BOOTSTRAP_SERVERS="172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100"
> python3 src/producer.py
> kubectl -n kafka exec -it my-kafka-cluster-kafka-0 bash
> bin/kafka-console-consumer.sh --topic users --from-beginning --bootstrap-server 172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100
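If you prefer to verify from Python instead of the console consumer, here is a minimal consumer sketch that mirrors the producer above (same NodePort addresses, kafka-python as used in src/requirements.txt):
import json
import os

from kafka import KafkaConsumer

BOOTSTRAP_SERVERS = os.getenv(
    "BOOTSTRAP_SERVERS", "172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100"
)

# Read the users topic from the beginning and decode the JSON values
consumer = KafkaConsumer(
    "users",
    bootstrap_servers=BOOTSTRAP_SERVERS,
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(message.partition, message.offset, message.value)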
6. Install Kafka Connect
Dockerfile
FROM confluentinc/cp-kafka-connect:7.6.1 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.12
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:7.6.1
RUN confluent-hub install --no-prompt tabular/iceberg-kafka-connect:0.6.18
RUN confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.4.2
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.5.4
RUN confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.5.4
RUN confluent-hub install --no-prompt debezium/debezium-connector-mongodb:2.4.2
FROM quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
USER root:root
# Add S3 dependency
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/ /opt/kafka/plugins/kafka-connect-s3/
# Add Avro Converter dependency
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/ /opt/kafka/plugins/confluentinc-kafka-connect-avro-converter/
# Add Iceberg dependency
COPY --from=cp /usr/share/confluent-hub-components/tabular-iceberg-kafka-connect/ /opt/kafka/plugins/tabular-iceberg-kafka-connect/
# Add debezium-connector-mysql
COPY --from=cp /usr/share/confluent-hub-components/debezium-debezium-connector-mysql/ /opt/kafka/plugins/debezium-debezium-connector-mysql/
# Add debezium-connector-postgresql
COPY --from=cp /usr/share/confluent-hub-components/debezium-debezium-connector-postgresql/ /opt/kafka/plugins/debezium-debezium-connector-postgresql/
# Add debezium-connector-sqlserver
COPY --from=cp /usr/share/confluent-hub-components/debezium-debezium-connector-sqlserver/ /opt/kafka/plugins/debezium-debezium-connector-sqlserver/
# Add debezium-connector-mongodb
COPY --from=cp /usr/share/confluent-hub-components/debezium-debezium-connector-mongodb/ /opt/kafka/plugins/debezium-debezium-connector-mongodb/
Build the Kafka Connect image
# Replace viet1846 with your own repository
> docker build -t viet1846/kafka-connect:0.41.0 -f deployment/kafka/Dockerfile deployment/kafka
> docker push viet1846/kafka-connect:0.41.0
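If you do not want to push to a remote registry, you can load the image straight into the Kind cluster instead, the same way the Hive Metastore image was loaded earlier:
> kind load docker-image viet1846/kafka-connect:0.41.0 --name dev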
connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: viet1846/kafka-connect:0.41.0
  version: 3.7.0
  replicas: 3
  bootstrapServers: my-kafka-cluster-kafka-plain-bootstrap:9092
  # tls:
  #   trustedCertificates:
  #     - secretName: my-kafka-cluster-cluster-ca-cert
  #       certificate: ca.crt
  config:
    bootstrap.servers: my-kafka-cluster-kafka-plain-bootstrap:9092
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    internal.key.converter: org.apache.kafka.connect.json.JsonConverter
    internal.value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-status
    status.storage.replication.factor: 1
    offset.flush.interval.ms: 10000
    plugin.path: /opt/kafka/plugins
    offset.storage.file.filename: /tmp/connect.offsets
  template:
    connectContainer:
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "admin"
        - name: AWS_SECRET_ACCESS_KEY
          value: "password"
Create Kafka Connect cluster
> kubectl apply -f deployment/kafka/connect.yaml
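Once the Connect pods are Running, you can sanity-check the deployment through the Kafka Connect REST API. A minimal sketch, assuming you first expose the API locally with kubectl -n kafka port-forward svc/connect-cluster-connect-api 8083:8083 (the -connect-api service name is the usual Strimzi convention; check kubectl -n kafka get svc if yours differs):
import requests

CONNECT_URL = "http://localhost:8083"  # exposed via the port-forward above

# Installed connector plugins: the S3 sink and Debezium connectors baked into the image should be listed
for plugin in requests.get(f"{CONNECT_URL}/connector-plugins").json():
    print(plugin["class"])

# Connectors currently registered (empty for now; the next steps will add them)
print(requests.get(f"{CONNECT_URL}/connectors").json())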
7. Configure the Amazon S3 sink connector for the users topic
> kubectl apply -f deployment/kafka/sink-user-topic-to-s3-connector.yaml
Produce messages to the users topic using Python (re-run src/producer.py as in step 5)
Check MinIO files
Trino
trino> CREATE SCHEMA minio.raw WITH (location = 's3a://lakehouse/raw.db/');
trino> CREATE TABLE IF NOT EXISTS minio.raw.python_users (
    json_string varchar,
    year varchar,
    month varchar,
    day varchar,
    hour varchar
) WITH (
    format = 'TEXTFILE',
    partitioned_by = ARRAY['year', 'month', 'day', 'hour'],
    external_location = 's3a://kafka/topics/users'
);
trino> CALL minio.system.sync_partition_metadata('raw', 'python_users', 'FULL');
trino> select
cast(json_extract(json_string, '$.add') as int) as add,
cast(json_extract(json_string, '$.name') as varchar) as name,
cast(json_extract(json_string, '$.message_ts') as varchar) as message_ts
from minio.raw.python_users
limit 5;
8. Configure the Debezium MySQL connector and the Amazon S3 sink connector for the MySQL topics
debezium-connector-mysql.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  namespace: kafka
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql.data-source
    database.port: 3306
    database.user: root
    database.password: debezium
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: my-kafka-cluster-kafka-plain-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
Configure the MySQL CDC source connector
> kubectl apply -f deployment/kafka/debezium-connector-mysql.yaml
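After applying the resource, it is worth confirming that the connector actually started; a failed MySQL connection shows up in the connector status rather than in the kubectl output (kubectl -n kafka get kafkaconnectors gives the same information at a glance). A sketch against the Connect REST API, with the same port-forward assumption as in step 6:
import requests

CONNECT_URL = "http://localhost:8083"  # via kubectl port-forward (see step 6)

# Both the connector and its task should report state RUNNING
status = requests.get(
    f"{CONNECT_URL}/connectors/debezium-connector-mysql/status"
).json()
print(status["connector"]["state"], [t["state"] for t in status["tasks"]])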
Update MySQL data
> kubectl -n data-source exec -it deployments/mysql bash
bash-4.4# mysql -uroot -P3306 -hmysql.data-source -p
mysql> update inventory.customers set first_name='Sally updated' where id=1001;
mysql> insert into inventory.customers values (1005, 'Viet', 'Hoang', '[email protected]');
mysql> delete from inventory.customers where id=1005;
Check the Kafka topics
> kubectl -n kafka exec -it my-kafka-cluster-kafka-0 bash
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-console-consumer.sh --topic mysql.inventory.customers --from-beginning --bootstrap-server 172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100
sink-mysql-kafka-topic-to-s3-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "sink-mysql-kafka-topic-to-s3-connector"
  namespace: "kafka"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    connector.class: io.confluent.connect.s3.S3SinkConnector
    tasks.max: "1"
    topics: mysql.inventory.addresses, mysql.inventory.customers, mysql.inventory.geom, mysql.inventory.orders, mysql.inventory.products, mysql.inventory.products_on_hand
    s3.region: us-east-1
    s3.bucket.name: kafka
    s3.part.size: "5242880"
    flush.size: 1
    store.url: http://minio.minio.svc.cluster.local:9000
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    schema.generator.class: "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator"
    partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    transforms: "insertTS,formatTS"
    transforms.insertTS.type: "org.apache.kafka.connect.transforms.InsertField$Value"
    transforms.insertTS.timestamp.field: "message_ts"
    transforms.formatTS.type: "org.apache.kafka.connect.transforms.TimestampConverter$Value"
    transforms.formatTS.format: "yyyy-MM-dd HH:mm:ss:SSS"
    transforms.formatTS.field: "message_ts"
    transforms.formatTS.target.type: "string"
    behavior.on.null.values: ignore
    partition.duration.ms: 60000
    path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"
    locale: "vi-VN"
    timezone: "Asia/Ho_Chi_Minh"
Configure the Amazon S3 sink connector for the MySQL topics
> kubectl apply -f deployment/kafka/sink-mysql-kafka-topic-to-s3-connector.yaml
Check MinIO
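With flush.size: 1 and the TimeBasedPartitioner above, each change event should land as its own JSON object under a key like topics/mysql.inventory.customers/year=.../month=.../day=.../hour=.../ in the kafka bucket. Besides the MinIO console, you can list the objects from Python; a sketch using boto3 (assuming the MinIO API is exposed locally, e.g. with kubectl -n minio port-forward svc/minio 9000:9000, and the admin/password credentials configured for the Connect cluster):
import boto3

# MinIO S3 API exposed locally via the port-forward mentioned above
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="admin",
    aws_secret_access_key="password",
)

# Objects written by the S3 sink connector for the customers table
resp = s3.list_objects_v2(Bucket="kafka", Prefix="topics/mysql.inventory.customers/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])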
9. Configure the Debezium PostgreSQL connector and the Amazon S3 sink connector for the PostgreSQL topics
debezium-connector-postgres.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-postgres
  namespace: kafka
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    tasks.max: 1
    plugin.name: pgoutput
    database.hostname: postgres.data-source
    database.port: 5432
    database.user: "data_engineer"
    database.password: "password"
    database.dbname: "data_engineer"
    database.server.id: 184055
    topic.prefix: postgres
    schema.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: my-kafka-cluster-kafka-plain-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
Configure the Debezium PostgreSQL connector
> kubectl apply -f deployment/kafka/debezium-connector-postgres.yaml
Configure the Amazon S3 sink connector for the PostgreSQL topics
> kubectl apply -f deployment/kafka/sink-postgres-kafka-topic-to-s3-connector.yaml
Update PostgreSQL data
> kubectl -n data-source exec -it postgresql-client sh
~ $ psql -h postgres.data-source -p 5432 -U data_engineer -d data_engineer
data_engineer=# \dt inventory.*
# List of relations
# Schema | Name | Type | Owner
# -----------+------------------+-------+---------------
# inventory | customers | table | data_engineer
# inventory | geom | table | data_engineer
# inventory | orders | table | data_engineer
# inventory | products | table | data_engineer
# inventory | products_on_hand | table | data_engineer
# inventory | spatial_ref_sys | table | data_engineer
# (6 rows)
data_engineer=# update inventory.customers set first_name='Sally Marie' where id=1001;
Check the Kafka topics
> kubectl -n kafka exec -it my-kafka-cluster-kafka-0 bash
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic postgres.inventory.customers --from-beginning
Check MinIO
10. Destroy the Kind cluster
> kind delete cluster -n dev