kafka安装使用 - wtdig/study GitHub Wiki
获取安装文件
wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
解压文件
tar -zxvf kafka_2.11-1.1.0.tgz
涉及的配置文件:zookeeper.properties、producer.properties、consumer.properties、server.properties
一般修改下:server.properties
listeners=PLAINTEXT://45.78.9.159:9092 (其中45.78.9.159为ip地址)
启动内置的zookeeper:
./bin/zookeeper-server-start.sh config/zookeeper.properties
启动broker
./bin/kafka-server-start.sh config/server.properties
注意:
Kafka启动一段时间后,如果出现服务自动关闭的情况,可在启动kafka时使用守护进程模式,即在原启动命令中加上 -daemon 参数
./bin/kafka-server-start.sh -daemon config/server.properties
创建一个主题:Demo1
./bin/kafka-topics.sh --create --zookeeper 45.78.9.159:2181 --replication-factor 1 --partitions 1 --topic Demo1
查看所有的主题:
./bin/kafka-topics.sh --zookeeper 45.78.9.159:2181 --list
查看Demo1的主题信息描述
./bin/kafka-topics.sh --describe --zookeeper 45.78.9.159:2181 --topic Demo1
向Demo1主题发送消息
./bin/kafka-console-producer.sh --broker-list 45.78.9.159:9092 --topic Demo1
接收Demo1主题消息
./bin/kafka-console-consumer.sh --zookeeper 45.78.9.159:2181 --from-beginning --topic Demo1
修改Demo1主题的分区数
./bin/kafka-topics.sh --alter --zookeeper 45.78.9.159:2181 --partitions 11 --topic Demo1
删除Demo1主题
./bin/kafka-topics.sh --delete --zookeeper 45.78.9.159:2181 --topic Demo1
停止zookeeper(注意:应先停止kafka,再停止zookeeper)
./bin/zookeeper-server-stop.sh
停止kafka
./bin/kafka-server-stop.sh
1)pom文件
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
2)java代码
package com.wtdig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Minimal Kafka consumer demo.
 *
 * <p>Subscribes to the {@code my-topic-wtdig} topic as a member of the consumer
 * group {@code test} and prints the offset, key and value of every record it
 * receives to standard output.
 */
public class ConsumerDemo {
    /**
     * Builds a String/String consumer, subscribes to the topic and polls forever.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "45.78.9.159:9092");
        props.put("group.id", "test");
        // Auto-commit is enabled with a 1000 ms commit interval.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The polling loop never terminates normally, so the only way out is an
        // exception; try-with-resources guarantees the consumer is closed in that case
        // instead of being leaked.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic-wtdig"));
            while (true) {
                // poll(long) takes a timeout in milliseconds; it is the poll overload
                // available in the kafka-clients 1.1.0 dependency this page declares.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
package com.wtdig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Minimal Kafka producer demo.
 *
 * <p>Sends 100 records to the {@code my-topic-wtdig} topic. Record {@code i}
 * uses the decimal string of {@code i} as both its key and its value.
 */
public class ProducerDemo {
    /**
     * Builds a String/String producer, sends the records and closes the producer.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "45.78.9.159:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even when send() throws, so the
        // client is never leaked on the error path.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String payload = Integer.toString(i);
                producer.send(new ProducerRecord<String, String>("my-topic-wtdig", payload, payload));
            }
        }
    }
}