Spring Boot 使用 Redis Stream 消息队列 - zhouted/zhouted.github.io GitHub Wiki

标签: java redis stream


依赖

<!-- Spring Boot starter that provides the Redis integration used by the samples below. -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Explicit Spring Data Redis dependency.
     NOTE(review): the starter above presumably already pulls this in transitively; confirm before removing. -->
<dependency>
	<groupId>org.springframework.data</groupId>
	<artifactId>spring-data-redis</artifactId>
</dependency>

实体class SampleObj

/**
 * Payload type carried by the stream records in this example.
 *
 * <p>The annotations below (presumably Lombok) are expected to generate the
 * accessors and the all-args / no-args constructors.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleObj {
    // Only one field is shown; the remaining members are elided in this snippet.
    String name;
    ......
}

发消息

@Service
@Slf4j
public class SampleService {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public RecordId produce(String streamKey, SampleObj obj ){
        ObjectRecord<String, SampleObj> record = StreamRecords.newRecord().ofObject(obj).withStreamKey(streamKey);
        RecordId recordId = this.redisTemplate.opsForStream().add(record);
        if (Objects.isNull(recordId)) {
            log.info("error sending {} event: {}", streamKey, obj);
            return null;
        }
        return recordId;
    }
}

配置class SampleRedisConfig

/**
 * Spring configuration that exposes the stream listener container used by the listeners.
 */
@Configuration
@Slf4j
public class SampleRedisConfig {
    /**
     * Creates the listener container for {@link SampleObj} records.
     * The bean's {@code start} and {@code stop} methods are wired as init/destroy callbacks.
     *
     * @param connectionFactory factory the container uses to obtain Redis connections
     * @return a container built from {@link #listenerOptions()}
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, SampleObj>> streamMessageListenerContainer(
            RedisConnectionFactory connectionFactory
    ) {
        return StreamMessageListenerContainer.create(connectionFactory, listenerOptions());
    }

    /** Builds the container options: 100 ms poll timeout, records mapped to {@link SampleObj}. */
    private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,
            ObjectRecord<String, SampleObj>> listenerOptions() {
        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofMillis(100))
                .targetType(SampleObj.class)
                .build();
    }
}

监听器 class AStreamListener

@Slf4j
@Service
public class AStreamListener implements StreamListener<String, ObjectRecord<String, SampleObj>>{
    private String streamKey = "stream_a";
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    StreamMessageListenerContainer<String, ObjectRecord<String, SampleObj>> streamMessageListenerContainer;

    @PostConstruct
    public void init(){
        Subscription receive = streamMessageListenerContainer.receive(StreamOffset.create(streamKey, ReadOffset.latest()), this);
        log.info("register listener: {} {}", streamKey, receive);
    }

    @Override
    public void onMessage(ObjectRecord<String, SampleObj> message) {
        try {
            RecordId messageId = message.getId();
            SampleObj obj = message.getValue();
            log.info("收到消息:messageId={}, stream={}, body={}", messageId, streamKey, obj);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            this.redisTemplate.opsForStream().delete(message);
        }
    }
}

进阶

如果需要多个队列,用多个streamKey做多个StreamListener即可。

如果需要消费者组(一个队列多个消费者分组消费),用这个方法创建消费者组:

/**
 * Creates the consumer group {@code groupName} on {@code streamKey}, reading from
 * offset {@code 0-0} and creating the stream itself if necessary (mkStream = true).
 *
 * <p>A {@link RedisSystemException} from the command is logged as a warning and
 * swallowed, so calling this when the group already exists is harmless.
 *
 * @param redisConnectionFactory factory used to obtain a short-lived connection
 * @param streamKey              key of the stream
 * @param groupName              name of the consumer group to create
 */
private void createConsumerGroupIfNotExists(RedisConnectionFactory redisConnectionFactory,
                                            String streamKey, String groupName){
    // try-with-resources: the connection obtained from the factory must be closed,
    // otherwise every call leaks one connection.
    try (org.springframework.data.redis.connection.RedisConnection connection =
                 redisConnectionFactory.getConnection()) {
        connection.streamCommands().xGroupCreate(
                // Explicit charset so the key bytes do not depend on the platform default.
                streamKey.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                groupName, ReadOffset.from("0-0"), true);
    } catch (RedisSystemException exception) {
        // Prefer the underlying cause's message, but do not NPE when there is no cause.
        Throwable cause = exception.getCause();
        log.warn(cause != null ? cause.getMessage() : exception.getMessage());
    }
}

注册Listener和onMessage的变化如下:

    /**
     * Ensures the consumer group exists, then subscribes this listener as a consumer
     * of that group, reading from the group's last-consumed offset.
     * The local host name is used as the consumer name.
     *
     * @throws IllegalStateException if the local host name cannot be resolved
     */
    @PostConstruct
    public void init(){
        createConsumerGroupIfNotExists(connectionFactory, streamKey, groupName);
        StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
        String consumerName;
        try {
            consumerName = InetAddress.getLocalHost().getHostName();
        } catch (java.net.UnknownHostException e) {
            // getLocalHost() throws a checked exception; surface it as an unchecked
            // startup failure with the cause preserved.
            throw new IllegalStateException(
                    "cannot resolve local host name for consumer of group " + groupName, e);
        }
        Subscription receive = streamMessageListenerContainer.receive(Consumer.from(groupName, consumerName), streamOffset, this);
        log.info("register listener: {} {} {}", streamKey, groupName, receive);
    }

    /**
     * Consumer-group variant of the message handler. The handling code around the
     * acknowledge call is elided in this snippet.
     *
     * @param message the record delivered by the container
     */
    @Override
    public void onMessage(ObjectRecord<String, SampleObj> message) {
        ......
        this.redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageId.getValue());// acknowledge for the group; takes the place of the delete(message) call
        ......
    }

参考

⚠️ **GitHub.com Fallback** ⚠️