消息可靠传输 - 969251639/study GitHub Wiki

为了防止消息不丢,发送方必须确保消息存储到消息中间的磁盘中,消费者必须幂等执行,另外所有的消息内容统一为json串

实现思路

  1. 生产者
    发送方必须等服务器返回后方可确认发送完成,否则一直重试
  2. 消费者
    由于生产者可能会重复发送,所以消费者必须实现幂等,保证有且仅此一次消费

实现方案

  1. 将发送的消息包装起来,存储在本地数据库,通过定时任务定时扫描,这里消息入库后即可对调用者返回,相当于消息发送成功,由定时任务确保数据的真正发送
  2. 消费者的幂等可以通过以下三种方式实现
    2.1 数据库方式:通过生产者自动产生一个与业务无关的ID(可以用UUID),消费者将其存储到数据库进行去重
    2.2 BloomFilter:通过生产者自动产生一个与业务无关的ID(可以用UUID),消费者将其存储到布隆过滤器进行去重,但有误判的风险
    2.3 Redis:通过生产者自动产生一个与业务无关的ID(ID只能是数字递增),消费者将其存储到Redis的Map中,key为业务标识,value为这个id,那么消费者接收到之后会先写入这个id,比如写进5,那么5或5之前的表示已经被接收,可以丢弃

实现流程:

  1. 客户端本地业务和发送消息的事务绑在一起,这样消息和业务一起在本地事务处理
  2. 消息入库后唤醒线程池中的线程去同步发送该消息,同时生成一个去重ID绑定到该消息中,得到服务器的ack后将该消息在数据库中标记为已发送
  3. 消费者使用数据库方式去重,消费者接收消息将去重ID入库,并标记为已接收,当消费者处理完后标记为已完成
  4. 生产者启动一个Timer,定时扫描待发送的消息去发送,消费者启动一个Timer,定时删除扫描标记已完成的消息Id

实现

  1. 创建表
    生产者创建消息表
