Kafka - stanislawbartkowski/hdpactivedirectory GitHub Wiki
This page describes how to use HDP Kafka broker after AD/Kerberos enablement. When HDP is Kerberized, not only AD/Kerberos authentication is enabled but also Kafka ACL authorization is enabled.
More details:
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.5/bk_security/content/configuring_kafka_for_kerberos_using_ambari.html
For Java testing, I'm using a simple Java client. (https://github.com/stanislawbartkowski/KafkaSample)
There are two ways to authenticate:
- using an existing Kerberos ticket, should be preceded by kinit command.
- automatically, by exploiting keytab file.
Run kinit and make sure that Kerberos ticket is obtained. Here the user is bench.
klist
Ticket cache: FILE:/tmp/krb5cc_1603201293
Default principal: [email protected]
Valid starting Expires Service principal
29.03.2019 13:14:10 29.03.2019 21:06:28 krbtgt/[email protected]
renew until 05.04.2019 12:06:24
As a default, all users are allowed to list only ATLAS_HOOK topic. In case of Kerberos ticket authentication, exist Kafka jaas file is used. java.security.auth.login.config parameter should point to valid file name.
export KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper a1.fyre.ibm.com:2181,aa1.fyre.ibm.com:2181,hurds1.fyre.ibm.com:2181 --list
ATLAS_ENTITIES
ATLAS_HOOK
__consumer_offsets
ambari_kafka_service_check
test_topic
cd KafkaSample/sh
vi kafka.properties
Make sure that following lines are uncommented.
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
vi env.rc
KERBEROS=-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf
export JAVAOPTS="$KERBEROS -cp KafkaSample.jar:/usr/hdp/current/kafka-broker/libs/* KafkaMain kafka.properties"
./list.sh
I'm creating KafkaConsumer.
Created, now pull the topic list
Print list of topics received.
Topic : ATLAS_HOOK PartitionInfo : [Partition(topic = ATLAS_HOOK, partition = 0, leader = 1001, replicas = [1001], isr = [1001], offlineReplicas = [])]
There is a difference between kafka-topics output and Java client output. kafka-topics is reaching list of topics registered in zookeeper. Java client is accessing Kafka broker directly and according to default ACL rules, only ATLAS_HOOK can be listed.
Kerberos keytab authentication does not require previous kinit. Two things should be at hand before.
- user keytab. Can be prepared in AD host by ktpass command line utility.
- dedicated jaas config file. Sample jaas config file.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/bench/jaas/bench.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="[email protected]";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/bench/jaas/bench.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="[email protected]";
};
Client section is used by kafka_topics utility and allows access to Zookeeper. KafkaClient section allows Java client to get access to Kafka broker. keyTab parameter point to keytab and principal is expected user principal.
Before running the test, run kdestroy command to make sure that Kerberos ticket is not involved.
kdestroy
klist
klist: No credentials cache found (filename: /tmp/krb5cc_1603201293)
export KAFKA_OPTS=-Djava.security.auth.login.config=/home/bench/jaas/kafka_jaas.conf
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper a1.fyre.ibm.com:2181,aa1.fyre.ibm.com:2181,hurds1.fyre.ibm.com:2181 --list
Output as before
vi env.rc
#KERBEROS=-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf
KERBEROS=-Djava.security.auth.login.config=/home/bench/jaas/kafka_jaas.conf
export JAVAOPTS="$KERBEROS -cp KafkaSample.jar:/usr/hdp/current/kafka-broker/libs/* KafkaMain kafka.properties"
./list.sh
Output as before
As a default, the Kafka superuser is kafka and all administrative tasks should be done as kafka user. During cluster Kerberization, only kafka service user and corresponding keytab are created.
In order to run Kafka jobs as kafka user, a separate kafka account should be manually created in AD and related keytab file exported.
Prepare jaas configuration file. keyTab parameter points to keytab and principal is expected kafka principal data.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/bench/jaas/kafka.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="[email protected]";
};
Publisher:
export KAFKA_OPTS=-Djava.security.auth.login.config=/home/bench/jaas/kafka_user_jaas.conf
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh -security-protocol PLAINTEXTSASL --broker-list a1.fyre.ibm.com:6667 --topic test_topic
Subscriber from separate window:
export KAFKA_OPTS=-Djava.security.auth.login.config=
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh -security-protocol PLAINTEXTSASL --bootstrap-server a1.fyre.ibm.com:6667 --topic test_topic --from-beginning
Data entered in publisher windows should be outputed in subscriber screen.
cd KafkaSample/sh
vi env.rc
KERBEROS=-Djava.security.auth.login.config=/home/bench/jaas/kafka_user_jaas.conf
export JAVAOPTS="$KERBEROS -cp KafkaSample.jar:/usr/hdp/current/kafka-broker/libs/* KafkaMain kafka.properties"
Run Java publisher.
./produce.sh
'm producing lines to topic test_topic unless stopped by CTRL/C ...
Key:0
Key:100
Key:200
Key:300
Key:400
Key:500
Key:600
Key:700
Key:800
Key:900
Key:1000
From separate window, launch subscriber.
./consume.sh
1412 Line number: 1412
1413 Line number: 1413
1414 Line number: 1414
1415 Line number: 1415
1416 Line number: 1416
1417 Line number: 1417
1418 Line number: 1418
1419 Line number: 1419
1420 Line number: 1420
1421 Line number: 1421
1422 Line number: 1422
1423 Line number: 1423
1424 Line number: 1424
1425 Line number: 142
In a production environment, it is not recommended to run jobs as kafka superuser. A more granular and safe security policy should be implemented.
Assume that we want to split roles over test_topic Kafka topic.
- dataadmin group and user2 can publish messages and manage test_topic topic
- datascience group and user3 can only read messages
- user1, not belonging to neither datascience nor dataadmin, is forbidden to access the topic anyway
The Kafka ACL properties can be applied from command line, using kafka-alc.sh command. It is the hard way. But there is good news, the policy can be implemented in a more user friendly way by means of Ranger Kafka plugin.

