kafka 0.9 java producer and consumer demo

Test environment:

  • kafka_2.11-0.9.0.1.tgz
  • zookeeper-3.4.6.tar.gz

Sample code:

git clone https://github.com/downgoon/hello-world
cd hello-world
git checkout kafka9
git checkout kafka-c1-message


Client

The Kafka client is the official jar org.apache.kafka:kafka-clients:0.9.0.1; the Maven configuration is as follows:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

Producer

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerDemo {

	public static void main(String[] args) throws Exception {

		String topic = "test";

		// producer configuration

		Properties props = new Properties();
		props.put("bootstrap.servers",
				"10.213.42.230:10062,10.213.42.234:10062,10.213.44.188:10062,10.213.44.189:10062");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("acks", "all");
		props.put("retries", 1);

		// create an instance of producer
		Producer<String, String> producer = new KafkaProducer<String, String>(props);

		// send() is asynchronous and returns a Future which will eventually
		// contain the response metadata; calling get() on it blocks until the
		// broker responds, so this loop is effectively a synchronous send

		for (int i = 0; i < 100; i++) {
			Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>(topic, "Hello"));
			RecordMetadata recMeta = future.get();
			System.out.println(
					"record meta " + i + ":  offset=" + recMeta.offset() + ", partition=" + recMeta.partition());

		}

		// asynchronous send with a callback invoked when the send completes
		producer.send(new ProducerRecord<String, String>(topic, "World"), new Callback() {

			@Override
			public void onCompletion(RecordMetadata metadata, Exception e) {
				if (e != null) {
					e.printStackTrace();
				} else {
					System.out.println("metadata: offset=" + metadata.offset() + ", partition=" + metadata.partition());
				}
			}

		});

		// Flush any accumulated records from the producer.
		// Blocks until all sends are complete.
		producer.flush();

		producer.close();

	}

}

Notes

  • Broker cluster addresses

The producer needs the addresses of one or more brokers in the Kafka cluster (not all of them, though listing more improves availability if some are down). It no longer needs the ZooKeeper address, because Kafka has been steadily hiding its internals from clients.
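
For instance, a bootstrap list naming just two of the four brokers above is enough; the client discovers the rest through the cluster metadata returned by whichever broker it reaches first. A minimal sketch (the BootstrapSubsetDemo class name and the use of partitionsFor to force a metadata fetch are illustrative choices, not part of the original demo):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class BootstrapSubsetDemo {

	public static void main(String[] args) {
		Properties props = new Properties();
		// two of the four demo brokers; the others are discovered via metadata
		props.put("bootstrap.servers", "10.213.42.230:10062,10.213.42.234:10062");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		// partitionsFor() triggers a metadata fetch, proving the cluster is reachable
		System.out.println(producer.partitionsFor("test"));
		producer.close();
	}

}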

  • Asynchronous sending

Asynchronous sending is the usual way to improve producer throughput. But how do we keep messages from being lost when sending asynchronously? The demo above combines acks=all, client-side retries, and checking the result in the Future or the Callback.
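
A minimal sketch of the callback-based approach, assuming a hypothetical SafeSendDemo class and a localhost:9092 bootstrap address; the error handling is deliberately reduced to a printStackTrace:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SafeSendDemo {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092"); // assumption: adjust to your cluster
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("acks", "all"); // broker acks only after all in-sync replicas have the record
		props.put("retries", 3); // let the client retry transient failures before giving up

		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "Hello");

		producer.send(record, new Callback() {

			@Override
			public void onCompletion(RecordMetadata metadata, Exception e) {
				if (e != null) {
					// the send failed even after the client-side retries; a real
					// application would log the record and/or persist it for replay
					e.printStackTrace();
				}
			}

		});

		producer.flush(); // block until the in-flight record has actually been sent
		producer.close();
	}

}

The three pieces work together: acks=all stops the broker from acknowledging before all in-sync replicas have the record, retries covers transient network errors, and the callback is the last chance to notice and handle a loss.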


Consumer

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {

	public static void main(String[] args) throws Exception {
		
		String topic = "test";
		
		Properties props = new Properties();
		props.put("bootstrap.servers", "10.213.42.230:10062,10.213.42.234:10062,10.213.44.188:10062,10.213.44.189:10062");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

		props.setProperty("group.id", "jconsumer-passport"); // consumer group id
		props.setProperty("enable.auto.commit", "true"); // offsets are committed automatically
		props.setProperty("auto.offset.reset", "earliest"); // start from the beginning if the group has no committed offset

		Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		// alternative with Guava: consumer.subscribe(Lists.newArrayList(topic));

		consumer.subscribe(Arrays.asList(new String[] { topic })); // multiple topics can be consumed at once

		for (int i = 0; i < 2; i++) {

			ConsumerRecords<String, String> records = consumer.poll(1000); // fetch a batch
			System.out.println("polled: " + records.count()); // a single poll may return thousands of records
			for (ConsumerRecord<String, String> record : records) {
				System.out.println("\t\trecord: " + record);
				// consumer.seekToBeginning(new TopicPartition(record.topic(), record.partition()));
			}
		}

		consumer.close();
		
	}

}

Notes

  • No more ZooKeeper dependency

Starting with 0.9, the Consumer API was rewritten, and the consumer no longer depends on ZooKeeper either. ZK used to be needed for two things, rebalancing and offset management; Kafka now manages offsets through an internal topic named "__consumer_offsets".
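
With enable.auto.commit turned off, the consumer writes to __consumer_offsets only when asked. A minimal sketch of manual committing (the ManualCommitDemo class name, the manual-commit-demo group and the localhost:9092 address are illustrative assumptions):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitDemo {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092"); // assumption: adjust to your cluster
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("group.id", "manual-commit-demo");
		props.put("enable.auto.commit", "false"); // we commit offsets ourselves

		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		consumer.subscribe(Arrays.asList("test"));
		try {
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(1000);
				for (ConsumerRecord<String, String> record : records) {
					System.out.println("offset=" + record.offset() + " value=" + record.value());
				}
				// write this group's offsets to __consumer_offsets; do this only
				// after the polled records have actually been processed
				consumer.commitSync();
			}
		} finally {
			consumer.close();
		}
	}

}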

  • group.id and topic form a many-to-many relationship

One group.id can consume multiple topics, and one topic can be consumed by multiple group.ids; you can think of a group.id as one application or use case for the data.

Consumers and groups, on the other hand, are many-to-one: a group usually contains many consumer instances, but each consumer instance belongs to exactly one group.
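
To make this concrete, here is a sketch of two consumers in different groups reading the same topic: each group independently receives the full stream, whereas two consumers sharing one group.id would split the partitions between them. The class name, group names and the localhost:9092 address are illustrative assumptions:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TwoGroupsDemo {

	static KafkaConsumer<String, String> newConsumer(String groupId) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092"); // assumption: adjust to your cluster
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("group.id", groupId);
		props.put("auto.offset.reset", "earliest"); // new groups start from the beginning
		return new KafkaConsumer<String, String>(props);
	}

	public static void main(String[] args) {
		// two different groups: each independently receives the full "test" stream
		KafkaConsumer<String, String> groupA = newConsumer("app-A");
		KafkaConsumer<String, String> groupB = newConsumer("app-B");
		groupA.subscribe(Arrays.asList("test"));
		groupB.subscribe(Arrays.asList("test"));

		// the first poll may return few records while partitions are being assigned
		ConsumerRecords<String, String> fromA = groupA.poll(1000);
		ConsumerRecords<String, String> fromB = groupB.poll(1000);
		System.out.println("group app-A polled " + fromA.count());
		System.out.println("group app-B polled " + fromB.count());

		groupA.close();
		groupB.close();
	}

}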


Monitoring

To set up the monitoring/visualization environment, see: https://github.com/downgoon/hello-world/wiki/zookeeper-and-kafka-cluster

Screenshots: consumer group query, consumer offset, metrics.