CREATE TABLE message (
  id INT(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  msg_id VARCHAR(32) DEFAULT NULL COMMENT '消息ID',
  content VARCHAR(2000) NOT NULL COMMENT '消息内容,一般为json串',
  destination VARCHAR(100) DEFAULT NULL COMMENT 'MQ的队列名或主题名',
  type TINYINT DEFAULT NULL COMMENT '类型:1:队列,2:主题订阅',
  status TINYINT NOT NULL DEFAULT 0 COMMENT '状态:0:待发送,1:已发送',
  retry_count INT NOT NULL DEFAULT 0 COMMENT '重发次数',
  description VARCHAR(100) DEFAULT NULL COMMENT '描述',
  create_time DATETIME DEFAULT NULL COMMENT '创建时间',
  update_time DATETIME DEFAULT NULL COMMENT '最后修改时间',
  PRIMARY KEY (id),
  UNIQUE KEY unique_msg_id (msg_id)
) ENGINE=INNODB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

消费者创建ID去重表

CREATE TABLE message_id (
  id INT(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  msg_id VARCHAR(32) DEFAULT NULL COMMENT '消息ID',
  status TINYINT NOT NULL DEFAULT 0 COMMENT '状态:0:已接收,1:已完成',
  create_time DATETIME DEFAULT NULL COMMENT '创建时间',
  update_time DATETIME DEFAULT NULL COMMENT '最后修改时间',
  PRIMARY KEY (id),
  UNIQUE KEY unique_msg_id (msg_id)
) ENGINE=INNODB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
  1. 创建实体
public class Message {
	private Integer id;
	private String msgId;
	private String content;
	private String destination;
	private Integer type;
	private Integer status;
	private Integer retryCount;
	private String description;
	private Date createTime;
	private Date updateTime;
        private List<Integer> ids;
        ...Getter Setter
}
public class Message {
	private Integer id;
	private String msgId;
	private String content;
	private String destination;
	private Integer type;
	private Integer status;
	private Integer retryCount;
	private String description;
	private Date createTime;
	private Date updateTime;

	public Message() {
		
	}

	public Message(String destination, String content, Integer type) {
		this.description = destination;
		this.content = content;
		this.type = type;
	}
	
	public Message(String destination, String content, Integer type, String description) {
		this.description = destination;
		this.content = content;
		this.type = type;
		this.description = description;
	}
        ...Getter Setter
}
public class MessageId {
	private Integer id;
	private String msgId;
	private Integer status;
	private Date createTime;
	private Date updateTime;
        ...Getter Setter
}
  1. 创建Mapper、Dao、Service
    Mapper:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.yujinyi.message.dao.MessageIdDao">

  <resultMap type="com.yujinyi.message.dao.model.MessageId" id="MessageIdResultMap">
		    <result  property="id" column="id" jdbcType="VARCHAR" javaType="String"/>
		    <result  property="msgId" column="msg_id" jdbcType="VARCHAR" javaType="String"/>
		    <result  property="status" column="status" jdbcType="INTEGER" javaType="Integer"/>
		    <result  property="createTime" column="create_time" jdbcType="TIMESTAMP" javaType="Date"/> 
		    <result  property="updateTime" column="update_time" jdbcType="TIMESTAMP" javaType="Date"/>
  </resultMap>

  <insert id="insert" parameterType="com.yujinyi.message.dao.model.MessageId" >
    INSERT INTO message_id (
    	msg_id,status,create_uptime,update_time
	)
    <trim prefix="VALUES (" suffix=")" suffixOverrides="," >
        #{msgId,jdbcType=VARCHAR},
        #{status,jdbcType=INTEGER},
        #{createTime,jdbcType=TIMESTAMP},
        #{updateTime,jdbcType=TIMESTAMP},
    </trim>
  </insert>
  
  <update id="updateStatusByMsgId" parameterType="com.yujinyi.message.dao.model.MessageId">
	UPDATE message_id SET status=#{status,jdbcType=INTEGER}, update_time=#{updateTime,jdbcType=TIMESTAMP}
	WHERE msg_id=#{msgId,jdbcType=VARCHAR}
  </update>
  
  <update id="deleteDone">
    DELETE FROM message_id WHERE status=1 ORDER BY id ASC LIMIT 100
  </update>

</mapper>

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.yujinyi.message.dao.MessageDao">

  <resultMap type="com.yujinyi.message.dao.model.Message" id="MessageResultMap">
		    <result  property="id" column="id" jdbcType="VARCHAR" javaType="String"/>
		    <result  property="msgId" column="msg_id" jdbcType="VARCHAR" javaType="String"/>
		    <result  property="content" column="content" jdbcType="VARCHAR" javaType="String"/>
		    <result  property="destination" column="destination" jdbcType="VARCHAR" javaType="String"/>
		    <result  property="type" column="type" jdbcType="INTEGER" javaType="Integer"/>
		    <result  property="status" column="status" jdbcType="INTEGER" javaType="Integer"/>
		    <result  property="retryCount" column="retry_count" jdbcType="INTEGER" javaType="Integer"/>
		    <result  property="description" column="description" jdbcType="VARCHAR" javaType="String"/>
		    <result  property="createTime" column="create_time" jdbcType="TIMESTAMP" javaType="Date"/> 
		    <result  property="updateTime" column="update_time" jdbcType="TIMESTAMP" javaType="Date"/>
  </resultMap>

  <insert id="insert" parameterType="com.yujinyi.message.dao.model.Message" >
    INSERT INTO message (
    	msg_id,content,destination,type,status,retry_count,description,create_uptime,update_time
	)
    <trim prefix="VALUES (" suffix=")" suffixOverrides="," >
        #{msgId,jdbcType=VARCHAR},
        #{content,jdbcType=VARCHAR},
        #{destination,jdbcType=VARCHAR},
        #{type,jdbcType=INTEGER},
        #{status,jdbcType=INTEGER},
        #{retryCount,jdbcType=INTEGER},
        #{description,jdbcType=VARCHAR},
        #{createTime,jdbcType=TIMESTAMP},
        #{updateTime,jdbcType=TIMESTAMP},
    </trim>
  </insert>
  
  <update id="updateRetryCount" parameterType="com.yujinyi.message.dao.model.Message">
	UPDATE message SET retry_count=retry_count+1, update_time=#{updateTime,jdbcType=TIMESTAMP} 
	WHERE id IN
	<foreach collection="ids" index="index" item="item" open="(" separator="," close=")">   
	    #{item}   
	</foreach>
  </update>
  
  <update id="updateStatus" parameterType="com.yujinyi.message.dao.model.Message">
	UPDATE message SET status=#{status,jdbcType=INTEGER}, update_time=#{updateTime,jdbcType=TIMESTAMP} 
	WHERE id IN
	<foreach collection="ids" index="index" item="item" open="(" separator="," close=")">   
	    #{item}   
	</foreach>
  </update>
  
  <select id="queryByLimit" resultMap="MessageResultMap">
    SELECT msg_id,content,destination,type FROM message WHERE status=0 ORDER BY id ASC LIMIT 100
  </select>

</mapper>

Dao:

@Mapper
public interface MessageDao {
	public int insert(Message message);
	public int updateRetryCount(Message message);
	public int updateStatus(Message message);
	public List<Message> queryByLimit();
}

@Mapper
public interface MessageIdDao {
	public int insert(MessageId messageId);
	public int updateStatusByMsgId(MessageId messageId);
	public int deleteDone();
}

Service:

public interface MessageService {
	public int insert(Message message);
	public int updateRetryCount(Message message);
	public int updateStatus(Message message);
	public List<Message> queryByLimit();
}
public interface MessageIdService {
	public int insert(MessageId messageId);
	public int updateStatusByMsgId(MessageId messageId);
	public int deleteDone();
}
public class MessageServiceImpl implements MessageService {
	
	@Autowired
	private MessageDao messageDao;

	@Override
	@Transactional
	public int insert(Message message) {
		return messageDao.insert(message);
	}

	@Override
	@Transactional
	public int updateRetryCount(Message message) {
		return messageDao.updateRetryCount(message);
	}

	@Override
	@Transactional
	public int updateStatus(Message message) {
		return messageDao.updateStatus(message);
	}

	@Override
	public List<Message> queryByLimit() {
		return messageDao.queryByLimit();
	} 
	
}
public class MessageIdServiceImpl implements MessageIdService {
	
	@Autowired
	private MessageIdDao messageIdDao; 

	@Override
	@Transactional
	public int insert(MessageId messageId) {
		return messageIdDao.insert(messageId);
	}

	@Override
	@Transactional
	public int updateStatusByMsgId(MessageId messageId) {
		return messageIdDao.updateStatusByMsgId(messageId);
	}

	@Override
	@Transactional
	public int deleteDone() {
		return messageIdDao.deleteDone();
	}
	
}

  1. 编写定时器
    生产者:
@Component
public class MessageTask {
	private static final Logger LOGGER = LoggerFactory.getLogger(MessageTask.class);
	private static final int QUEUE = 1;//队列
	private static final int SUBJECT = 2;//主题
	
	@Autowired
	private MessageService messageService;
	
	@Autowired
	private JmsMessagingTemplate jmsMessagingTemplate;
	
	@Scheduled(cron="0 */5 * * * ?")//每五分钟执行一次
	public void task() {
		List<Message> list = messageService.queryByLimit();
		List<Integer> retryIds = new ArrayList<>();
		List<Integer> commitIds = new ArrayList<>();
		for(Message message : list) {
			ActiveMQDestination destination = null;
			if(message.getType().intValue() == QUEUE) {
				destination = new ActiveMQQueue(message.getDestination());
			}else if(message.getType().intValue() == SUBJECT) {
				destination = new ActiveMQTopic(message.getDestination());
			}
			if(destination != null) {
				try {
					String s = jmsMessagingTemplate.convertSendAndReceive(destination, message.getContent(), String.class);
					if(StringUtils.isNotBlank(s)) {
						commitIds.add(message.getId());
					}else {
						retryIds.add(message.getId());
					}
				} catch (Exception e) {
					LOGGER.error("send to mq error", e);
					retryIds.add(message.getId());
				}
			}
		}
		Message message = new Message();
		message.setUpdateTime(new Date());
		
		message.setIds(retryIds);
		messageService.updateRetryCount(message);//发送失败的消息,更新重试次数
		
		
		message.setStatus(1);
		message.setIds(commitIds);
		messageService.updateStatus(message);//发送成功的消息,更新发送成功
	}
	
}

消费者:

@Component
public class MessageIdTask {
	
	@Autowired
	private MessageIdService messageIdService;
	
	@Scheduled(cron="0 */5 * * * ?")//每五分钟执行一次
	public void task() {
		messageIdService.deleteDone();
	}
	
}

  1. 编写消费者的拦截器,消费之前都去确认一下是否已存在未确认的消息,消费完后去自动确认消息
@Aspect
@Component
@EnableAutoConfiguration
@ComponentScan
public class JmsAspect {
	private static final Logger LOGGER = LoggerFactory.getLogger(JmsAspect.class);
	
	@Autowired
	private MessageIdService messageIdService;
	
	@Pointcut("@annotation(org.springframework.jms.annotation.JmsListener)")
	public void aspect() {
	}

	@Around("@annotation(jmsListener)")
	public Object around(ProceedingJoinPoint pjp, JmsListener jmsListener) throws Throwable {
		String msgId = null;
		boolean result = false;
		try {
			Gson gson = new Gson();
			Object s = pjp.getArgs()[0];
			JsonElement je = gson.fromJson(s.toString(), JsonElement.class);
			if(je != null && je.isJsonObject()) {
				msgId = je.getAsJsonObject().get("msgId").getAsString();
				MessageId messageId = new MessageId();
				messageId.setCreateTime(new Date());
				messageId.setMsgId(msgId);
				messageId.setStatus(0);
				messageIdService.insert(messageId);
			}
			Object o = pjp.proceed();
			result = true;
			return o;
		}catch(Exception e) {
			LOGGER.error("message repeat consum; msgId: " + msgId, e);
			return null;
		}finally {
			if(result) {//如果result=true,那么消费方法已经成功执行
				MessageId messageId = new MessageId();
				messageId.setMsgId(msgId);
				messageId.setStatus(1);
				messageId.setUpdateTime(new Date());
				messageIdService.updateStatusByMsgId(messageId);
			}
		}
	}
}
  1. 编写生产者在发送之前将json对象新增msgId
    5.1 修改message的insert方法
public class MessageServiceImpl implements MessageService {
	public final static ThreadLocal<List<Message>> THREAD_LOCAL = new ThreadLocal<>();

	@Override
	@Transactional
	public int insert(Message message) {
		//生成一个与业务无关的uuid作为msgId
		message.setMsgId(UUID.randomUUID().toString());
		Gson gson = new Gson();
		JsonElement je = gson.fromJson(message.getContent(), JsonElement.class);
		je.getAsJsonObject().addProperty("msgId", message.getMsgId());
                message.setContent(gson.toJson(je));
		int result = messageDao.insert(message);
		List<Message> list = THREAD_LOCAL.get();
		if(CollectionUtils.isEmpty(list)) {
			list = new ArrayList<>(5);
			THREAD_LOCAL.set(list);
		}
		list.add(message);
		return result;
	}
        ...
}

新增一个ThreadLocal记录当前线程已有message需要在事务提交完成后发送,且需要用List记录,因为一个线程一次操作可能插入多条message,也就是需要发送多次mq

5.2 拦截事务方法,在事务方法正确完成后去发送这些message

@Aspect
@Component
@EnableAutoConfiguration
@ComponentScan
public class TransactionalAspect {
	private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalAspect.class);
	private static final int QUEUE = 1;//队列
	private static final int SUBJECT = 2;//主题
	private final static Object OBJECT = new Object();
	private final static ThreadLocal<Object> THREAD_LOCAL = new ThreadLocal<>();
	private final static ExecutorService EXECUTOR = Executors.newFixedThreadPool(10);
	
	@Autowired
	private JmsMessagingTemplate jmsMessagingTemplate;
	
	@Autowired
	private MessageService messageService;
	
	// Service层切点
	@Pointcut("@annotation(org.springframework.transaction.annotation.Transactional)")
	public void aspect() {
	}

	@Around("@annotation(transactional)")
	public Object around(ProceedingJoinPoint pjp, Transactional transactional) throws Throwable {
		//记录第一个Transactional,也就是发起点
		Object object = TransactionalAspect.THREAD_LOCAL.get();
		if(object == null) {//空则是事务发起者
			TransactionalAspect.THREAD_LOCAL.set(OBJECT);
			try {
				Object o = pjp.proceed();
				//获取需要发送的message
				final List<Message> messages = MessageServiceImpl.THREAD_LOCAL.get();
				if(CollectionUtils.isEmpty(messages)) {
					EXECUTOR.submit(() -> {
						List<Integer> retryIds = new ArrayList<>();
						List<Integer> commitIds = new ArrayList<>();
						for(Message message : messages) {
							ActiveMQDestination destination = null;
							if(message.getType().intValue() == QUEUE) {
								destination = new ActiveMQQueue(message.getDestination());
							}else if(message.getType().intValue() == SUBJECT) {
								destination = new ActiveMQTopic(message.getDestination());
							}
							if(destination != null) {
								try {
									String s = jmsMessagingTemplate.convertSendAndReceive(destination, message.getContent(), String.class);
									if(StringUtils.isNotBlank(s)) {
										commitIds.add(message.getId());
									}else {
										retryIds.add(message.getId());
									}
								} catch (Exception e) {
									LOGGER.error("send to mq error", e);
									retryIds.add(message.getId());
								}
							}
						}
						Message message = new Message();
						message.setUpdateTime(new Date());
						
						message.setIds(retryIds);
						messageService.updateRetryCount(message);//发送失败的消息,更新重试次数
						
						message.setStatus(1);
						message.setIds(commitIds);
						messageService.updateStatus(message);//发送成功的消息,更新发送成功
					});
				}
				return o;
			}finally {//发起者执行完必须要清除掉threadLocal的内容
				TransactionalAspect.THREAD_LOCAL.remove();
				MessageServiceImpl.THREAD_LOCAL.remove();
			}
		}else {
			return pjp.proceed();//调用业务方法
		}
	}
}
⚠️ **GitHub.com Fallback** ⚠️