Producer: Partitioner - 969251639/study GitHub Wiki
When Kafka sends a message, it assigns the message to a partition based on its key.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ...
    // Resolve the partition number (derived from the key unless the record names a partition)
    int partition = partition(record, serializedKey, serializedValue, cluster);
    ...
}
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
If the user set a partition number on the record, that partition is used; otherwise the partitioner's partition method picks one.
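The fallback rule can be shown in isolation with a minimal sketch. It does not use the Kafka classes: `SketchRecord` and `PartitionChoice` are hypothetical stand-ins introduced only for illustration, and the partitioner is reduced to a function from key to partition number.

```java
import java.util.function.ToIntFunction;

// Hypothetical stand-in for ProducerRecord: only the fields the decision needs.
final class SketchRecord {
    final String key;
    final Integer partition; // null means "let the partitioner decide"

    SketchRecord(String key, Integer partition) {
        this.key = key;
        this.partition = partition;
    }
}

// Hypothetical helper mirroring the shape of KafkaProducer's private partition(...) method.
final class PartitionChoice {
    static int choose(SketchRecord record, ToIntFunction<String> partitioner) {
        // An explicit partition on the record always wins;
        // the partitioner is consulted only when none was given.
        return record.partition != null
                ? record.partition
                : partitioner.applyAsInt(record.key);
    }
}
```

With a partitioner that always answers 7, a record built with partition 2 goes to partition 2, while a record with a null partition goes to 7.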
When the producer is constructed, the partitioner is instantiated from the configuration:
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); // config key: partitioner.class
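The default is DefaultPartitioner. It is declared as the default value of this setting, so it is picked up when the ProducerConfig is built: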
ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
        valueSerializer));
define(PARTITIONER_CLASS_CONFIG,
        Type.CLASS,
        DefaultPartitioner.class,
        Importance.MEDIUM, PARTITIONER_CLASS_DOC)
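Because the partitioner is read from the `partitioner.class` setting, it can be swapped out in the producer's properties. The following is a minimal sketch of building those properties: `PartitionerConfigSketch`, the broker address, and the class name `com.example.MyPartitioner` are placeholders assumed for illustration, not part of the original source.

```java
import java.util.Properties;

// Hypothetical helper that assembles producer properties with a custom partitioner.
final class PartitionerConfigSketch {
    static Properties producerProps(String partitionerClassName) {
        Properties props = new Properties();
        // Assumed broker address, for illustration only.
        props.put("bootstrap.servers", "localhost:9092");
        // Overrides the default (DefaultPartitioner) declared in ProducerConfig.
        props.put("partitioner.class", partitionerClassName);
        return props;
    }
}
```

Passing `producerProps("com.example.MyPartitioner")` to the producer constructor would make `getConfiguredInstance` instantiate that class instead of the default, provided the class implements `Partitioner` and is on the classpath.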
Stepping into DefaultPartitioner's partition method shows how the default partitioning works:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) { // no key given: round-robin (this approach is not recommended)
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else { // a key was given
        // hash the keyBytes to choose a partition:
        // murmur2 hash, then modulo the number of partitions
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
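The two branches above can be reproduced in a self-contained sketch that runs without Kafka. Several simplifications are assumed and are not the original implementation: `Arrays.hashCode` stands in for `Utils.murmur2`, a single counter starting at 0 replaces `nextValue(topic)` (whose body is not shown above), and partitions are plain integers instead of `PartitionInfo`. The class name `DefaultPartitionSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class DefaultPartitionSketch {
    // Simplification: one counter for everything, starting at 0.
    private final AtomicInteger counter = new AtomicInteger(0);

    // Clears the sign bit so the result is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Stand-in hash for illustration only; this is NOT murmur2.
    static int standInHash(byte[] keyBytes) {
        return Arrays.hashCode(keyBytes);
    }

    int partition(byte[] keyBytes, int numPartitions, List<Integer> availablePartitions) {
        if (keyBytes == null) {
            // No key: rotate through the available partitions.
            int next = counter.getAndIncrement();
            if (!availablePartitions.isEmpty()) {
                int index = toPositive(next) % availablePartitions.size();
                return availablePartitions.get(index);
            }
            // Nothing available: fall back to any partition number.
            return toPositive(next) % numPartitions;
        }
        // Key present: hash, force non-negative, take modulo over all partitions.
        return toPositive(standInHash(keyBytes)) % numPartitions;
    }
}
```

Two details are worth noting. The sign bit is masked rather than passed through `Math.abs`, because `Math.abs(Integer.MIN_VALUE)` is still negative and would yield a negative partition number. The keyed branch also takes the modulo over all partitions of the topic, not only the available ones, so the same key always maps to the same partition as long as the partition count does not change.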