Publish from Kafka, Persist on MinIO in k8s (cniackz/public GitHub Wiki)

References:
- https://strimzi.io/
- https://strimzi.io/quickstarts/
- https://github.com/minio/minio/issues/9623
- https://kafka.apache.org/documentation/#connect
- https://strimzi.io/docs/0.11.4/#kafka-connect-str
- https://strimzi.io/docs/operators/latest/configuring.html
- https://github.com/strimzi/strimzi-kafka-operator/issues/3027
- https://github.com/strimzi/strimzi-kafka-operator/discussions/6888
- https://github.com/cniackz/public/wiki/Publish-from-Kafka,-Persist-on-MinIO
- https://stackoverflow.com/questions/64919947/how-to-use-kafka-connect-in-strimzi
- https://stackoverflow.com/questions/61848747/kafka-s3-connector-fails-to-connect-minio
- https://stackoverflow.com/questions/57306651/kafka-connect-s3-partitioner-class-timebasedpartitioner-could-not-be-found
- https://blog.kubernauts.io/kafka-topic-backup-to-s3-and-restore-to-another-kafka-cluster-using-strimzi-and-openebs-38a2bdf37fc2
- https://stackoverflow.com/questions/51644409/kafka-broker-fails-because-all-log-dirs-have-failed
- Create the cluster
- File: `kind-config.yaml`:

```yaml
# five-node (four workers) cluster config
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
- role: worker
```

```shell
kind delete cluster
kind create cluster --config kind-config.yaml
```

- From https://github.com/cniackz/public/wiki/Publish-from-Kafka,-Persist-on-MinIO: you need to install the plugin with Confluent Hub so that you can build the Docker image in Quay and load the plugin:
```shell
$ confluent-hub install confluentinc/kafka-connect-s3:latest \
> --component-dir /Users/cniackz/confluent-plugins \
> --worker-configs /Users/cniackz/kafka/kafka_2.13-3.3.1/config/connect-distributed.properties
Component's license:
Confluent Community License
http://www.confluent.io/confluent-community-license
I agree to the software license agreement (yN) y
Downloading component Kafka Connect S3 10.3.0, provided by Confluent, Inc. from Confluent Hub and installing into /Users/cniackz/confluent-plugins
Do you want to uninstall existing version 10.3.0? (yN) y
Adding installation directory to plugin path in the following files:
/Users/cniackz/kafka/kafka_2.13-3.3.1/config/connect-distributed.properties
Completed
```

- Get MinIO up and running:
```shell
kubectl apply -k github.com/minio/operator/
kubectl apply -k ~/operator/examples/kustomization/tenant-lite
kubectl create namespace kafka
kubectl apply -f ~/minio/ubuntu.yaml -n kafka
```

Inside the `ubuntu` pod, install `mc` and create the bucket:

```shell
apt update
apt install -y wget
apt install -y iputils-ping
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
mv mc /usr/local/bin/mc
mc alias set myminio http://minio.tenant-lite.svc.cluster.local minio minio123
mc mb myminio/kafka-bucket
mc ls myminio/kafka-bucket
```
- Get Kafka running:

```shell
# Delete any previous Kafka as it may be stuck or outdated:
kubectl delete -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
kubectl delete -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka
kubectl delete -f /Users/cniackz/strimzi-kafka-operator/examples/connect/kafka-connect.yaml -n kafka
# TODO: Make sure to delete all PVCs related to Kafka.

kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka
```
- Create the topics `minio-topic-1` and `connect-offsets`:

```shell
exec kubectl exec -i -t -n kafka my-cluster-kafka-0 -c kafka -- sh -c "clear; (bash || ash || sh)"
cd bin
./kafka-topics.sh --bootstrap-server localhost:9092 --topic minio-topic-1 --create --partitions 1 --replication-factor 1 --config cleanup.policy=compact
./kafka-topics.sh --bootstrap-server localhost:9092 --topic connect-offsets --create --partitions 1 --replication-factor 1 --config cleanup.policy=compact
```

Expected:
```shell
192:~ cniackz$ exec kubectl exec -i -t -n kafka my-cluster-kafka-0 -c kafka -- sh -c "clear; (bash || ash || sh)"
sh: clear: command not found
[kafka@my-cluster-kafka-0 kafka]$ cd bin
[kafka@my-cluster-kafka-0 bin]$ ./kafka-topics.sh \
> --bootstrap-server localhost:9092 \
> --topic minio-topic-1 \
> --create --partitions 1 \
> --replication-factor 1 \
> --config cleanup.policy=compact
Created topic minio-topic-1.
[kafka@my-cluster-kafka-0 bin]$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic connect-offsets --create --partitions 1 --replication-factor 1 --config cleanup.policy=compact
Created topic connect-offsets.
```

- Produce 4 messages to send data to MinIO (`flush.size: '3'`):
```shell
kubectl -n kafka run kafka-producer -ti \
--image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
--rm=true --restart=Never -- bin/kafka-console-producer.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--topic minio-topic-1 \
--property parse.key=true \
--property key.separator=,
```

Expected:

```shell
$ kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic minio-topic-1 --property parse.key=true --property key.separator=,
If you don't see a command prompt, try pressing enter.
>123,123
>234,234
>345,345
>456,456
>
```

- Create image for plugin:
```shell
cd /Users/cniackz/kafka-connect
docker build .
docker login --username cniackz4 https://quay.io
docker tag 721f43699515 quay.io/cniackz4/kafkaconnect:latest
docker push quay.io/cniackz4/kafkaconnect:latest
```

- When running `docker build .`: I did this on Ubuntu x86 for our Intel cluster, otherwise there will be an architecture issue; if you run locally in kind, it can be built on a MacBook Pro M1.
- After `docker push`, don't forget to make the image Public so that it can be downloaded from within the cluster.
- File: `/Users/cniackz/kafka-connect/Dockerfile`
- Don't forget to update `ENV AWS_ACCESS_KEY_ID=<USER>` and `ENV AWS_SECRET_ACCESS_KEY=<PASSWORD>` accordingly:

```dockerfile
FROM quay.io/strimzi/kafka:latest-kafka-3.3.1
USER root:root
COPY ./confluent-plugins/ /opt/kafka/plugins/
USER kafka:kafka
ENV AWS_ACCESS_KEY_ID=<USER>
ENV AWS_SECRET_ACCESS_KEY=<PASSWORD>
```
- Apply Kafka Connect:
- Make sure the IP is correct: `bootstrap.servers: 10.244.1.5:9092`
- Make sure to update `store.url: http://10.244.4.13:9000` accordingly; get the IP from the MinIO pod.
- Don't forget to create the bucket: `s3.bucket.name: kafka-bucket`
- Remember you need more than 3 messages to send a file to MinIO: `flush.size: '3'`
- Also remember to get/update the topic accordingly: `topics: minio-topic-1`
- File: `/Users/cniackz/strimzi-kafka-operator/examples/connect/kafka-connect.yaml`:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: quay.io/cniackz4/kafkaconnect:latest
  version: 3.3.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    bootstrap.servers: 10.244.1.5:9092
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-status
    status.storage.replication.factor: 1
    offset.flush.interval.ms: 10000
    plugin.path: /opt/kafka/plugins
    offset.storage.file.filename: /tmp/connect.offsets
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "minio-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    connector.class: io.confluent.connect.s3.S3SinkConnector
    tasks.max: '1'
    topics: minio-topic-1
    s3.region: us-east-1
    s3.bucket.name: kafka-bucket
    s3.part.size: '5242880'
    flush.size: '3'
    store.url: http://10.244.3.10:9000
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility: NONE
    behavior.on.null.values: ignore
```
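With `format.class: io.confluent.connect.s3.format.json.JsonFormat` and the `DefaultPartitioner`, committed files land under `topics/<topic>/partition=<partition>/` and are named `<topic>+<partition>+<start offset>.json`, with the start offset zero-padded to ten digits. A minimal Python sketch of that key layout (the helper name is my own, not Confluent connector code):

```python
# Sketch of the S3 sink's object-key layout (illustration only):
#   topics/<topic>/partition=<p>/<topic>+<p>+<zero-padded start offset>.<ext>
def s3_sink_object_key(topic: str, partition: int, start_offset: int,
                       topics_dir: str = "topics", ext: str = "json") -> str:
    filename = f"{topic}+{partition}+{start_offset:010d}.{ext}"
    return f"{topics_dir}/{topic}/partition={partition}/{filename}"

# First committed batch of minio-topic-1, partition 0, starting at offset 0:
print(s3_sink_object_key("minio-topic-1", 0, 0))
# topics/minio-topic-1/partition=0/minio-topic-1+0+0000000000.json
```

This matches the objects listed with `mc ls` below.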
```shell
kubectl config use-context kubernetes-admin@kubernetes
cd /Users/cniackz/strimzi-kafka-operator
kubectl delete -f examples/connect/kafka-connect.yaml -n kafka
kubectl apply -f examples/connect/kafka-connect.yaml -n kafka
```

Expected log from pod `my-connect-cluster-connect-c47f64b46-d4xnz`:

```
2022-12-29 12:22:16,084 INFO [minio-connector|task-0] Files committed to S3. Target commit offset for minio-topic-1-0 is 3 (io.confluent.connect.s3.TopicPartitionWriter) [task-thread-minio-connector-0]
```
Expected output is:

```shell
root@ubuntu:/# mc ls myminio/kafka-bucket
[2022-12-29 12:22:38 UTC] 0B topics/
root@ubuntu:/# mc ls myminio/kafka-bucket/topics
[2022-12-29 12:26:10 UTC] 0B minio-topic-1/
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1
[2022-12-29 12:27:57 UTC] 0B partition=0/
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1/partition=0
[2022-12-29 12:22:16 UTC] 12B STANDARD minio-topic-1+0+0000000000.json
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000000.json
123
234
345
root@ubuntu:/#
```

- Tested locally on MacBook Pro M1: Passed [Wed Dec 28 2022]
- Tested locally on MacBook Pro M1: Passed [Fri Jan 06 2023]
- Tested in our DC cluster: Passed [Fri Dec 30 2022]
```shell
192:~ cniackz$ kubectl -n kafka run kafka-producer -ti \
> --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
> --rm=true --restart=Never -- bin/kafka-console-producer.sh \
> --bootstrap-server my-cluster-kafka-bootstrap:9092 \
> --topic minio-topic-1 \
> --property parse.key=true \
> --property key.separator=,
If you don't see a command prompt, try pressing enter.
>123,123
>234,234
>345,345
>456,456
>555,555
>666,666
>777,777
>888,{"a":"a"}
>999,{"a":"a"}
>AAA,{"a":"a"} <--- This will break us: Unrecognized token 'AAA': was expecting
                    JSON String, Number, Array, Object or token 'null', 'true' or 'false'
```
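The failure above comes from the key side: the console producer splits each line at the first `,` (because of `key.separator=,`), and the `JsonConverter` then has to parse both halves as JSON. `888` is a valid JSON number, but `AAA` is not. A small Python illustration of that check (my own helper, not connector code):

```python
import json

# Illustration of parse.key=true with key.separator=, followed by
# JsonConverter: split at the FIRST comma, then both halves must be JSON.
def parse_line(line: str):
    key, _, value = line.partition(",")
    json.loads(key)    # '888' parses as a JSON number; 'AAA' raises here
    json.loads(value)
    return key, value

print(parse_line('888,{"a":"a"}'))   # accepted
try:
    parse_line('AAA,{"a":"a"}')      # rejected, like the connector did
except json.JSONDecodeError as e:
    print("rejected:", e.msg)
```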
```shell
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1/partition=0
[2023-01-06 17:25:14 UTC] 12B STANDARD minio-topic-1+0+0000000000.json
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000000.json
123
234
345
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000000.json
123
234
345
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1/partition=0
[2023-01-06 17:25:14 UTC] 12B STANDARD minio-topic-1+0+0000000000.json
[2023-01-06 17:30:03 UTC] 12B STANDARD minio-topic-1+0+0000000003.json
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000003.json
456
555
666
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1/partition=0
[2023-01-06 17:25:14 UTC] 12B STANDARD minio-topic-1+0+0000000000.json
[2023-01-06 17:30:03 UTC] 12B STANDARD minio-topic-1+0+0000000003.json
[2023-01-06 17:33:00 UTC] 24B STANDARD minio-topic-1+0+0000000006.json
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000006.json
777
{"a":"a"}
{"a":"a"}
root@ubuntu:/#
```
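The file boundaries above follow from `flush.size: '3'`: the sink commits a new object each time three records have accumulated for a partition, and a trailing partial batch stays buffered until more records arrive. A simplified sketch of that bookkeeping (illustration only; the real connector can also rotate files on configured time intervals, and the function name is mine):

```python
# Simplified flush bookkeeping for one partition (illustration only):
# every full flush.size batch becomes a file named by its starting offset.
def committed_file_start_offsets(num_records: int, flush_size: int):
    full_batches = num_records // flush_size
    return [i * flush_size for i in range(full_batches)]

# Nine records converted successfully above (the tenth, 'AAA,...', failed),
# giving files that start at offsets 0, 3 and 6:
print(committed_file_start_offsets(9, 3))   # [0, 3, 6]
```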
- Python script to convert the taxi CSV into `key,JSON` lines for the console producer:

```python
import csv
import json

def csv_to_json(csvFilePath, jsonFilePath):
    jsonArray = []

    # read csv file
    with open(csvFilePath, encoding='utf-8') as csvf:
        # load csv file data using csv library's dictionary reader
        csvReader = csv.DictReader(csvf)
        # convert each csv row into a python dict and collect it
        for row in csvReader:
            jsonArray.append(row)

    # write one "<counter>,<compact JSON>" line per row,
    # matching the producer's key.separator=,
    with open(jsonFilePath, "a") as f:
        for counter, item in enumerate(jsonArray):
            line = str(counter) + "," + json.dumps(item, indent=None, separators=(",", ":"))
            f.write(line + '\n')

csvFilePath = r'taxi-data.csv'
jsonFilePath = r'taxi-data.json'
csv_to_json(csvFilePath, jsonFilePath)
```

- From:
```csv
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,07/11/2018 07:15:43 PM,07/11/2018 07:19:03 PM,1,0.45,1,N,249,158,1,4.5,1,0.5,1.08,0,0.3,7.38
2,07/11/2018 07:36:35 PM,07/11/2018 07:46:31 PM,1,1.24,1,N,100,48,1,8,1,0.5,2.45,0,0.3,12.25
2,07/11/2018 07:51:37 PM,07/11/2018 07:58:25 PM,1,0.97,1,N,48,142,1,6.5,1,0.5,1.66,0,0.3,9.96
1,07/11/2018 07:13:51 PM,07/11/2018 07:16:29 PM,1,0.6,1,N,90,90,2,4,1,0.5,0,0,0.3,5.8
1,07/11/2018 07:06:21 PM,07/11/2018 07:20:18 PM,1,2,1,N,234,231,1,11,1,0.5,2.55,0,0.3,15.35
1,07/11/2018 07:24:54 PM,07/11/2018 07:26:24 PM,1,0.5,1,N,125,249,1,3.5,1,0.5,0.79,0,0.3,6.09
1,07/11/2018 07:27:31 PM,07/11/2018 07:47:54 PM,2,5.5,1,N,249,239,1,19,1,0.5,4.15,0,0.3,24.95
1,07/11/2018 07:15:18 PM,07/11/2018 07:21:40 PM,1,1.1,1,N,162,170,1,6,1,0.5,1.55,0,0.3,9.35
1,07/11/2018 07:24:52 PM,07/11/2018 07:30:04 PM,1,1.2,1,N,233,141,2,6,1,0.5,0,0,0.3,7.8
```
- To:

```
0,{"VendorID":"2","tpep_pickup_datetime":"07/11/2018 07:15:43 PM","tpep_dropoff_datetime":"07/11/2018 07:19:03 PM","passenger_count":"1","trip_distance":"0.45","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"249","DOLocationID":"158","payment_type":"1","fare_amount":"4.5","extra":"1","mta_tax":"0.5","tip_amount":"1.08","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"7.38"}
1,{"VendorID":"2","tpep_pickup_datetime":"07/11/2018 07:36:35 PM","tpep_dropoff_datetime":"07/11/2018 07:46:31 PM","passenger_count":"1","trip_distance":"1.24","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"100","DOLocationID":"48","payment_type":"1","fare_amount":"8","extra":"1","mta_tax":"0.5","tip_amount":"2.45","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"12.25"}
2,{"VendorID":"2","tpep_pickup_datetime":"07/11/2018 07:51:37 PM","tpep_dropoff_datetime":"07/11/2018 07:58:25 PM","passenger_count":"1","trip_distance":"0.97","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"48","DOLocationID":"142","payment_type":"1","fare_amount":"6.5","extra":"1","mta_tax":"0.5","tip_amount":"1.66","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"9.96"}
3,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:13:51 PM","tpep_dropoff_datetime":"07/11/2018 07:16:29 PM","passenger_count":"1","trip_distance":"0.6","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"90","DOLocationID":"90","payment_type":"2","fare_amount":"4","extra":"1","mta_tax":"0.5","tip_amount":"0","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"5.8"}
4,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:06:21 PM","tpep_dropoff_datetime":"07/11/2018 07:20:18 PM","passenger_count":"1","trip_distance":"2","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"234","DOLocationID":"231","payment_type":"1","fare_amount":"11","extra":"1","mta_tax":"0.5","tip_amount":"2.55","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"15.35"}
5,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:24:54 PM","tpep_dropoff_datetime":"07/11/2018 07:26:24 PM","passenger_count":"1","trip_distance":"0.5","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"125","DOLocationID":"249","payment_type":"1","fare_amount":"3.5","extra":"1","mta_tax":"0.5","tip_amount":"0.79","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"6.09"}
6,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:27:31 PM","tpep_dropoff_datetime":"07/11/2018 07:47:54 PM","passenger_count":"2","trip_distance":"5.5","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"249","DOLocationID":"239","payment_type":"1","fare_amount":"19","extra":"1","mta_tax":"0.5","tip_amount":"4.15","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"24.95"}
7,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:15:18 PM","tpep_dropoff_datetime":"07/11/2018 07:21:40 PM","passenger_count":"1","trip_distance":"1.1","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"162","DOLocationID":"170","payment_type":"1","fare_amount":"6","extra":"1","mta_tax":"0.5","tip_amount":"1.55","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"9.35"}
8,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:24:52 PM","tpep_dropoff_datetime":"07/11/2018 07:30:04 PM","passenger_count":"1","trip_distance":"1.2","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"233","DOLocationID":"141","payment_type":"2","fare_amount":"6","extra":"1","mta_tax":"0.5","tip_amount":"0","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"7.8"}
```
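The conversion can be sanity-checked end to end on a trimmed in-memory sample (columns reduced here for brevity; the logic mirrors the script above):

```python
import csv
import io
import json

# In-memory rerun of the csv-to-json conversion with a reduced column set:
sample_csv = (
    "VendorID,passenger_count,total_amount\n"
    "2,1,7.38\n"
    "1,2,24.95\n"
)

lines = []
for counter, row in enumerate(csv.DictReader(io.StringIO(sample_csv))):
    # one "<counter>,<compact JSON object>" line per row
    lines.append(str(counter) + "," + json.dumps(row, separators=(",", ":")))

print(lines[0])
# 0,{"VendorID":"2","passenger_count":"1","total_amount":"7.38"}
```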
- You see all events.
- Example:

```shell
root@ubuntu:/# mc cat kafka/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000339.json
{"DOLocationID":"209","RatecodeID":"1","fare_amount":"5.5","tpep_dropoff_datetime":"07/11/2018 07:55:40 PM","VendorID":"2","passenger_count":"2","tolls_amount":"0","improvement_surcharge":"0.3","trip_distance":"0.92","store_and_fwd_flag":"N","payment_type":"1","total_amount":"8.3","extra":"1","tip_amount":"1","mta_tax":"0.5","tpep_pickup_datetime":"07/11/2018 07:50:18 PM","PULocationID":"261"}
{"DOLocationID":"48","RatecodeID":"1","fare_amount":"8","tpep_dropoff_datetime":"07/11/2018 07:15:46 PM","VendorID":"2","passenger_count":"2","tolls_amount":"0","improvement_surcharge":"0.3","trip_distance":"0.97","store_and_fwd_flag":"N","payment_type":"2","total_amount":"9.8","extra":"1","tip_amount":"0","mta_tax":"0.5","tpep_pickup_datetime":"07/11/2018 07:05:17 PM","PULocationID":"186"}
{"DOLocationID":"114","RatecodeID":"1","fare_amount":"11.5","tpep_dropoff_datetime":"07/11/2018 07:36:22 PM","VendorID":"2","passenger_count":"2","tolls_amount":"0","improvement_surcharge":"0.3","trip_distance":"2.4","store_and_fwd_flag":"N","payment_type":"1","total_amount":"16.62","extra":"1","tip_amount":"3.32","mta_tax":"0.5","tpep_pickup_datetime":"07/11/2018 07:21:14 PM","PULocationID":"48"}
```

- The same `kafka-connect.yaml` as applied in the DC cluster (note the different `bootstrap.servers` IP and the external `store.url`):

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: quay.io/cniackz4/kafkaconnect:latest
  version: 3.3.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    bootstrap.servers: 192.168.177.26:9092
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-status
    status.storage.replication.factor: 1
    offset.flush.interval.ms: 10000
    plugin.path: /opt/kafka/plugins
    offset.storage.file.filename: /tmp/connect.offsets
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "minio-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    connector.class: io.confluent.connect.s3.S3SinkConnector
    tasks.max: '1'
    topics: minio-topic-1
    s3.region: us-east-1
    s3.bucket.name: kafka-bucket
    s3.part.size: '5242880'
    flush.size: '3'
    store.url: https://ns-3.ic.min.dev
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility: NONE
    behavior.on.null.values: ignore
```