Prepare convenient bash scripts.
vi kafkaproduce.sh
export KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf
exec /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --security-protocol PLAINTEXTSASL --broker-list a1.fyre.ibm.com:6667,aa1.fyre.ibm.com:6667,hurds1.fyre.ibm.com:6667 --topic $@
vi kafkaconsume.sh
export KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf
exec /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --security-protocol PLAINTEXTSASL --bootstrap-server a1.fyre.ibm.com:6667,aa1.fyre.ibm.com:6667,hurds1.fyre.ibm.com:6667 --from-beginning --topic $@
Authenticate as user2 user and start emitting messages:
kafkaproduce.sh test_topic
>first message
>second message
>I'm tired
>
Authenticate as user3 user and read messages:
kafkaconsume.sh test_topic
first message
second message
I'm tired
As user3 try to send messages into the topic:
kafkaproduce.sh test_topic
>security message
>
>[2019-03-29 15:47:54,298] ERROR Error when sending message to topic test_topic with key: null, value: 16 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
[2019-03-29 15:47:54,301] ERROR Error when sending message to topic test_topic with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
Authenticate as rouge user1 user and try to intercept the messages:
kafkaconsume.sh test_topic
[2019-03-29 15:43:17,805] WARN [Consumer clientId=consumer-1, groupId=console-consumer-94896] Error while fetching metadata with correlation id 2 : {test_topic=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-03-29 15:43:17,808] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
[user1@varlet1 ~]$
Try to inject a false gospel into the topic:
kafkaproduce.sh test_topic
>fake news
[2019-03-29 15:45:10,018] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test_topic=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-03-29 15:45:10,019] ERROR Error when sending message to topic test_topic with key: null, value: 9 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
>
Authenticate as user2 user and start producing messages.
cd KafkaSample/sh
./produce.sh
I'm producing lines to topic test_topic unless stopped by CTRL/C ...
Key:0
Key:100
Key:200
Key:300
Key:400
Authenticate as user3 user and absorb messages
./consume.sh
I'm consuming lines from topic test_topic unless stopped by CTRL/C ...
1473 Line number: 1473
1474 Line number: 1474
1475 Line number: 1475
1476 Line number: 1476
1477 Line number: 1477
1478 Line number: 1478
1479 Line number: 1479
As user3 try producing messages.
./produce.sh
I'm producing lines to topic test_topic unless stopped by CTRL/C ...
Key:0
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at KafkaMain.produceLines(KafkaMain.java:58)
at KafkaMain.main(KafkaMain.java:103)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
As malicious user1 try to read or write messages.
./produce.sh
I'm producing lines to topic test_topic unless stopped by CTRL/C ...
Key:0
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1124)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:648)
at KafkaMain.produceLines(KafkaMain.java:58)
at KafkaMain.main(KafkaMain.java:103)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
./consume.sh
I'm consuming lines from topic test_topic unless stopped by CTRL/C ...
Exception in thread "main" org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]