Consumer: Interceptors - 969251639/study GitHub Wiki

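Like the producer, the consumer has its own set of interceptors. They are invoked before polled records are returned to the application, after an offset commit succeeds, and when the consumer is closed.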

    private KafkaConsumer(ConsumerConfig config,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
        ...
        // Controlled by the interceptor.classes setting; separate multiple classes with commas
        List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ConsumerInterceptor.class);
        this.interceptors = new ConsumerInterceptors<>(interceptorList);
        ...
    }
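
As a minimal sketch of the configuration side, the snippet below builds consumer properties that register two interceptors through the `interceptor.classes` key (the string behind `ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG`). The broker address, group id, and the `com.example.*` class names are hypothetical placeholders, and only `java.util.Properties` is used so the snippet runs without the Kafka client on the classpath.

```java
import java.util.Properties;

class InterceptorConfigSketch {
    // Builds consumer properties with two interceptors registered.
    // The com.example.* class names are hypothetical placeholders.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");              // assumed group id
        // Same key as ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; comma-separated list
        props.put("interceptor.classes",
                "com.example.TtlInterceptor,com.example.AuditInterceptor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("interceptor.classes"));
    }
}
```

In a real application these properties would be passed to the `KafkaConsumer` constructor, which instantiates the listed classes as shown in the constructor excerpt above.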

Here is the definition of the ConsumerInterceptor interface:

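    public interface ConsumerInterceptor<K, V> extends Configurable {

        // Called before the polled records are handed to the application
        public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

        // Called after an offset commit succeeds; not called if the commit fails
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

        // Called when the interceptor is closed
        public void close();
    }
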
  1. Triggered before the consumer consumes records
    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        ...
        return this.interceptors.onConsume(new ConsumerRecords<>(records)); // every poll calls the interceptors' onConsume before returning records
        ...
    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) { // iterate over the interceptors
            try {
                interceptRecords = interceptor.onConsume(interceptRecords); // call onConsume on each one in turn
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                log.warn("Error executing interceptor onConsume callback", e);
            }
        }
        return interceptRecords;
    }
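
The chaining behavior of `onConsume` can be sketched without Kafka on the classpath. The example below is a simplified stand-in, not Kafka's classes: `InterceptorChainSketch`, the use of `UnaryOperator<List<String>>` in place of `ConsumerInterceptor`, and the three sample interceptors are all assumptions made for illustration. It shows that an interceptor that throws is skipped, and that the next one receives the last successfully produced batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

class InterceptorChainSketch {
    // Applies each interceptor in order; a failing interceptor is skipped and
    // the next one receives the last successfully produced batch.
    static List<String> onConsume(List<UnaryOperator<List<String>>> interceptors,
                                  List<String> records) {
        List<String> current = records;
        for (UnaryOperator<List<String>> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (Exception e) {
                // Same policy as the source above: log and continue, never propagate
                System.err.println("Error executing interceptor: " + e.getMessage());
            }
        }
        return current;
    }

    static List<String> demo() {
        List<UnaryOperator<List<String>>> chain = new ArrayList<>();
        // 1) Drops empty records
        chain.add(batch -> {
            List<String> kept = new ArrayList<>();
            for (String r : batch) {
                if (!r.isEmpty()) kept.add(r);
            }
            return kept;
        });
        // 2) Always fails; its exception is logged and swallowed
        chain.add(batch -> { throw new IllegalStateException("boom"); });
        // 3) Upper-cases whatever the first interceptor produced
        chain.add(batch -> {
            List<String> out = new ArrayList<>();
            for (String r : batch) out.add(r.toUpperCase());
            return out;
        });
        return onConsume(chain, Arrays.asList("a", "", "b"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A, B]
    }
}
```

One consequence of this policy is that a broken interceptor fails silently from the application's point of view, so the warning log is the only signal that its transformation was skipped.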

  2. Triggered after a commit succeeds; not called if the commit fails
    // Asynchronous commit
    private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        ...
        future.addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                if (interceptors != null)
                    interceptors.onCommit(offsets); // call onCommit once the commit has reached the server
                ...
            }
            ...
        });
    }
    // Synchronous commit
    public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
        ...
        if (future.succeeded()) {
            if (interceptors != null)
                interceptors.onCommit(offsets); // call onCommit once the commit has reached the server
            return true;
        }
        ...
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) { // iterate over the interceptors
            try {
                interceptor.onCommit(offsets); // call onCommit on each one in turn
            } catch (Exception e) {
                // do not propagate interceptor exception, just log
                log.warn("Error executing interceptor onCommit callback", e);
            }
        }
    }
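
The "notify only on success" rule can be illustrated with a small stand-in. This sketch uses `CompletableFuture` instead of Kafka's internal `RequestFuture`, and `CommitNotifySketch`, the `String`-keyed offset map, and the listener list are hypothetical names chosen for the example. It registers the same hook on a commit that succeeds and on one that fails, and only the successful one reaches the listeners.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CommitNotifySketch {
    // Registers a completion hook that notifies listeners only when the commit succeeds.
    static void commitAsync(CompletableFuture<Void> commitResult,
                            Map<String, Long> offsets,
                            List<Consumer<Map<String, Long>>> listeners) {
        commitResult.whenComplete((ignored, error) -> {
            if (error != null) {
                return; // failed commit: the onCommit-style callbacks are skipped
            }
            for (Consumer<Map<String, Long>> listener : listeners) {
                try {
                    listener.accept(offsets);
                } catch (Exception e) {
                    // As in the source: log only, do not propagate
                    System.err.println("Error executing onCommit callback: " + e.getMessage());
                }
            }
        });
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        List<Consumer<Map<String, Long>>> listeners = new ArrayList<>();
        listeners.add(offsets -> seen.add("committed " + offsets));

        CompletableFuture<Void> ok = new CompletableFuture<>();
        CompletableFuture<Void> failed = new CompletableFuture<>();
        commitAsync(ok, Collections.singletonMap("topic-0", 42L), listeners);
        commitAsync(failed, Collections.singletonMap("topic-0", 43L), listeners);

        ok.complete(null);                                                 // triggers the listener
        failed.completeExceptionally(new RuntimeException("commit failed")); // does not
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [committed {topic-0=42}]
    }
}
```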