Producer - Message Sending

Let's start with a simple example of sending messages with Kafka:

public class KafkaProducer {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
                //whether to send asynchronously
		boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
                //configure the producer properties
		Properties props = new Properties();
		props.put("bootstrap.servers", "120.77.182.40:9092");
		props.put("client.id", "DemoProducer");//生产者标识
		props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(props);
		
		String topic = "test";
		int messageNo = 1; //message key
		while(true) {
			String messageStr = "Message_" + messageNo;//message value
			long startTime = System.currentTimeMillis();
			if(isAsync) {//asynchronous send
				/**
				 * The first argument is a ProducerRecord, which wraps the target topic, the message key and the message value
				 * The second argument is a Callback; when the producer receives the ack from Kafka, the Callback's onCompletion() method is invoked
				 */
				producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr), new DemoCallback(startTime, messageNo, messageStr));
				System.out.println("sent message: " + messageNo + ", " + messageStr);
			}else {//synchronous send
				//KafkaProducer.send() returns a Future<RecordMetadata>; calling Future.get() blocks the current thread until the Kafka server's ACK response arrives
				producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr)).get();
				System.out.println("sent message: " + messageNo + ", " + messageStr);
			}
			++messageNo;
			Thread.sleep(100);
		}
	}
}
class DemoCallback implements Callback {
	private long startTime;
	private Integer key;
	private String message;
	
	public DemoCallback(long startTime, Integer key, String message) {
		super();
		this.startTime = startTime;
		this.key = key;
		this.message = message;
	}

	/**
	 * Invoked after the producer has sent a message and received the ACK from the Kafka broker
	 * @param arg0 metadata of the sent record; null if an exception occurred during the send
	 * @param arg1 the exception thrown during the send; null if the send succeeded
	 */
	public void onCompletion(RecordMetadata arg0, Exception arg1) {
		long time = System.currentTimeMillis() - startTime;
		if(arg0 != null) {
			//RecordMetadata contains the partition, the offset, and so on
			System.out.println("message(" + key + ", " + message + ") sent to partition(" + 
					arg0.partition() + "), offset(" + arg0.offset() + ") in" + time + " ms");
		}
	}
	
}

First, the producer is created from the Properties configuration as an org.apache.kafka.clients.producer.KafkaProducer object:

    public KafkaProducer(Properties properties) {
        this(propsToMap(properties), null, null, null, null, null, Time.SYSTEM);
    }

This constructor converts the incoming Properties into a Map:

    private static Map<String, Object> propsToMap(Properties properties) {
        Map<String, Object> map = new HashMap<>(properties.size());
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {//iterate over the properties entries
            if (entry.getKey() instanceof String) {
                String k = (String) entry.getKey();
                map.put(k, properties.get(k));//put each entry into the map
            } else {
                throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
            }
        }
        return map;
    }

It then delegates to the common constructor:

    KafkaProducer(Map<String, Object> configs,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  Metadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors interceptors,
                  Time time) {
        ...
    }

This constructor is the core of the Kafka producer: it builds every component a producer needs. Let's walk through its implementation step by step:

    KafkaProducer(Map<String, Object> configs,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  Metadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors interceptors,
                  Time time) {
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
                valueSerializer));
        try {
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = time;
            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;

            String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                    (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
            LogContext logContext;
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");

            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                         Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                           Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
            List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
            if (interceptors != null)
                this.interceptors = interceptors;
            else
                this.interceptors = new ProducerInterceptors<>(interceptorList);
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                    valueSerializer, interceptorList, reporters);
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            this.transactionManager = configureTransactionState(config, logContext, log);
            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

            this.apiVersions = new ApiVersions();
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.compressionType,
                    config.getInt(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    true, true, clusterResourceListeners);
                this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds());
            }
            this.errors = this.metrics.sensor("errors");
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

  1. Convert the config map into a ProducerConfig
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
                valueSerializer));

ProducerConfig is mainly used to instantiate objects reflectively from class names in the config and to convert string config values into the required data types, for example:

String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);//read a config value as the required type
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);//reflectively instantiate the class named in the config
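
To illustrate the idea (a minimal sketch, not the actual AbstractConfig/ProducerConfig code; ReflectiveConfigSketch and its helper are hypothetical names), reflective instantiation from a class-name config boils down to this:

    import java.util.HashMap;
    import java.util.Map;

    public class ReflectiveConfigSketch {
        // Hypothetical helper: load the class named by a config entry and instantiate it,
        // verifying that it is compatible with the expected type.
        static <T> T getConfiguredInstance(Map<String, Object> configs, String key, Class<T> type) throws Exception {
            String className = (String) configs.get(key);
            Class<?> clazz = Class.forName(className);
            if (!type.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(className + " is not a " + type.getName());
            }
            return type.cast(clazz.getDeclaredConstructor().newInstance());
        }

        public static void main(String[] args) throws Exception {
            Map<String, Object> configs = new HashMap<>();
            configs.put("partitioner.class", "java.util.ArrayList"); // placeholder class name, just for the demo
            Object instance = getConfiguredInstance(configs, "partitioner.class", Object.class);
            System.out.println(instance.getClass());                 // prints: class java.util.ArrayList
        }
    }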

  2. Set the clientId. If none is configured, one is generated automatically with the prefix producer- followed by an incrementing number, e.g. the first producer gets producer-1, the second producer-2, and so on
            Map<String, Object> userProvidedConfigs = config.originals();//keep a copy of the original config map
            this.producerConfig = config;//store the ProducerConfig in the producerConfig field
            this.time = time;//store the time source
            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);//read the client.id config entry
            if (clientId.length() <= 0)//if the user did not configure client.id, generate one: the prefix producer- plus a monotonically increasing number
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;//store the generated clientId in the field

  3. Transaction-related setup (Kafka transactions require the producer to explicitly configure transactional.id); a usage sketch follows the code below
            //read the transactional.id config entry
            String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                    (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
            LogContext logContext;
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");
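
For context, using the transactional API from application code looks roughly like this (a hedged sketch; the broker address, topic and transactional.id values are placeholders, and error handling is simplified):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;

    public class TransactionalProducerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("transactional.id", "demo-tx-id");        // enables the transactional code path shown above

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();                         // register the transactional.id with the broker
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("test", "k", "v"));
                producer.commitTransaction();                    // records in the transaction become visible atomically
            } catch (KafkaException e) {
                producer.abortTransaction();                     // roll back on error (fenced producers should close instead)
            } finally {
                producer.close();
            }
        }
    }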

  4. Create the metrics components used for performance monitoring (see the reporter sketch after the code below)
            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
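
These metrics can also be exported through a custom reporter registered via metric.reporters; a bare-bones sketch (LoggingMetricsReporter is a hypothetical class that only prints metric names):

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.MetricsReporter;

    public class LoggingMetricsReporter implements MetricsReporter {
        @Override
        public void configure(Map<String, ?> configs) { }                 // receives client.id among other configs

        @Override
        public void init(List<KafkaMetric> metrics) {                     // called with the metrics that already exist
            metrics.forEach(m -> System.out.println("init metric: " + m.metricName()));
        }

        @Override
        public void metricChange(KafkaMetric metric) {                     // called when a metric is added or updated
            System.out.println("metric change: " + metric.metricName());
        }

        @Override
        public void metricRemoval(KafkaMetric metric) { }

        @Override
        public void close() { }
    }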

  5. Instantiate the partitioner that assigns a partition to each record. Clients can customise this by implementing the Partitioner interface; the default is DefaultPartitioner (if the record has a key, it hashes the key and takes the result modulo the number of partitions; if there is no key, it starts from a random number and effectively round-robins across the available partitions). A custom partitioner sketch follows the code below
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
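
A custom partitioner implements the Partitioner interface and is registered via partitioner.class. Below is a simplified sketch (SimplePartitioner is a hypothetical class): it hashes the key with murmur2 modulo the partition count, and for a null key just picks a pseudo-random partition, which is close to but not identical to DefaultPartitioner:

    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    public class SimplePartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            if (keyBytes == null) {
                // no key: pick a pseudo-random partition (the real DefaultPartitioner also considers availability)
                return ThreadLocalRandom.current().nextInt(numPartitions);
            }
            // keyed record: murmur2 hash of the key, modulo the partition count
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }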

  6. Read retry.backoff.ms (default 100ms), the interval to wait between metadata refresh attempts so they are not issued too frequently. Before each retry the producer refreshes the metadata of the relevant topics to check whether a new leader has been elected; since leader election takes some time, this setting is how long the producer waits before refreshing the metadata
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

  7. Instantiate the Serializers used to serialize the message key and value. They can be passed to the constructor directly or configured via key.serializer / value.serializer (a custom serializer works too, but the consumer side must configure the matching deserializer). A minimal custom serializer sketch follows the code below
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                         Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                           Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }
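
As an example of the custom serializer mentioned above (SimpleStringSerializer is a hypothetical class; the consumer would need the matching deserializer):

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serializer;

    public class SimpleStringSerializer implements Serializer<String> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // isKey corresponds to the true/false flag passed by the constructor code above
        }

        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void close() { }
    }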

  8. Bind the interceptors to this producer (distinguished by client.id); an interceptor sketch follows the code below
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
            List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
            if (interceptors != null)
                this.interceptors = interceptors;
            else
                this.interceptors = new ProducerInterceptors<>(interceptorList);
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                    valueSerializer, interceptorList, reporters);
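
A producer interceptor implements ProducerInterceptor and is listed in interceptor.classes; onSend is invoked before the record is serialized and partitioned, and onAcknowledgement is invoked when the ack (or error) comes back. A minimal sketch (CountingInterceptor is a hypothetical class):

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CountingInterceptor implements ProducerInterceptor<Integer, String> {
        private long sent = 0;
        private long failed = 0;

        @Override
        public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
            sent++;
            return record;                       // a modified record may also be returned here
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception != null) failed++;     // runs on the I/O thread, so keep it cheap
        }

        @Override
        public void close() {
            System.out.println("sent=" + sent + ", failed=" + failed);
        }

        @Override
        public void configure(Map<String, ?> configs) { }
    }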

  9. Read the remaining configuration entries (an example of setting them follows the code below)
            //maximum size of a single request in bytes; should be smaller than the broker's message.max.bytes
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);//max.request.size
            //size of the memory buffer pool for pending records
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);//buffer.memory
            //compression type
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            //maximum blocking time (send() blocks when the requested buffer memory is not available)
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);//max.block.ms
            //transaction manager
            this.transactionManager = configureTransactionState(config, logContext, log);
            //delivery timeout in milliseconds, derived from delivery.timeout.ms, linger.ms and request.timeout.ms
            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
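
These fields correspond to producer properties that could be set on the Properties object from the opening example, roughly like this (the values are only illustrative, not recommendations):

    props.put("max.request.size", 1048576);     // maxRequestSize: upper bound of a single request, in bytes
    props.put("buffer.memory", 33554432L);      // totalMemorySize: size of the producer's buffer pool
    props.put("compression.type", "lz4");       // compressionType: none, gzip, snappy, lz4, ...
    props.put("max.block.ms", 60000L);          // maxBlockTimeMs: how long send()/metadata fetches may block
    props.put("linger.ms", 5);                  // wait a little so more records can be batched per request
    props.put("request.timeout.ms", 30000);     // per-request timeout
    props.put("delivery.timeout.ms", 120000);   // overall bound; must be >= linger.ms + request.timeout.ms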

  10. Create the record accumulator that buffers records waiting to be sent (a conceptual sketch follows the code below)
            this.apiVersions = new ApiVersions();//tracks broker API versions, to distinguish older and newer APIs
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.compressionType,
                    config.getInt(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
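
Conceptually (a very rough toy model, not the real RecordAccumulator, which also manages the buffer pool, compression, retries and thread safety), the accumulator keeps a deque of batches per partition and signals when a full batch is waiting for the sender:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    // Toy model of per-partition batching: append() fills the last batch of the partition's deque
    // and reports whether a completed batch is waiting, which is what triggers sender.wakeup() in doSend().
    public class ToyAccumulator {
        static class Batch {
            int usedBytes = 0;
            boolean hasRoomFor(int size, int batchSize) { return usedBytes + size <= batchSize; }
        }

        private final int batchSize;
        private final Map<String, Deque<Batch>> batches = new HashMap<>(); // key: "topic-partition"

        ToyAccumulator(int batchSize) { this.batchSize = batchSize; }

        synchronized boolean append(String topicPartition, int recordSize) {
            Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
            Batch last = dq.peekLast();
            if (last == null || !last.hasRoomFor(recordSize, batchSize)) {
                dq.addLast(new Batch());           // start a new batch; the previous one is ready to be sent
                last = dq.peekLast();
            }
            last.usedBytes += recordSize;          // oversized records simply get their own batch in this toy model
            return dq.size() > 1;                  // true when at least one full batch is waiting for the sender
        }
    }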

  11. Create the metadata (a small usage example follows the code below)
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));//parse the broker addresses
            //Instantiate the Metadata object used when sending messages; it combines the settings that control metadata refreshes with the Cluster info (topics, partitions and other details of the whole cluster)
            if (metadata != null) {
                this.metadata = metadata;
            } else {//initialise the metadata
                this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    true, true, clusterResourceListeners);
                this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds());
            }
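
The metadata itself is fetched lazily and the wait is bounded by max.block.ms; for example, calling partitionsFor on the producer from the opening example blocks until metadata for the topic is available (PartitionInfo is org.apache.kafka.common.PartitionInfo):

    // Blocks for up to max.block.ms while the producer fetches metadata for the topic,
    // then prints each partition and its current leader.
    for (PartitionInfo info : producer.partitionsFor("test")) {
        System.out.println("partition " + info.partition() + " leader " + info.leader());
    }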

  12. Create the sensor that records errors
            this.errors = this.metrics.sensor("errors");

  13. Create and start the thread that actually sends messages (a simplified sketch of the daemon-thread setup follows below)
            this.sender = newSender(logContext, kafkaClient, this.metadata);//create the Sender runnable
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;//sender thread name
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();//start the sender thread
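
KafkaThread is essentially a named daemon thread; the effect is roughly the following (a simplified sketch, not the actual KafkaThread class):

    Runnable sender = () -> { /* drain the accumulator and write to the network... */ };
    Thread ioThread = new Thread(sender, "kafka-producer-network-thread | DemoProducer");
    ioThread.setDaemon(true);   // daemon = true: the JVM may exit even if this thread is still running
    ioThread.start();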

As you can see, constructing a Kafka producer involves quite a few steps. Once constructed, the producer holds the following important fields:

    private final String clientId;//unique producer id, auto-generated with an incrementing suffix if not configured
    private final Partitioner partitioner;//partitioner
    private final int maxRequestSize;//maximum request size, bounding each request sent to the broker
    private final long totalMemorySize;//maximum buffer pool size; Kafka implements its own memory pool to manage buffers
    private final Metadata metadata;//metadata describing the whole Kafka cluster
    private final RecordAccumulator accumulator;//record accumulator that collects records waiting to be sent
    private final Sender sender;//the runnable that sends messages
    private final CompressionType compressionType;//compression type
    private final Serializer<K> keySerializer;//key serializer
    private final Serializer<V> valueSerializer;//value serializer
    private final long maxBlockTimeMs;//maximum blocking time, tied to the buffer pool size above
    private final ProducerInterceptors<K, V> interceptors;//producer interceptors, invoked before sending and when the ack comes back
    private final TransactionManager transactionManager;//transaction manager

Now back to the example at the beginning: once the KafkaProducer is created, send() is called to send messages.

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

At the very beginning, the method invokes the interceptors' onSend callback, then calls doSend, which performs the actual send:

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            //throws if ioThread == null || !ioThread.isAlive(), i.e. the Sender (daemon) thread must still be running
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime;
            try {//block until the cluster metadata is available; maxBlockTimeMs is the wait timeout
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            //subtract the time spent waiting for metadata, i.e. compute the remaining blocking time
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            //keep the cluster info
            Cluster cluster = clusterAndWaitTime.cluster;
            //serialize the message key and value
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            //determine the partition number, possibly based on the key
            int partition = partition(record, serializedKey, serializedValue, cluster);
            //the target topic and the partition within it
            tp = new TopicPartition(record.topic(), partition);
            //mark the record headers as read-only
            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            //estimate the size of the outgoing data from the serialized key, value, headers, etc.
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            //check the size against the limits (maxRequestSize = max.request.size and totalMemorySize = buffer.memory)
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            //wrap the user callback and the interceptor callback in a single Callback
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
            //transaction handling
            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);
            //append the record to the accumulator
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {//if the current batch is full or a new batch was created, wake up the sender thread to do the actual sending
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;//return a Future, which supports both the asynchronous and the synchronous usage
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }

The construction and send paths above cover the most important components of the Kafka producer, summarised below:

  1. Producer - Metadata
    https://github.com/969251639/study/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E2%80%94%E2%80%94%E5%85%83%E6%95%B0%E6%8D%AE
  2. Producer - Cluster
    https://github.com/969251639/study/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E2%80%94%E2%80%94%E9%9B%86%E7%BE%A4
  3. Producer - Partitioner
    https://github.com/969251639/study/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E2%80%94%E2%80%94%E5%88%86%E5%8C%BA%E5%99%A8
  4. Producer - Interceptors
    https://github.com/969251639/study/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E2%80%94%E2%80%94%E6%8B%A6%E6%88%AA%E5%99%A8
  5. Producer - Record Accumulator
    https://github.com/969251639/study/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E2%80%94%E2%80%94%E6%B6%88%E6%81%AF%E7%B4%AF%E5%8A%A0%E5%99%A8
  6. Producer - Sender
    https://github.com/969251639/study/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E2%80%94%E2%80%94%E5%8F%91%E9%80%81%E5%99%A8
  7. Producer - Network
    https://github.com/969251639/study/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E2%80%94%E2%80%94%E7%BD%91%E7%BB%9C