Kafka Shell Usage - tenji/ks GitHub Wiki

Kafka 命令行使用

一、创建

  • 创建Topic

    bin/kafka-topics.sh --zookeeper node01:2181 --create --topic t_cdr --partitions 30  --replication-factor 2
    

    注: partitions 指定 topic 分区数,replication-factor 指定 topic 每个分区的副本数

二、删除

  • 删除Topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除
    bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic t_cdr
    

2.1 彻底删除Topic的方法

  1. 删除ZK信息

执行完上方命令之后,如果没有配置delete.topic.enable=true则此时Topic的状态为:- marked for deletion

在没有配置delete.topic.enable=true的情况下,如果要彻底删除Topic,需要到ZK中删除中手动删除Topic相关信息。

rmr /kafka/brokers/topics/topic_name

# 1. rmr 后面对应的就是kafka在zookeeper中的路径
# 2. topic_name 为要删除的topic名称
  1. 删除本地磁盘上的数据文件 从ZK中删除Topic相关信息之后,其实本地磁盘当中的数据还是存在的,我们需要找到并删除它。
rm -rf /data/kafka/topic_name*

# 1. 这个目录就是配置文件中指定的kafka数据存储路径

三、查询

  • 查询所有Topic列表

    bin/kafka-topics.sh --zookeeper node01:2181 --list
    
  • 查询指定Topic详情(包括 Leader, Replicas, Isr 等信息)

    bin/kafka-topics.sh --zookeeper node01:2181 --describe --topic t_cdr
    
  • 查询所有Topic详情

    bin/kafka-topics.sh --zookeeper node01:2181 --describe
    
  • 获取最新的 Offset:

    ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --time -1 --topic 1024_k6H9k6ijh2PxRSeeTio_stz_test
    
  • 获取最早的 Offset:

    ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --time -2 --topic 1024_k6H9k6ijh2PxRSeeTio_stz_test
    
  • 获取指定时间戳对应的 Offset:

    ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --time 1645781546362 --topic 1024_k6H9k6ijh2PxRSeeTio_stz_test
    

四、生产

  • 控制台向Topic生产数据(注意:生产数据时使用的是broker的端口)

    bin/kafka-console-producer.sh --broker-list node01:9092 --topic t_cdr
    
  • 控制台向开启了 SSL 的集群生产数据

    bin/kafka-console-producer.sh --broker-list node01:9093 --topic t_cdr --producer.config client_ssl.properties
    

    client_ssl.properties配置文件内容:

    security.protocol=SSL
    ssl.truststore.location=/root/stz/client.truststore.jks
    ssl.keystore.location=/root/stz/client.keystore.jks
    ssl.truststore.password=Y2hyb21lLi42OWk1N2owbDMuMzY1OWowajcmc291cmNlaWQ9Y2hyb21lJmllPVVURi04
    ssl.keystore.password=Y2hyb21lLi42OWk1N2owbDMuMzY1OWowajcmc291cmNlaWQ9Y2hyb21lJmllPVVURi04
    

五、消费

  • 控制台消费Topic的数据
    bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic t_cdr --from-beginning
    
  • 新版本的 Kafka 直接指定 Broker List 来消费
    bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic t_cdr --from-beginning
    
  • 查询 Consumer Group 信息
    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server node01:9092 --describe --group group_id
    

六、ZK启动停止

  • 启动ZK

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties >> logs/zookeeper.log &
    
  • 停止ZK

    bin/zookeeper-server-stop.sh
    

七、Kafka启动停止

  • 启动Kafka

    nohup bin/kafka-server-start.sh config/server.properties &
    
  • 停止Kafka

    bin/kafka-server-stop.sh
    

参考链接