Kafka - aidarko/dev-notes GitHub Wiki

# Read AVRO 
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --consumer.config <secure_properties_path> \
    --topic <my_topic> \
    --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property schema.registry.url=https://<my-schema-registry-url>
# Producing messages in kafka line by line

export CONFLUENT_HOME=<confluent-home>
export BROKER_LIST=localhost:9092
export TOPIC=<topic>
input="input_file.txt"

while IFS= read -r line
do
  echo "$line" | ${CONFLUENT_HOME}/bin/kafka-console-producer \
             --broker-list ${BROKER_LIST} \
             --topic ${TOPIC}
  sleep 5
done < "$input"

Truststore and keystore

https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/?_ga=2.259408909.688677908.1615137385-1112432842.1603103737

We need to generate a key and certificate for each broker and client in the cluster. The common name (CN) of the broker certificate must match the fully qualified domain name (FQDN) of the server as the client compares the CN with the DNS domain name to ensure that it is connecting to the desired broker (instead of a malicious one).

keystore, which stores each application’s identity

truststore stores all the certificates that the application should trust Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate.

Worth reading: Official Confluent SSL documentation

Confluent script to generate self-signed certificate: link

Certificates creation details

  1. Create a self-signed X509 certificate for the CA (the CSR will be signed with it):

     openssl req -new -x509 -days 10000 -key ca/ca.key -out ca/ca.crt
    
  2. Create CSR (sign request)

     openssl req -new -newkey rsa:1024 -nodes -keyout mykey.pem -out myreq.pem
    
  3. Now the trust store's private key (CA) will sign the keystore's certificate.

     openssl x509 -req -CA ca/ca.key -CAkey ca/ca.crt \
       -in myreq.pem -out my-signed-cert \
       -days 365 -CAcreateserial
    
  4. https://docs.oracle.com/cd/E19509-01/820-3503/ggfhb/index.html

    cat mykey.pem my-signed-cert > mykeycertificate.pem.txt

  5. Generate PKCS strore:

     openssl pkcs12 -export -in mykeycertificate.pem.txt -out mykeystore.pkcs12 \
       -name myAlias -noiter -nomaciter
    

Example of properties, which might be used for mtls authentication.

security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=<your-truststore-password>
ssl.endpoint.identification.algorithm=

ssl.keystore.location=/path/to/keystore.p12
ssl.keystore.password=<your-keystore-password>
ssl.key.password=<your-keystore-password>
ssl.enabled.protocols=TLSv1.2
ssl.client.auth=required

Test flow

  1. Test flow (plaintext):

    ./bin/kafka-topics --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic test1
    ./bin/kafka-console-producer --bootstrap-server localhost:9092 --topic test1
    ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
    
  2. Test flow (ssl authentication): Prepare mtls.props for internal tooling:

    security.protocol=SSL
    ssl.truststore.location=/path/to/truststore.jks
    ssl.truststore.password=<your-truststore-password>
    ssl.endpoint.identification.algorithm=
    
    ssl.keystore.location=/path/to/keystore.jks
    ssl.keystore.password=<your-keystore-password>
    ssl.key.password=<your-keystore-password>
    ssl.enabled.protocols=TLSv1.2
    ssl.client.auth=required
    

    Commands:

    ./bin/kafka-topics --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic test1 --command-config mtls.props
    ./bin/kafka-console-producer --bootstrap-server localhost:9092 --topic test1 --producer.config mtls.props
    ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning --consumer.config mtls.props
    
  3. To verify if your server is set up properly for SSL mode, use:

    openssl s_client -debug -connect localhost:9092 -tls1
    

    Check certificate:

    openssl x509 -in cerfile.cer -noout -text
    
  4. In case of OOM error, use:

    export KAFKA_HEAP_OPTS="-Xms256M -Xmx4G"
    
⚠️ **GitHub.com Fallback** ⚠️