[Kubernetes Data Platform][Part 9][Real-time Components]: Apache Kafka, Kafka Connect, and Debezium

Viet_1846


These are well-known technologies, so I’ll provide a brief overview:

  • Apache Kafka is an open-source distributed event streaming platform used by thousands of organizations for high-performance data pipelines, streaming analytics, data integration, and critical applications.
  • Kafka Connect specializes in streaming data to and from Kafka, simplifying the creation of high-quality, reliable, and efficient connector plugins. Kafka Connect also offers guarantees that are challenging to achieve with other frameworks.
  • Debezium is an open-source distributed platform for change data capture (CDC). Once set up and connected to your databases, your applications can react to all inserts, updates, and deletes made to those databases. Debezium is robust and fast, so your applications can respond promptly without missing any events, even in the event of failures.

The architecture is as follows:

Apache Kafka, Kafka Connect, and Debezium architecture

In this article, I will use Strimzi to deploy a Kafka cluster and Kafka Connect on a Kubernetes environment. Strimzi provides a method to run an Apache Kafka cluster on Kubernetes or OpenShift in various deployment configurations.

DEPLOYMENT STEPS

1. Initialize a Kubernetes cluster with Kind.

2. Install Nginx Ingress Controller, MinIO, Hive Metastore, Trino on Kubernetes

3. Install data sources:

  • Install MySQL on Kubernetes.
  • Install PostgreSQL on Kubernetes.

4. Install Kafka

  • Install Strimzi Operator
  • Create Kafka Cluster
  • Create Kafka topic

5. Test producing messages to the users topic using Python

6. Install Kafka Connect

  • Build the Kafka Connect image
  • Create the Kafka Connect cluster

7. Configure the Amazon S3 Sink connector for the users topic

8. Configure the Debezium MySQL connector & the Amazon S3 Sink connector for the MySQL topics

9. Configure the Debezium PostgreSQL connector & the Amazon S3 Sink connector for the PostgreSQL topics

10. Destroy the Kind cluster

HANDS-ON STEPS

Reference Repository: https://github.com/viethqb/data-platform-notes/tree/main/kafka

1. Initialize a Kubernetes cluster with Kind.

> cd ~/Documents
> git clone https://github.com/viethqb/data-platform-notes.git
> cd data-platform-notes/kafka
> kind create cluster --name dev --config deployment/kind/kind-config.yaml

2. Install Nginx Ingress Controller, MinIO, Hive Metastore, Trino on Kubernetes

Install Nginx Ingress Controller

> helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
> helm repo update
> helm upgrade --install ingress-nginx ingress-nginx/ingress-nginx --set controller.hostNetwork=true,controller.service.type="",controller.kind=DaemonSet --namespace ingress-nginx --version 4.10.1 --create-namespace --debug
> kubectl -n ingress-nginx get po -owide

Install MinIO

> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install minio -n minio -f deployment/minio/minio-values.yaml bitnami/minio --create-namespace --debug --version 14.6.0
> kubectl -n minio get po
> kubectl get no -owide
# NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
# dev-control-plane Ready control-plane 3m53s v1.30.0 172.18.0.2 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker Ready <none> 3m11s v1.30.0 172.18.0.5 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker2 Ready <none> 3m12s v1.30.0 172.18.0.3 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# dev-worker3 Ready <none> 3m11s v1.30.0 172.18.0.4 <none> Debian GNU/Linux 12 (bookworm) 6.9.9-arch1-1 containerd://1.7.15
# Add the following line to the end of /etc/hosts:
# 172.18.0.4 minio.lakehouse.local

Install Hive Metastore

# hive-metastore-postgresql
> helm repo add bitnami https://charts.bitnami.com/bitnami
> helm repo update
> helm upgrade --install metastore-db -n metastore -f deployment/hive/hive-metastore-postgres-values.yaml bitnami/postgresql --create-namespace --debug --version 15.4.2

# Hive metastore
# docker pull rtdl/hive-metastore:3.1.2
# kind load docker-image rtdl/hive-metastore:3.1.2 --name dev
> helm upgrade --install hive-metastore -n metastore -f deployment/hive/hive-metastore-values.yaml ../../charts/hive-metastore --create-namespace --debug

Install Trino

> helm repo add trino https://trinodb.github.io/charts
> helm upgrade --install trino -n trino -f deployment/trino/trino-values.yaml trino/trino --create-namespace --debug --version 0.21.0
> kubectl -n trino get po

3. Install data sources

Install MySQL on Kubernetes

> kubectl apply -f deployment/mysql/namespace.yaml 
> kubectl apply -f deployment/mysql/deployment.yaml
> kubectl apply -f deployment/mysql/service.yaml
# user: root / password: debezium

Check MySQL data

> kubectl -n data-source exec -it deployments/mysql bash 
bash-4.4# mysql -uroot -P3306 -hmysql.data-source -p
mysql> show databases;
mysql> show tables in inventory;
mysql> select * from inventory.customers;
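If you prefer checking from Python instead of the mysql CLI, here is a minimal sketch using PyMySQL. It assumes you have forwarded the MySQL service to your machine first (e.g. kubectl -n data-source port-forward svc/mysql 3306:3306); the script name and connection details are illustrative.

# check_mysql.py -- a minimal sketch, assuming MySQL is reachable on localhost:3306
# (e.g. via: kubectl -n data-source port-forward svc/mysql 3306:3306)
import pymysql

conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="debezium",
    database="inventory",
)
try:
    with conn.cursor() as cur:
        # Same check as "select * from inventory.customers" above
        cur.execute("SELECT id, first_name, last_name, email FROM customers LIMIT 5")
        for row in cur.fetchall():
            print(row)
finally:
    conn.close()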

Install PostgreSQL on Kubernetes

> kubectl apply -f deployment/postgres/postgres.yaml 
# user/password: data_engineer/password
> kubectl apply -f deployment/postgres/postgresql-client.yml
> kubectl -n data-source exec -it postgresql-client sh
~ $ psql -h postgres.data-source -p 5432 -U data_engineer -d data_engineer
data_engineer=# \dt inventory.*
data_engineer=# select * from inventory.customers;

4. Install Kafka

Install Strimzi Operator

> helm repo add strimzi https://strimzi.io/charts/
> helm install kafka-operator strimzi/strimzi-kafka-operator --namespace=kafka --create-namespace --debug --version 0.41.0

kafka-cluster.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.7.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: nodeport
        tls: false
        configuration:
          bootstrap:
            nodePort: 32100
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.7"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Create Kafka Cluster

> kubectl apply -f deployment/kafka/kafka-cluster.yaml

Create Kafka Topic

Create topics for Kafka Connect

> kubectl apply -f deployment/kafka/connect-configs-topic.yaml 
> kubectl apply -f deployment/kafka/connect-offsets-topic.yaml
> kubectl apply -f deployment/kafka/connect-status-topic.yaml

List all topics

> kubectl -n kafka exec -it my-kafka-cluster-kafka-0 bash 
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Using the nodeport listener:

[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-topics.sh --list --bootstrap-server 172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100
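The same check can be done from outside the cluster with kafka-python. A minimal sketch, assuming the node IPs from the kubectl get no -owide output above and the bootstrap nodePort 32100 defined in the Kafka CR:

# list_topics.py -- a minimal sketch using kafka-python against the nodeport listener
from kafka import KafkaConsumer

# Node IPs and the bootstrap nodePort from the Kafka CR; adjust to your cluster
BOOTSTRAP_SERVERS = "172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100"

consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS)
print(sorted(consumer.topics()))  # topic names visible to this client
consumer.close()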

5. Produce messages to the users topic using Python

src/producer.py

from kafka import KafkaProducer
from faker import Faker
import json
import os
import time

# Kafka bootstrap servers (nodeport addresses of the Kind nodes)
BOOTSTRAP_SERVERS = os.getenv(
    "BOOTSTRAP_SERVERS", "172.25.0.2:32100,172.25.0.3:32100,172.25.0.4:32100"
)

faker = Faker()


def get_register():
    # Generate a fake user record
    return {"name": faker.name(), "add": faker.year()}


def get_partitioner(key_bytes, all_partitions, available_partitions):
    # Always route to partition 0 (only used if passed as the partitioner below)
    return 0


def json_serializer(data):
    return json.dumps(data).encode("utf-8")


producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,  # broker addresses
    value_serializer=json_serializer,     # callable used to serialize message values
    # partitioner=get_partitioner,        # function returning 0 >>> only partition 0 receives messages
)

while True:
    user = get_register()
    # print(user)
    producer.send("users", user)
    time.sleep(1)

# Python 3.10.14
> python3.10 -m venv .venv
> source .venv/bin/activate
> pip install -r src/requirements.txt
> export BOOTSTRAP_SERVERS="172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100"
> python3 src/producer.py

> kubectl -n kafka exec -it my-kafka-cluster-kafka-0 bash
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-console-consumer.sh --topic users --from-beginning --bootstrap-server 172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100
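As an alternative to the console consumer, a small kafka-python consumer for the same topic could look like the sketch below (the script name is illustrative; it reuses the nodeport addresses from the producer step):

# consumer.py -- a minimal sketch: consume the JSON messages produced by src/producer.py
import json
import os

from kafka import KafkaConsumer

BOOTSTRAP_SERVERS = os.getenv(
    "BOOTSTRAP_SERVERS", "172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100"
)

consumer = KafkaConsumer(
    "users",
    bootstrap_servers=BOOTSTRAP_SERVERS,
    auto_offset_reset="earliest",  # read from the beginning, like --from-beginning
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(message.partition, message.offset, message.value)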

6. Install Kafka Connect

Dockerfile

FROM confluentinc/cp-kafka-connect:7.6.1 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.12
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:7.6.1
RUN confluent-hub install --no-prompt tabular/iceberg-kafka-connect:0.6.18
RUN confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.4.2
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.5.4
RUN confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.5.4
RUN confluent-hub install --no-prompt debezium/debezium-connector-mongodb:2.4.2
FROM quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
USER root:root
# Add S3 dependency
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/ /opt/kafka/plugins/kafka-connect-s3/
# Add Avro Converter dependency
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/ /opt/kafka/plugins/confluentinc-kafka-connect-avro-converter/
# Add Iceberg dependency
COPY --from=cp /usr/share/confluent-hub-components/tabular-iceberg-kafka-connect/ /opt/kafka/plugins/tabular-iceberg-kafka-connect/
# Add debezium-connector-mysql
COPY --from=cp /usr/share/confluent-hub-components/debezium-debezium-connector-mysql/ /opt/kafka/plugins/debezium-debezium-connector-mysql/
# Add debezium-connector-postgresql
COPY --from=cp /usr/share/confluent-hub-components/debezium-debezium-connector-postgresql/ /opt/kafka/plugins/debezium-debezium-connector-postgresql/
# Add debezium-connector-sqlserver
COPY --from=cp /usr/share/confluent-hub-components/debezium-debezium-connector-sqlserver/ /opt/kafka/plugins/debezium-debezium-connector-sqlserver/
# Add debezium-connector-mongodb
COPY --from=cp /usr/share/confluent-hub-components/debezium-debezium-connector-mongodb/ /opt/kafka/plugins/debezium-debezium-connector-mongodb/

Build the Kafka Connect image

# Replace viet1846 with your own repository
> docker build -t viet1846/kafka-connect:0.41.0 -f deployment/kafka/Dockerfile deployment/kafka
> docker push viet1846/kafka-connect:0.41.0

connect.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: viet1846/kafka-connect:0.41.0
  version: 3.7.0
  replicas: 3
  bootstrapServers: my-kafka-cluster-kafka-plain-bootstrap:9092
  # tls:
  #   trustedCertificates:
  #     - secretName: my-kafka-cluster-cluster-ca-cert
  #       certificate: ca.crt
  config:
    bootstrap.servers: my-kafka-cluster-kafka-plain-bootstrap:9092
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    internal.key.converter: org.apache.kafka.connect.json.JsonConverter
    internal.value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-status
    status.storage.replication.factor: 1
    offset.flush.interval.ms: 10000
    plugin.path: /opt/kafka/plugins
    offset.storage.file.filename: /tmp/connect.offsets
  template:
    connectContainer:
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "admin"
        - name: AWS_SECRET_ACCESS_KEY
          value: "password"

Create the Kafka Connect cluster

kubectl apply -f deployment/kafka/connect.yaml
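To verify that the plugins baked into the custom image were picked up, you can query the Kafka Connect REST API. A sketch, assuming you port-forward the Connect API service (the name connect-cluster-connect-api follows Strimzi's <cluster-name>-connect-api convention) with kubectl -n kafka port-forward svc/connect-cluster-connect-api 8083:8083:

# list_plugins.py -- a minimal sketch: list installed connector plugins via the Connect REST API
# assumes: kubectl -n kafka port-forward svc/connect-cluster-connect-api 8083:8083
import requests

resp = requests.get("http://localhost:8083/connector-plugins", timeout=10)
resp.raise_for_status()
for plugin in resp.json():
    # You should see the S3 sink and the Debezium connectors copied in the Dockerfile
    print(plugin["class"], plugin.get("version"))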

7. Configure the Amazon S3 Sink connector for the users topic

kubectl apply -f deployment/kafka/sink-user-topic-to-s3-connector.yaml

Produce messages to the users topic using Python

Check the files in MinIO

Trino

trino> CREATE SCHEMA minio.raw WITH (location = 's3a://lakehouse/raw.db/');
trino> CREATE TABLE IF NOT EXISTS minio.raw.python_users (
           json_string varchar,
           year varchar,
           month varchar,
           day varchar,
           hour varchar
       )
       WITH (
           format = 'TEXTFILE',
           partitioned_by = ARRAY['year', 'month', 'day', 'hour'],
           external_location = 's3a://kafka/topics/users'
       );
trino> CALL minio.system.sync_partition_metadata('raw', 'python_users', 'FULL');
trino> select
           cast(json_extract(json_string, '$.add') as int) as add,
           cast(json_extract(json_string, '$.name') as varchar) as name,
           cast(json_extract(json_string, '$.message_ts') as varchar) as message_ts
       from minio.raw.python_users
       limit 5;
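The same query can also be run from Python with the trino client package. A sketch, assuming the Trino coordinator is reachable on localhost:8080 (e.g. via kubectl -n trino port-forward svc/trino 8080:8080) and the catalog/schema names match the statements above:

# query_trino.py -- a minimal sketch: query the raw users table through Trino
# assumes: kubectl -n trino port-forward svc/trino 8080:8080
import trino

conn = trino.dbapi.connect(
    host="localhost",
    port=8080,
    user="admin",
    catalog="minio",
    schema="raw",
)
cur = conn.cursor()
cur.execute(
    """
    select
        cast(json_extract(json_string, '$.name') as varchar) as name,
        cast(json_extract(json_string, '$.add') as int) as add
    from python_users
    limit 5
    """
)
for row in cur.fetchall():
    print(row)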

8. Configure the Debezium MySQL connector & the Amazon S3 Sink connector for the MySQL topics

debezium-connector-mysql.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  namespace: kafka
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql.data-source
    database.port: 3306
    database.user: root
    database.password: debezium
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: my-kafka-cluster-kafka-plain-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory

Configure the MySQL CDC source connector

kubectl apply -f deployment/kafka/debezium-connector-mysql.yaml

Update MySQL data

> kubectl -n data-source exec -it deployments/mysql bash
bash-4.4# mysql -uroot -P3306 -hmysql.data-source -p
mysql> update inventory.customers set first_name='Sally updated' where id=1001;
mysql> insert into inventory.customers values (1005, 'Viet', 'Hoang', '[email protected]');
mysql> delete from inventory.customers where id=1005;

Check the Kafka topics

> kubectl -n kafka exec -it my-kafka-cluster-kafka-0 bash
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-console-consumer.sh --topic mysql.inventory.customers --from-beginning --bootstrap-server 172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100
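Each message on mysql.inventory.customers is a Debezium change event. With the JSON converter configured above (schemas.enable: false), the value should be a plain JSON object whose op field indicates a create (c), update (u), delete (d), or snapshot read (r), with the row state in before/after. A rough sketch of consuming and classifying these events in Python (bootstrap addresses as before):

# cdc_consumer.py -- a minimal sketch: inspect Debezium change events for the customers table
import json

from kafka import KafkaConsumer

BOOTSTRAP_SERVERS = "172.18.0.3:32100,172.18.0.4:32100,172.18.0.5:32100"

consumer = KafkaConsumer(
    "mysql.inventory.customers",
    bootstrap_servers=BOOTSTRAP_SERVERS,
    auto_offset_reset="earliest",
    # value can be None for the tombstone record that follows a delete
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}

for message in consumer:
    event = message.value
    if event is None:  # tombstone after a delete
        continue
    op = OPS.get(event.get("op"), event.get("op"))
    print(op, "before:", event.get("before"), "after:", event.get("after"))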

sink-mysql-kafka-topic-to-s3-connector.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "sink-mysql-kafka-topic-to-s3-connector"
  namespace: "kafka"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    connector.class: io.confluent.connect.s3.S3SinkConnector
    tasks.max: "1"
    topics: mysql.inventory.addresses, mysql.inventory.customers, mysql.inventory.geom, mysql.inventory.orders, mysql.inventory.products, mysql.inventory.products_on_hand
    s3.region: us-east-1
    s3.bucket.name: kafka
    s3.part.size: "5242880"
    flush.size: 1
    store.url: http://minio.minio.svc.cluster.local:9000
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    schema.generator.class: "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator"
    partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    transforms: "insertTS,formatTS"
    transforms.insertTS.type: "org.apache.kafka.connect.transforms.InsertField$Value"
    transforms.insertTS.timestamp.field: "message_ts"
    transforms.formatTS.type: "org.apache.kafka.connect.transforms.TimestampConverter$Value"
    transforms.formatTS.format: "yyyy-MM-dd HH:mm:ss:SSS"
    transforms.formatTS.field: "message_ts"
    transforms.formatTS.target.type: "string"
    behavior.on.null.values: ignore
    partition.duration.ms: 60000
    path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"
    locale: "vi-VN"
    timezone: "Asia/Ho_Chi_Minh"
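The TimeBasedPartitioner settings above mean each record is routed to an object prefix derived from its timestamp in the Asia/Ho_Chi_Minh timezone, rolled over every 60 seconds. The following small sketch only illustrates the kind of prefix a record would land under; the connector computes this itself, and the function name is hypothetical:

# partition_path.py -- illustrative sketch of the 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH prefix
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def s3_prefix(ts: datetime, topic: str = "mysql.inventory.customers") -> str:
    # Convert the record timestamp to the connector's configured timezone
    local = ts.astimezone(ZoneInfo("Asia/Ho_Chi_Minh"))
    return (
        f"topics/{topic}/"
        f"year={local:%Y}/month={local:%m}/day={local:%d}/hour={local:%H}/"
    )

print(s3_prefix(datetime.now(tz=timezone.utc)))
# e.g. topics/mysql.inventory.customers/year=.../month=.../day=.../hour=.../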

Configure the Amazon S3 Sink connector for the MySQL topics

kubectl apply -f deployment/kafka/sink-mysql-kafka-topic-to-s3-connector.yaml
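After applying the connectors you can confirm they are RUNNING either with kubectl -n kafka get kafkaconnector or through the Connect REST API. A sketch of the latter, under the same port-forward assumption as before:

# connector_status.py -- a minimal sketch: check connector/task state via the Connect REST API
# assumes: kubectl -n kafka port-forward svc/connect-cluster-connect-api 8083:8083
import requests

for name in requests.get("http://localhost:8083/connectors", timeout=10).json():
    status = requests.get(f"http://localhost:8083/connectors/{name}/status", timeout=10).json()
    tasks = [t["state"] for t in status.get("tasks", [])]
    print(name, status["connector"]["state"], tasks)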

Check MinIO

9. Configure the Debezium PostgreSQL connector & the Amazon S3 Sink connector for the PostgreSQL topics

debezium-connector-postgres.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-postgres
  namespace: kafka
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    tasks.max: 1
    plugin.name: pgoutput
    database.hostname: postgres.data-source
    database.port: 5432
    database.user: "data_engineer"
    database.password: "password"
    database.dbname: "data_engineer"
    database.server.id: 184055
    topic.prefix: postgres
    schema.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: my-kafka-cluster-kafka-plain-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory

Configure the Debezium PostgreSQL connector

kubectl apply -f deployment/kafka/debezium-connector-postgres.yaml

Configure the Amazon S3 Sink connector for the PostgreSQL topics

kubectl apply -f deployment/kafka/sink-postgres-kafka-topic-to-s3-connector.yaml

Update PostgreSQL data

> kubectl -n data-source exec -it postgresql-client sh
~ $ psql -h postgres.data-source -p 5432 -U data_engineer -d data_engineer
data_engineer=# \dt inventory.*
#               List of relations
#   Schema   |       Name       | Type  |     Owner
# -----------+------------------+-------+---------------
#  inventory | customers        | table | data_engineer
#  inventory | geom             | table | data_engineer
#  inventory | orders           | table | data_engineer
#  inventory | products         | table | data_engineer
#  inventory | products_on_hand | table | data_engineer
#  inventory | spatial_ref_sys  | table | data_engineer
# (6 rows)
data_engineer=# update inventory.customers set first_name='Sally Marie' where id=1001;

Check the Kafka topics

kubectl -n kafka exec -it my-kafka-cluster-kafka-0 bash
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
[kafka@my-kafka-cluster-kafka-0 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic postgres.inventory.customers --from-beginning

Check MinIO

10. Destroy the Kind cluster

kind delete cluster -n dev