kafka - 9dian/Index GitHub Wiki
介绍
producer
consumer
consumer性能非常好,KAFKA这种消息队列在生产端和消费端分别采取的push和pull的方式,也就是你生产端可以认为KAFKA是个无底洞,有多少数据可以使劲往里面推送,消费端则是根据自己的消费能力,需要多少数据,你自己过来KAFKA这里拉取,KAFKA能保证只要这里有数据,消费端需要多少,都尽可以自己过来拿。
ZeroCopy
具体到消息的落地保存,broker维护的消息日志本身就是文件的目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。维护这个公共的格式并允许优化最重要的操作:网络传输持久性日志块。 现代的unix操作系统提供一个优化的代码路径,用于将数据从页缓存传输到socket;在Linux中,是通过sendfile系统调用来完成的。Java提供了访问这个系统调用的方法:FileChannel.transferTo API。
要理解senfile的影响,重要的是要了解将数据从文件传输到socket的公共数据路径,如下图所示,数据从磁盘传输到socket要经过以下几个步骤:
文件传输到socket的常规方式:
- 操作系统将数据从磁盘读入到内核空间的页缓存。
- 应用程序将数据从内核空间读入到用户空间缓存中。
- 应用程序将数据写回到内核空间到socket缓存中。
- 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出。
这里有四次拷贝,两次系统调用,这是非常低效的做法。
如果使用sendfile,只需要一次拷贝就行:
允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。
Normal VS ZeroCopy
文件大小 | 常规文件传输(ms) | ZeroCopy(ms) |
---|---|---|
10 MB | 205 | 61 |
20 MB | 330 | 120 |
50 MB | 750 | 310 |
100 MB | 1300 | 600 |
200 MB | 330 | 120 |
500 MB | 5055 | 2708 |
假设一个Topic有多个consumer的情况, 首先数据被复制到页缓存中一次,并使用上面的零拷贝优化,则数据可以在在每个消费上重复使用(而不是存储在存储器中);也不在每次读取时复制到用户空间。这使得以接近网络连接限制的速度消费消息。
这种页缓存和sendfile组合,意味着KAFKA集群的消费者大多数都完全从缓存消费消息,而磁盘没有任何读取活动。
批量压缩
在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络带宽,对于需要在广域网上的数据中心之间发送消息的数据流尤其如此;所以数据压缩就很重要。可以每个消息都压缩,但是压缩率相对很低。所以KAFKA使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩。
KAFKA允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。
KAFKA支持Gzip和Snappy压缩协议。
partition
KAFKA的消息保存在Topic中,Topic可分为多个分区,每个分区又有多个Replicate, 保证数据的安全性。 分区的设计的特点:
- 可以并发读写,加快读写速度;
- 多分区的存储可以实现数据的负载均衡;
- 基于2.的原因也能加快数据的恢复速率,一但某台机器挂了,整个集群只需要恢复一部分数据,可加快故障恢复的时间。
每个Partition分为多个Segment,每个Segment有.log和.index 两个文件,每个log文件承载具体的数据,每条消息都有一个递增的offset,Index文件是对log文件的索引,Consumer查找offset时使用的是二分法根据文件名去定位到哪个Segment,然后解析msg,匹配到对应的offset的message。
Partition recovery过程
每个Partition会在磁盘记录一个RecoveryPoint,,记录已经flush到磁盘的最大offset。当broker 失败重启时,会进行loadLogs。首先会读取该Partition的RecoveryPoint,找到包含RecoveryPoint的segment及以后的segment, 这些segment就是可能没有完全flush到磁盘segments。然后调用segment的recover,重新读取各个segment的msg,并重建索引。每次重启KAFKA的broker时,都可以在输出的日志看到重建各个索引的过程。
配置
producer
dev sandbox list
2018-10
excluding zookeeper
ubuntu@dev01:/usr/local/confluent-4.0.0/etc$ find . -type f -exec grep '172\.' {} \; -print
zookeeper.connect=172.18.98.75:2181,172.18.98.76:2181,172.18.98.77:2181
bootstrap.servers=PLAINTEXT://172.18.98.75:9092,PLAINTEXT://172.18.98.76:9092,PLAINTEXT://172.18.98.77:9092
./kafka-rest/kafka-rest.properties
listeners = PLAINTEXT://172.18.98.75:9092
zookeeper.connect=172.18.98.75:2181,172.18.98.76:2181,172.18.98.77:2181
./kafka/server.properties
bootstrap.servers=172.18.98.75:9092,172.18.98.76:9092,172.18.98.77:9092
./schema-registry/connect-avro-distributed.properties
kafkastore.connection.url=172.18.98.75:2181,172.18.98.76:2181,172.18.98.77:2181
./schema-registry/schema-registry.properties
bootstrap.servers=172.18.98.75:9092,172.18.98.76:9092,172.18.98.77:9092
./schema-registry/connect-avro-standalone.properties
ubuntu@dev01:/usr/local/confluent-4.0.0/etc$ vim ./kafka-rest/kafka-rest.properties
20181023
允许 自定义 TCP
80/80 地址段访问
允许 自定义 TCP
5001/5001 地址段访问
[zk: 172.18.98.75,172.18.98.76,172.18.98.77(CONNECTED) 2] get /controller {"version":1,"brokerid":1,"timestamp":"1535473474613"}
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2 kafka-configs --zookeeper 172.18.98.75,172.18.98.76,172.18.98.77 --entity-type brokers --entity-name 0 --describe
kafka-topics --zookeeper 172.18.98.75,172.18.98.76,172.18.98.77 --topic 'cWhkY2NnY2Nn.2018.10.24' --describe
Segment logs are where messages are stored Each message is its value, offset, timestamp, key, message size, compression codec, checksum, and version of the message format. The data format on disk is exactly the same as what the broker receives from the producer over the network and sends to its consumers. This allows Kafka to efficiently transfer data with zero copy.
kafka-topics --zookeeper dev01,dev02,dev03 --list
kafka-topics --zookeeper mvxl58434:2181,mvxl65161:2181,mvxl74089:2181 --list
[root@ics-kafka-prod3 cWhkY2NnY2Nn.2018.10.24-0]# kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /data/kafka/kafka-logs/cWhkY2NnY2Nn.2018.10.24-0/00000000000000000000.log | more
Dumping /data/kafka/kafka-logs/cWhkY2NnY2Nn.2018.10.24-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1540310456711 isvalid: true keysize: 54 valuesize: 129 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional:
false headerKeys: [] key:
offset: 1 position: 253 CreateTime: 1540310531470 isvalid: true keysize: 54 valuesize: 121 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactiona
l: false headerKeys: [] key:
offset: 2 position: 498 CreateTime: 1540310742615 isvalid: true keysize: 54 valuesize: 104 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactiona
l: false headerKeys: [] key:
kafka-consumer-groups --bootstrap-server dev01:9092,dev02:9092,dev03:9092 --group test-consumer-group --describe
第六:这个问题如何解决?
官方给出的默认示例并不可靠,并没有考虑到网络繁忙的情况,并不适合生产。
故kafka一定要配置上消息重试的机制,并且重试的时间间隔一定要长一些,默认1秒钟并不符合生产环境(网络中断时间有可能超过1秒)。
延云认为,增加如下参数会较大幅度的减少kafka写入数据照成的数据丢失,在公司实测,目前还没遇到数据丢失的情况。
props.put("compression.type", "gzip");
props.put("linger.ms", "50");
props.put("acks", "all");
props.put("retries ", 30);
props.put("reconnect.backoff.ms ", 20000);
props.put("retry.backoff.ms", 20000);
2020-07
in kafka-run-classh.sh - kafka uses the following variables:
$KAFKA_HEAP_OPTS
$KAFKA_JVM_PERFORMANCE_OPTS
$KAFKA_GC_LOG_OPTS
$KAFKA_JMX_OPTS
$KAFKA_LOG4J_OPTS
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=12346 -Dcom.sun.management.jmxremote.rmi.port=12346 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
bin/kafka-server-start.sh -daemon config/server.properties
官方的说明来看,未来offset的zookeeper存储将会被弃用。因此现有的基于kafka的项目如果今后计划保持更新的话,可以考虑在合适的时候将offset迁移到kafka broker上。
以下是迁移步骤:
- consumer.properties: 在consume config中,修改offsets.storage=kafka并且dual.commit.enabled=true。 offsets.storage=kafka
同时提交offset到zookeeper和offset manager上,这是为了保证在迁移过程中offset不会丢失
dual.commit.enabled=true 2. rolling restart consumers(参考下面)并且确认运行正常。这时已经完成offset的迁移工作。 3. 修改consumer config,dual.commit.enabled=false。双重提交offset会带来额外的开销,在完成迁移工作之后最好把这项配置关闭。 4. rolling restart consumers
kafka rolling restart This is a production cluster and you don't want any data loss. You have partition replicas spanned across the brokers For each partition you have at least one replica on each broker all zks are accessible by each broker This is how I would do it
Take down individual broker. When one of the broker is down, then describe the topics to check if only replicas that are not shown, belong to broker which was taken down. Restart the broker and again verify that all partitions are in sync before going for next broker. Then stop-start each zk individually, each time tracking if all replicas and partitions are in sync. That way at least one zk is available for 2 brokers to maintain their meta data
broker3 stop & start [apps@mvxl58434 kafka-logs]$ kafka-topics --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --describe --topic "myTopic" Topic: myTopic PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824 Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: myTopic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
kafka-server-stop && tail -50f /opt/confluent/logs/kafkaServer.out
[apps@mvxl58434 kafka-logs]$ kafka-topics --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --describe --topic "myTopic" [2020-07-20 20:57:44,072] WARN [AdminClient clientId=adminclient-1] Connection to node -3 (mvxl74089/10.18.62.71:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) Topic: myTopic PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824 Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2 Topic: myTopic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1
kafka-server-start -daemon /opt/confluent/etc/kafka/server.properties && tail -50f /opt/confluent/logs/kafkaServer.out
[apps@mvxl58434 kafka-logs]$ kafka-topics --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --describe --topic "myTopic"
Topic: myTopic PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824 Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2 Topic: myTopic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1
broker 2 stop & start [apps@mvxl58434 kafka-logs]$ kafka-topics --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --describe --topic "myTopic" Topic: myTopic PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824 Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: myTopic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3 kafka-server-stop && tail -50f /opt/confluent/logs/kafkaServer.out [apps@mvxl58434 kafka-logs]$ kafka-topics --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --describe --topic "myTopic" [2020-07-20 21:06:30,461] WARN [AdminClient clientId=adminclient-1] Connection to node -2 (mvxl65161/10.18.62.70:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) Topic: myTopic PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824 Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,3 Topic: myTopic Partition: 1 Leader: 3 Replicas: 2,3,1 Isr: 1,3
kafka-server-start -daemon /opt/confluent/etc/kafka/server.properties && tail -50f /opt/confluent/logs/kafkaServer.out [apps@mvxl58434 kafka-logs]$ kafka-topics --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --describe --topic "myTopic" Topic: myTopic PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824 Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2 Topic: myTopic Partition: 1 Leader: 3 Replicas: 2,3,1 Isr: 1,3,2
broker 1 stop & start [apps@mvxl58434 kafka-logs]$ kafka-topics --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --describe --topic "myTopic" Topic: myTopic PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824 Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2 Topic: myTopic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 1,3,2 kafka-server-stop && tail -50f /opt/confluent/logs/kafkaServer.out
zookeeper : controller {"version":1,"brokerid":2,"timestamp":"1595250693050"}
[apps@mvxl58434 kafka-logs]$ kafka-topics --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --describe --topic "myTopic" [2020-07-20 21:11:55,111] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (mvxl58434/10.18.62.69:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) Topic: myTopic PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824 Topic: myTopic Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 3,2 Topic: myTopic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 3,2 kafka-server-start -daemon /opt/confluent/etc/kafka/server.properties && tail -50f /opt/confluent/logs/kafkaServer.out zookeeper-server-start -daemon /opt/confluent/etc/kafka/zookeeper.properties && tail -20f /opt/confluent/logs/zookeeper.out
--bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092
kafka-console-consumer --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --topic "00sftest" --from-beginning
kafka-console-producer --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --topic "00sftest"
datarev_0x1c_state_prod_iot dual.commit.enabled=true offsets.storage=kafka
-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
Looking at kafka-run-classh.sh - kafka uses the following variables:
$KAFKA_HEAP_OPTS
$KAFKA_JVM_PERFORMANCE_OPTS
$KAFKA_GC_LOG_OPTS
$KAFKA_JMX_OPTS
$KAFKA_LOG4J_OPTS
You can run it via:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=12346 -Dcom.sun.management.jmxremote.rmi.port=12346 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
bin/kafka-server-start.sh -daemon config/server.properties
in /bin/kafka-server-start.sh: export KAFKA_HEAP_OPTS="-Xmx8G -Xms4G" or in /bin/kafka-run-class.sh: KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
log info:
Error processing append operation on partition __consumer_offsets-17 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(3) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-17
Error processing append operation on partition __consumer_offsets-17 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(3) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-17
Error processing append operation on partition __consumer_offsets-17 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(3) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-17
/opt/confluent/logs/server.log.2020-07-21-12:org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(2) is insufficient to satisfy the min.isr requirement of 2 for partition __consum
er_offsets-16
Soft deletion:- (rentention.ms=1000) (using kafka-configs.sh).
bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.
Setting to default:- 7 days
bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics --add-config retention.ms=604800000
获取 topic 当前消息数
--time -1表示最大位移 --time -2表示最早位移
kafka-run-class kafka.tools.GetOffsetShell --broker-list mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --topic 1sftest --time -1
kafka-run-class kafka.tools.GetOffsetShell --broker-list mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --topic datarev_0x1c_state_prod_iot --time -2
kafka-run-class kafka.tools.DumpLogSegments --files /data/kafka-logs/datarev_0x1c_state_prod_iot-0/00000000000074859490.log --print-data-log --deep-iteration > /tmp/secLog.log
kafka-consumer-groups --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --group iot00 --describe
kafka-consumer-groups --bootstrap-server mvxl58434:9092,mvxl65161:9092,mvxl74089:9092 --reset-offsets --group "iot01" --topic "datarev_0x1c_state_prod_iot" --partition 0 --to-latest
kafka-console-consumer --consumer.config /tmp/consumer.config --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" $bs --topic __consumer_offsets --from-beginning