zms client接入手册 - ZTO-Express/zms GitHub Wiki
zms-client支持Rocketmq和Kafka集群消息的发送和消费,使用zms-client需要在zms管理系统进行元数据的维护。
<dependency>
<groupId>com.zto.zms</groupId>
<artifactId>zms-client</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
zms_zk地址是在zms管理系统配置的zookeeper数据源地址。
-
在启动参数上设置zms_zk地址
-
在启动命令上设置zms_zk地址
java -Dzms_zk=127.0.0.1:2181 -jar xxx.jar
@Before
public void setup() {
System.setProperty(ZmsConst.ZK.ZMS_STARTUP_PARAM, "127.0.0.1:2181");
}
/**
* 同步发送示例
*/
@Test
public void testSendSync() {
for (int i = 0; i < 10; i++) {
Properties properties = new Properties();
properties.put("timeout", 30);
properties.put("retries", 0);
SendResult response = Zms.send("test-send-message", new SimpleMessage((System.currentTimeMillis() + "").getBytes()), properties);
System.out.println(response.getCode() + response.getMsgId() + response.getMsg());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 异步发送示例
*/
@Test
public void testSendAsync() {
for (int i = 0; i < 10; i++) {
Properties properties = new Properties();
properties.put("timeout", 100);
properties.put("retries", 0);
Zms.sendAsync("test-send-message", new SimpleMessage((System.currentTimeMillis() + "").getBytes()), properties, new SendCallback() {
@Override
public void onException(Throwable exception) {
System.out.println(exception);
System.out.println("1");
}
@Override
public void onResult(SendResult response) {
System.out.println("2");
System.out.println(response.getCode() + response.getMsgId() + response.getMsg());
}
});
}
}
/**
* 消费消息
*/
@Test
public void testSubscribe() {
Zms.subscribe("test-receive-message", new MessageListener() {
@Override
public MsgConsumedStatus onMessage(ConsumeMessage msg) {
System.out.println(new String(msg.getPayload()));
return MsgConsumedStatus.SUCCEED;
}
});
try {
Thread.sleep(1000 * 1000 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
-
发送消息
public static void sendAsync(String topic, SimpleMessage simpleMessage, Properties properties, SendCallback callBack)
topic:主题
simpleMessage:包含参数:
key:消息检索和分片key,顺序消息使用
tags:标签,进一步区分相同topic不同目的的数据,如发送给某一个项目
delayLevel:延时发送级别,默认值0,发送延时消息设置此参数,为1~18之前的整数,
payload:消息体
properties:生产者配置,包含参数:
timeout:生产者在发送数据时等待服务器返回响应的时间,默认值3s
retries:发送失败时重试次数,默认值2
callBack:异步发送后的回调方法
-
消费消息
public static void subscribe(String consumerGroup, Set<String> tags, MessageListener listener, Properties properties)
consumerGroup:消费组
tags:标签,筛选相同topic不同目的的数据,如只订阅自己项目的消息
listener:消费监听回调
MessageListener:接收经zms转换后的实体类
RocketmqMessageListener:接收rocketmq原生的批量消息
properties:消费者配置,包含参数:
isOrderly:是否顺序消费,true/false,默认值false
consumeMessagesSize:批量拉取消息数量,一次最多拉多少条,默认值32
rocketmqConsumeBatchSize:消息并发消费时一次消费消息条数,通俗点说就是每次传入MessageListtener#consumeMessage中的消息条数,默认值1
consumeThreadMin:消费最小线程数,Integer,默认值20
consumeThreadMax:消费最大线程数,Integer,默认值64
另:Rocketmq消费时的是否最早消费、是否广播消费,都是在zms管理系统创建消费组时设置的
-
发送消息
public static void sendAsync(String topic, SimpleMessage simpleMessage, Properties properties, SendCallback callBack)
topic:主题
simpleMessage:包含参数:
key:分片key,顺序消息使用
payload:消息体
properties:生产者配置,包含参数:
timeout:生产者在发送数据时等待服务器返回响应的时间,无默认值
retries:发送失败时重试次数,默认值0
acks:发送成功确认逻辑,默认all
0:只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,直接认为这个消息发送成功。
1:只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管其他的Follower有没有同步过去这条消息了。这是Kafka默认的设置方式。
all:Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。
callBack:异步发送后的回调方法
-
消费消息
public static void subscribe(String consumerGroup, Set<String> tags, MessageListener listener, Properties properties)
consumerGroup:消费组
listener:消费监听回调
MessageListener:接收经zms转换后的实体类
KafkaMessageListener:接收kafka原生的消息
KafkaBatchMsgListener:接收kafka原生的批量消息
properties:消费者配置,包含参数:
consumeMessagesSize:批量拉取消息数量,默认500
consumeThreadMin:消费最小线程数,Integer,默认值处理器个数
consumeThreadMax:消费最大线程数,Integer,默认值处理器个数*2,但不小于consumeThreadMin
另:Kafka消费时的是否最早消费参数,是在zms管理系统创建消费组时设置的。