分布式ID - 969251639/study GitHub Wiki
分布式ID有很多种算法,比如常见的雪花算法,uuid等,这里使用基于数据库的flickr算法实现
公司从2017年末转型到spring cloud的微服务架构开始,就肯定需要一个分布式的ID算法来支持多个微服务的id生成,经过调研,发现数据库的实现思路简单,功能强大,无需远端生成,只需远端申请,故自己实现了一套简单的分布式id生成方案
新建一张表,用于记录号段的领取
create table seq_gen (
id 主键,
host_ip 最后一次领取的主机ip
service 业务 unique key
start_seq 开始号段 default 1
end_seq 结束号段
step 步长
description 描述
update_time 最后修改时间
)
其中规定,service为唯一索引,即一个业务只有一个号段,步长控制了号段的大小,比如A业务每次需要领1000个号段,那么
第一次:start_seq=1,end_seq=1000,step=1000
第二次:start_seq=1001,end_seq=2000,step=1000
以此类推
那么每次业务领取的时候都是通过一下的sql去查询
select start_seq, end_seq from seq_gen where service=?;
每次领取之后更新的sql
update seq_gen set start_seq=end_seq+1, end_seq=end_seq+step where service=?;
接下来解决并发很高的情况下如何防止机器漏拿和重复拿号段的问题: 因为领取和更新时两个sql,那么至少是两个sql的执行,如何保证高并发下的原子性呢?
lock住该业务即可:
begin;
if(update seq_gen set start_seq=end_seq+1, end_seq=end_seq+step where service=? == 0) {
insert into seq_gen values(...);
}
select start_seq, end_seq from seq_gen where service=?;
commit;
lock性能由下面的双buffer解决;
解决了原子性操作之后如何保证高可用呢?
可用mysql的主从;另外,由于该功能不能有延迟性,所以读写都走主库,不能读写分离
最后,需要解决申请号段的性能问题
这里使用双buffer机制,即每个业务都有两个buffer,其中一个用来使用号段的累加,直到这个号段使用完了立马切到下一个buffer,而下一个buffer在第一个buffer使用到一定程度时(可配置)立马去提前申请下一段号段缓存起来,这样就不用等到第一个号段用完的时候再去申请而造成的阻塞
- 创建表
CREATE TABLE seq_gen (
id INT NOT NULL AUTO_INCREMENT COMMENT '主键',
host_ip VARCHAR(50) COMMENT '最后一次领取的主机ip',
service VARCHAR(50) NOT NULL COMMENT '业务',
start_seq BIGINT NOT NULL DEFAULT 1 COMMENT '开始号段',
end_seq BIGINT NOT NULL COMMENT '结束号段',
step BIGINT NOT NULL COMMENT '步长',
description VARCHAR(100) COMMENT '描述',
update_time DATETIME COMMENT '最后修改时间',
PRIMARY KEY (id),
UNIQUE KEY unique_key_service(service)
);
- 创建mybatis的mapper,实现model, dao, service, controller
mapper:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.yujinyi.seq.gen.dao.SeqGenDao">
<resultMap type="com.yujinyi.seq.gen.dao.model.SeqGen" id="SeqGenResultMap">
<result property="id" column="id" jdbcType="VARCHAR" javaType="String"/>
<result property="hostIp" column="host_ip" jdbcType="VARCHAR" javaType="String"/>
<result property="service" column="service" jdbcType="VARCHAR" javaType="String"/>
<result property="startSeq" column="start_seq" jdbcType="BIGINT" javaType="Long"/>
<result property="endSeq" column="end_seq" jdbcType="BIGINT" javaType="Long"/>
<result property="step" column="step" jdbcType="BIGINT" javaType="Long"/>
<result property="description" column="description" jdbcType="VARCHAR" javaType="String"/>
<result property="updateTime" column="update_uptime" jdbcType="TIMESTAMP" javaType="Date"/>
</resultMap>
<insert id="insert" parameterType="com.yujinyi.seq.gen.dao.model.SeqGen" >
INSERT INTO seq_gen (
host_ip,service,start_seq,end_seq,step,description,update_time
)
<trim prefix="VALUES (" suffix=")" suffixOverrides="," >
#{hostIp,jdbcType=VARCHAR},
#{service,jdbcType=VARCHAR},
#{startSeq,jdbcType=BIGINT},
#{endSeq,jdbcType=BIGINT},
#{step,jdbcType=BIGINT},
#{description,jdbcType=VARCHAR},
#{updateTime,jdbcType=TIMESTAMP},
</trim>
</insert>
<update id="updateByService" parameterType="String">
UPDATE seq_gen SET start_seq=end_seq+1, end_seq=end_seq+step, update_time=NOW() WHERE service=#{service,jdbcType=VARCHAR}
</update>
<select id="queryByService" resultMap="SeqGenResultMap" parameterType="String" >
SELECT start_seq, end_seq, step FROM seq_gen WHERE service=#{service,jdbcType=VARCHAR}
</select>
<select id="queryAllService" resultType="String" >
SELECT service FROM seq_gen
</select>
</mapper>
model:
public class SeqGen {
private Integer id;
private String hostIp;
private String service;
private Long startSeq;
private Long endSeq;
private Long step;
private String description;
private Date updateTime;
getter, setter...
}
dao:
@Mapper
public interface SeqGenDao {
public int insert(SeqGen seqGen);
public int updateByService(@Param(value = "service")String service);
public SeqGen queryByService(@Param(value = "service")String service);
public List<String> queryAllService();
}
service:
public interface SeqGenService {
public int insert(SeqGen seqGen);
public int updateByService(String service);
public SeqGen queryByService(String service);
public List<String> queryAllService();
public SeqGen applySeq(SeqGen seqGen);
}
@Service
public class SeqGenServiceImpl implements SeqGenService {
@Autowired
private SeqGenDao seqGenDao;
@Override
@Transactional
public int insert(SeqGen seqGen) {
seqGen.setUpdateTime(new Date());
seqGen.setStartSeq(1L);
seqGen.setEndSeq(seqGen.getStep());
return seqGenDao.insert(seqGen);
}
@Override
@Transactional
public int updateByService(String service) {
return seqGenDao.updateByService(service);
}
@Override
public SeqGen queryByService(String service) {
return seqGenDao.queryByService(service);
}
@Override
public List<String> queryAllService() {
return seqGenDao.queryAllService();
}
@Override
@Transactional
public SeqGen applySeq(SeqGen seqGen) {
if(updateByService(seqGen.getService()) == 0) {
return null;
}
return queryByService(seqGen.getService());
}
}
controller:
@RestController
public class SeqGenController {
@Autowired
private SeqGenService seqGenService;
@RequestMapping(value = "/applySeq", method = RequestMethod.GET, produces = "application/json;charset=UTF-8")
@org.springframework.web.bind.annotation.ResponseBody
public SeqGen applySeq(SeqGen seqGen) throws Exception {
if(StringUtils.isBlank(seqGen.getService())) {
throw new Exception("param service is null");
}
return seqGenService.applySeq(seqGen);
}
@RequestMapping(value = "/initSeq", method = RequestMethod.GET, produces = "application/json;charset=UTF-8")
@org.springframework.web.bind.annotation.ResponseBody
public int initSeq(SeqGen seqGen) throws Exception {
if(StringUtils.isBlank(seqGen.getService())) {
throw new Exception("param service is null");
}
if(seqGen.getStep() == null) {
throw new Exception("param step is null");
}
return seqGenService.insert(seqGen);
}
@RequestMapping(value = "/queryAllService", method = RequestMethod.GET, produces = "application/json;charset=UTF-8")
@org.springframework.web.bind.annotation.ResponseBody
public List<String> queryAllService() throws Exception {
return seqGenService.queryAllService();
}
}
- 客户端启动自动的先去加载service到缓存中
@Configuration
public class ApplicationStartup implements ApplicationRunner {
@Autowired
private SeqGenService seqGenService;
@Override
public void run(ApplicationArguments args) throws Exception {
List<String> list = seqGenService.queryAllService();
if (!CollectionUtils.isEmpty(list)) {
for (String s : list) {//申请service提前加载到内存
DoubleIdBuffer db = new DoubleIdBuffer(s);
SeqGen seqGen = IdUtils.applySeq(s);
db.init(seqGen.getStartSeq(), seqGen.getEndSeq());
BufferIdFactory.setDoubleBufferId(s, db);
}
}
}
}
- 编写service
@FeignClient("seq-gen")
public interface SeqGenService {
@RequestMapping(value = "/seq/gen/applySeq", method = RequestMethod.GET)
public SeqGen applySeq(@RequestParam("service")String service);
@RequestMapping(value = "/seq/gen/queryAllService", method = RequestMethod.GET)
public List<String> queryAllService();
}
- 双buffer实现
public class IdBuffer {
private AtomicLong a1 = new AtomicLong();
private long maxId = 0L;//最大值
private float resizeRate = 0.5F;//扩容比例,默认达到50%时自动加载下一段
private long threshold = 0L;//阈值
public void set(long id) {
a1.set(id);
}
public long get() {
long id = a1.getAndIncrement();
if(id > this.maxId) {
return 0;
}
return id;
}
public void setMinId(long id) {
a1.set(id);
}
public void setMaxId(long id) {
this.maxId = id;
this.threshold = Math.round(maxId * resizeRate);
}
public long getMaxId() {
return this.maxId;
}
public long getThreshold() {
return this.threshold;
}
public void setResizeRate(float resizeRate) {
this.resizeRate = resizeRate;
}
}
public class DoubleIdBuffer {
private static final ExecutorService EXECUTORS = Executors.newSingleThreadExecutor();
private IdBuffer idBuffer = null;
private IdBuffer idBuffer1 = new IdBuffer();
private IdBuffer idBuffer2 = new IdBuffer();
private String service;
public DoubleIdBuffer(String service) {
this.idBuffer = idBuffer1;
this.service = service;
}
public DoubleIdBuffer(String service, float resizeRate) {
this.idBuffer = idBuffer1;
this.service = service;
idBuffer1.setResizeRate(resizeRate);
idBuffer2.setResizeRate(resizeRate);
}
public IdBuffer getIdBuffer1() {
return idBuffer1;
}
public void setIdBuffer1(IdBuffer idBuffer1) {
this.idBuffer1 = idBuffer1;
}
public IdBuffer getIdBuffer2() {
return idBuffer2;
}
public void setIdBuffer2(IdBuffer idBuffer2) {
this.idBuffer2 = idBuffer2;
}
public long getId() {
long id = idBuffer.get();
handler(id);
while(id == 0) {
id = idBuffer.get();
}
return id;
}
private void handler(long id) {
if(id > idBuffer.getThreshold()) {//达到阈值则提前去加载数据到另外一个缓存
EXECUTORS.execute(() -> {
synchronized (EXECUTORS) {
SeqGen seqGen = IdUtils.applySeq(service);
if(idBuffer == idBuffer1) {
idBuffer2.setMinId(seqGen.getStartSeq());
idBuffer2.setMaxId(seqGen.getEndSeq());
}else if(idBuffer == idBuffer2) {
idBuffer1.setMinId(seqGen.getStartSeq());
idBuffer1.setMaxId(seqGen.getEndSeq());
}
}
});
}
if(id >= idBuffer.getMaxId()) {//该缓存用完了
//切换到下一个buffer
if(idBuffer == idBuffer1) {
idBuffer = idBuffer2;
}else if(idBuffer == idBuffer2) {
idBuffer = idBuffer1;
}
}
}
public void init(long startId, long endId) {
idBuffer.set(startId);
idBuffer.setMaxId(endId);
}
}
- 申请id的工厂
public class BufferIdFactory {
private static final ConcurrentHashMap<String, DoubleIdBuffer> BUFFER_CACHE = new ConcurrentHashMap<>();
public static DoubleIdBuffer genDoubleBufferId(String service) {
return genDoubleBufferId(service, 0F);
}
public static DoubleIdBuffer genDoubleBufferId(String service, float rate) {
DoubleIdBuffer db = BUFFER_CACHE.get(service);
return db;
}
public static void setDoubleBufferId(String key, DoubleIdBuffer d) {
BUFFER_CACHE.put(key, d);
}
}
public class IdUtils {
public static SeqGen applySeq(String service) {
SeqGenService seqGenService = SpringUtil.getBean(SeqGenService.class);
SeqGen seqGen = seqGenService.applySeq(service);
return seqGen;
}
}
- 测试
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class Test1 {
private final static Set<Long> set = new HashSet<Long>();
@Test
public void test() {
for(int i = 0; i < 1000; i++) {
final int ii = i;
new Thread(() -> {
DoubleIdBuffer d = BufferIdFactory.genDoubleBufferId("test");
long id = d.getId();
if(set.contains(id)) {
System.out.println("error: " + id);
System.exit(0);
}
set.add(id);
System.out.println("Thread" + ii + ": " + id);
}).start();
}
}
}
- 如果申请的号段没有用完就停机了,那么重启后会重新加载新的一段号段造成这部分没有用完的旧号段的浪费
- 如果主从延迟,且主数据库挂掉,而号段又已从数据库返回,那么从库上去后如果没有同步到最新的数据,则有号段重复的风险