生产者——集群 - 969251639/study GitHub Wiki
KafkaProducer中的构造方法构造metadata时会调用下面的方法
this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds());
public static Cluster bootstrap(List<InetSocketAddress> addresses) {
List<Node> nodes = new ArrayList<>();
int nodeId = -1;
for (InetSocketAddress address : addresses)//根据配置的信息创建节点
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
//创建一个新的集群信息
return new Cluster(null, true, nodes, new ArrayList<>(0),
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null);
}
Cluster保存整个kafka集群信息,下面是它的主要成员变量
private final List<Node> nodes;//一个Borker一个节点
private final Set<String> unauthorizedTopics;//存储需要授权验证的主题
private final Set<String> invalidTopics;//存储无效的主题
private final Set<String> internalTopics;//存储内部私有的主题
private final Node controller;//节点控制器
//下面都是为了方便操作上面的属性的映射器
//主题中的分区映射具体的分区信息
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
//主题下面的所有分区
private final Map<String, List<PartitionInfo>> partitionsByTopic;
//主题下所有可用的分区
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
//键所对应的节点,即Broker(分区后发送到那个机器上)
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
//节点ID查找具体的节点信息
private final Map<Integer, Node> nodesById;
//集群资源
private final ClusterResource clusterResource;
Node保存每个Broker的信息,Node的主要成员变量如下
private final int id;//一个Borker的唯一ID
private final String idString;//一个Borker的唯一ID的字符串形式
private final String host;//一个Borker的主机ip
private final int port;//一个Borker的访问端口
private final String rack;//集群的机架
ClusterResource用于保存每个集群的主键ID,用于多个机架的情况下,不同机架下的不同集群映射唯一一个集群ID public class ClusterResource {
//集群的ID
private final String clusterId;
/**
* Create {@link ClusterResource} with a cluster id. Note that cluster id may be {@code null} if the
* metadata request was sent to a broker without support for cluster ids. The first version of Kafka
* to support cluster id is 0.10.1.0.
* @param clusterId
*/
public ClusterResource(String clusterId) {
this.clusterId = clusterId;
}
/**
* Return the cluster id. Note that it may be {@code null} if the metadata request was sent to a broker without
* support for cluster ids. The first version of Kafka to support cluster id is 0.10.1.0.
*/
public String clusterId() {
return clusterId;
}
@Override
public String toString() {
return "ClusterResource(clusterId=" + clusterId + ")";
}
}
接下来看它的主要构造方法构造了那些内容
private Cluster(String clusterId,
boolean isBootstrapConfigured,
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> invalidTopics,
Set<String> internalTopics,
Node controller) {
this.isBootstrapConfigured = isBootstrapConfigured;
this.clusterResource = new ClusterResource(clusterId);
// make a randomized, unmodifiable copy of the nodes
List<Node> copy = new ArrayList<>(nodes);
Collections.shuffle(copy);
this.nodes = Collections.unmodifiableList(copy);//设置所有节点信息
this.nodesById = new HashMap<>();
for (Node node : nodes)//添加节点映射
this.nodesById.put(node.id(), node);
// index the partitions by topic/partition for quick lookup
this.partitionsByTopicPartition = new HashMap<>(partitions.size());
for (PartitionInfo p : partitions)/创建主题和分区号与具体的分区信息的映射
this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
// index the partitions by topic and node respectively, and make the lists
// unmodifiable so we can hand them out in user-facing apis without risk
// of the client modifying the contents
HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<>();
HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<>();
for (Node n : this.nodes) {//收集节点下多个主题
partsForNode.put(n.id(), new ArrayList<>());
}
for (PartitionInfo p : partitions) {//收集一个主题下的所对应的分区
if (!partsForTopic.containsKey(p.topic()))//如果不包含该主题
partsForTopic.put(p.topic(), new ArrayList<>());//创建一个新的分区容器
List<PartitionInfo> psTopic = partsForTopic.get(p.topic());
psTopic.add(p);
if (p.leader() != null) {
List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id()));
psNode.add(p);//收集节点的主副本分区
}
}
this.partitionsByTopic = new HashMap<>(partsForTopic.size());
this.availablePartitionsByTopic = new HashMap<>(partsForTopic.size());
for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
String topic = entry.getKey();
List<PartitionInfo> partitionList = entry.getValue();
this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
List<PartitionInfo> availablePartitions = new ArrayList<>();
for (PartitionInfo part : partitionList) {
if (part.leader() != null)
availablePartitions.add(part);
}
this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));//可用分区就是master下的分区
}
this.partitionsByNode = new HashMap<>(partsForNode.size());
for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);//默认空
this.invalidTopics = Collections.unmodifiableSet(invalidTopics);//默认空
this.internalTopics = Collections.unmodifiableSet(internalTopics);//默认空
this.controller = controller;
}
Cluster比较简单,通过Node保存真正的broker信息,然后提供以下几个变量方便操控集群内容
//主题中的分区映射具体的分区信息
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
//主题下面的所有分区
private final Map<String, List<PartitionInfo>> partitionsByTopic;
//主题下所有可用的分区
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
//键所对应的节点,即Broker(分区后发送到那个机器上)
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
//节点ID查找具体的节点信息
private final Map<Integer, Node> nodesById;