分布式锁 - 969251639/study GitHub Wiki
这是用Redis实现的分布式锁,抽取redisson框架中的分布式锁实现(由于引入redisson需要依赖其他第三方包,比如netty等,且redisson的编程模型是将redis包装成各种集合操作,而非原生命令操作),并做了点小改进,完美支持锁各种要求和条件。
-
如何支持重入
答:使用map数据结构存储,之中的value存的就是重入次数 -
如果其中一台机器获取到了锁,其他机器如何在等待休眠后感知到锁的释放而去继续竞争锁
答:使用redis的订阅功能,收到解锁的订阅消息后去唤醒休眠的线程醒来再次去竞争锁 -
分布式锁中的key必须设置超时间,否则有可能造成获取锁的机器在没有解锁发出去之前宕机,则这个锁永远不会释放,需要人工介入,那么如果设置了key,方法执行时间很长,超过这个key的超时时间,那么获取锁的方法还没执行完,锁就被redis自动释放了
答:用延时器一直延长key的超时时间
- 客户端申请锁,成功则运行,否则则订阅锁的释放消息后休眠
- 监听到锁释放后,唤醒线程继续抢锁
- 唤醒和休眠控制使用jdk信号量
- 醒来后需要注意超时时间的控制
lockKey: 待锁住的资源(String) timeOut: 请求锁的超时时间(ms) expireTime: 锁的有效时间(s)
lockValue: 锁的内容(UUID : ThreadId)
- 当客户端请求到锁,则执行业务方法,没到锁过期时间的三分之二后自动延长过期时间,防止方法没执行完,锁却过期,导致锁失效
*当客户端请求不到锁时等待,并订阅锁对应的通道,直至等待超时或者受到订阅通知重新去竞争锁 - 当持有锁的客户端释放锁时发布订阅消息,通知所有竞争锁的机器上的线程,每台机器受到订阅消息后释放信号量,唤醒线程重新竞争锁
- 定义锁的接口
public interface RedisLock {
void lock() throws InterruptedException, TimeoutException;
void lockInterruptibly() throws InterruptedException, TimeoutException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException, TimeoutException;
void unlock();
Condition newCondition();
}
- 监听解锁的订阅消息通道
@Component
public class LockListener extends MessageListenerAdapter {
public static final Long UNLOCK_MESSAGE = 0L;//解锁消息
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] b = message.getBody();//请使用valueSerializer
byte[] c = message.getChannel();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
String messageCode = stringRedisSerializer.deserialize(b);
if(UNLOCK_MESSAGE.toString().equals(messageCode)) {
String channel = stringRedisSerializer.deserialize(c);
LockSubscribe.release(channel);//释放信号量,唤醒线程
}
}
}
public class LockSubscribe {
private static final Map<String, Semaphore> MAP = new ConcurrentHashMap<>();
public static void subscribe(String channel) {
if(!MAP.containsKey(channel)) {
MAP.put(channel, new Semaphore(0));
}
}
public static void unsubscribe(String channel) {
MAP.remove(channel);
}
public static void wait(String channel, long timeout, TimeUnit unit) throws InterruptedException {
Semaphore semaphore = MAP.get(channel);
if(semaphore != null) {
semaphore.tryAcquire(timeout, unit);
}
}
public static void release(String channel) {
Semaphore semaphore = MAP.get(channel);
if(semaphore != null) {
semaphore.release(semaphore.getQueueLength());
}
}
}
- 加锁的LUA脚本
if (redis.call(‘exists’, lockKey) == 0) then //判断lockKey是否存在
redis.call(‘hset’, lockKey, uidtid, 1); //设置锁的value,value是个map(key: uidtid, value: 重入次数)
redis.call(‘expire’, lockKey, expireTime); //设置锁的过期时间
return nil; //返回空,表示获取锁成功
end;
if (redis.call(‘hexists’, lockKey, uidtid) == 1) then //判断是否是重入
redis.call(‘hincrby’, lockKey, uidtid, 1); //重入次数1
redis.call(‘expire’, lockKey, expireTime); //重置过期时间
return nil; //返回空,表示获取锁成功
end;
return redis.call(‘pttl’, lockKey)//获取锁失败,返回lockKey的剩余存活时间(毫秒)
- 加锁的LUA脚本
if (redis.call(‘exists’, lockKey) == 0) then //判断是否锁存在(有可能被强制解锁)
redis.call(‘publish’, channel, unlockMessage); // 发布消息,通知其他线程可以竞争锁
return 1; //返回1,表示锁释放成功
end;
if (redis.call(‘hexists’, lockKey, uid+tid) == 0) then //判断当前线程是否是锁的持有者
return nil; //返回空,表示释放锁的线程与持有锁的线程不一致
end;
local counter = redis.call(‘hincrby’, lockKey, uid+tid, -1); //重入次数减1并返回
if (counter > 0) then //判断重入次数是否仍大于0
redis.call(‘expire’, lockKey, expireTime); //重置过期时间,继续持有锁
return 0; //返回0,表示继续持有锁
else
redis.call(‘del’, lockKey); //删除lockKey
redis.call('publish', channel, unlockMessage); //发布消息,通知其他线程可以竞争锁
return 1; //返回1,表示锁释放成功
end;
return nil; //返回空,表示释放锁的线程与持有锁的线程不一致
- 锁的具体实现
public class RedisDistributedLock implements RedisLock {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisDistributedLock.class);
private RedisTemplate<String, Object> redisTemplate = RedisSupport.getRedisTemplate();
/**
* 竞争消息通道
*/
public static final String CHANNEL_PREFIX = "lock__channel";
/**
* 默认请求锁的超时时间(ms 毫秒)
*/
public static final long DEFAULT_TIME_OUT = 1000;
/**
* 默认锁的有效时间(s)
*/
public static final int DEFAULT_EXPIRE_TIME = 60;
/**
* 锁标志对应的key
*/
private String lockKey;
/**
* 锁对应的值
*/
private String lockValue = UUID.randomUUID().toString() + ":" + Thread.currentThread().getId();
/**
* 锁的有效时间(s)
*/
private int expireTime = DEFAULT_EXPIRE_TIME;
/**
* 请求锁的超时时间(ms)
*/
private long timeOut = DEFAULT_TIME_OUT;
/**
* 延长执行时间计时器
*/
private Timer timer = new Timer();
/**
* 加锁LUA脚本
*/
public static final String LOCK_SCRIPT =
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('expire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('expire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);";
/**
* 解锁LUA脚本
*/
public static final String UNLOCK_SCRIPT =
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('expire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;";
/**
* 强制解锁LUA脚本
*/
public static final String FORCE_UNLOCK_SCRIPT =
"if (redis.call('del', KEYS[1]) == 1) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"else " +
"return 0; " +
"end";
public static final String RENEWAL_EXPIRETION_SCRIPT =
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('expire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;";
/**
* 使用默认的锁过期时间和请求锁的超时时间
*
* @param lockKey 锁的key(Redis的Key)
*/
public RedisDistributedLock(String lockKey) {
this.lockKey = lockKey;
}
/**
* 使用默认的请求锁的超时时间,指定锁的过期时间
*
* @param lockKey 锁的key(Redis的Key)
* @param expireTime 锁的过期时间(单位:秒)
*/
public RedisDistributedLock(String lockKey, int expireTime) {
this.lockKey = lockKey;
this.expireTime = expireTime;
}
/**
* 使用默认的锁的过期时间,指定请求锁的超时时间
*
* @param lockKey 锁的key(Redis的Key)
* @param timeOut 请求锁的超时时间(单位:毫秒)
*/
public RedisDistributedLock(String lockKey, long timeOut) {
this.lockKey = lockKey;
this.timeOut = timeOut;
}
/**
* 锁的过期时间和请求锁的超时时间都是用指定的值
*
* @param lockKey 锁的key(Redis的Key)
* @param expireTime 锁的过期时间(单位:秒)
* @param timeOut 请求锁的超时时间(单位:毫秒)
*/
public RedisDistributedLock(String lockKey, int expireTime, long timeOut) {
this.lockKey = lockKey;
this.expireTime = expireTime;
this.timeOut = timeOut;
}
/**
* 上锁
*/
@Override
public void lock() throws InterruptedException, TimeoutException {
lockInterruptibly();
}
/**
* 上锁(可中断)
*/
@Override
public void lockInterruptibly() throws InterruptedException, TimeoutException {
if(!tryLock(timeOut, TimeUnit.MILLISECONDS)) {
throw new TimeoutException(String.format("获取锁超时, key: %s, value: %s", lockKey, lockValue));
}
}
/**
* 尝试上锁
*/
@Override
public boolean tryLock() {
return tryAcquire() == null;
}
/**
* 尝试上锁
* @param waitTime 等待的超时时间
* @param unit 时间单位
*/
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
String channel = getChannelName();
long time = unit.toMillis(waitTime);
long currentTime = System.currentTimeMillis();
Long result = tryAcquire();//尝试获取锁
if(result == null) {//获取成功
return true;
}
time = time - (System.currentTimeMillis() - currentTime);
if (time <= 0) {//超时返回失败
return false;
}
//订阅争锁通道
currentTime = System.currentTimeMillis();
LockSubscribe.subscribe(channel);//订阅
try {
for(;;) {
time = time - (System.currentTimeMillis() - currentTime);
if (time <= 0) {//超时返回失败
return false;
}
//订阅争锁通道
currentTime = System.currentTimeMillis();
result = tryAcquire();//尝试再获取一次
if (result == null) {
return true;
}
time = time - (System.currentTimeMillis() - currentTime);
if (time <= 0) {//超时返回失败
return false;
}
//等待消息通知后重新竞争锁
currentTime = System.currentTimeMillis();
if (result >= 0 && result < time) {
LockSubscribe.wait(channel, result, TimeUnit.MILLISECONDS);
} else {
LockSubscribe.wait(channel, time, TimeUnit.MILLISECONDS);
}
}
} finally {
LockSubscribe.unsubscribe(channel);//退订
}
}
/**
* 解锁
*/
@Override
public void unlock() {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(UNLOCK_SCRIPT);
redisScript.setResultType(Long.class);
//返回null则unlock的线程非持有锁的线程
//返回1则锁不存在或者释放成功,则发消息通知其他等待线程竞争锁
//返回0则重入次数非空,延长锁的超时时间
Long result = redisTemplate.execute(redisScript, Arrays.<String>asList(lockKey, getChannelName()), LockListener.UNLOCK_MESSAGE, expireTime, lockValue);
if(result == null) {
LOGGER.info("解锁线程与持有锁的线程不一致,解锁失败, key: {}, value: {}", lockKey, lockValue);
throw new IllegalMonitorStateException("解锁线程与持有锁的线程不一致,解锁失败");
}else if(result == 1L) {
LOGGER.info("解锁成功, key: {}, value: {}", lockKey, lockValue);
//移除延长key定时器
cancelExpiration();
}else if(result == 0L) {
LOGGER.info("重入次数减一, key: {}, value: {}", lockKey, lockValue);
}
}
/**
* 强制解锁
*/
public void forceUnlock() {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(FORCE_UNLOCK_SCRIPT);
redisScript.setResultType(Long.class);
Long result = redisTemplate.execute(redisScript, Arrays.<String>asList(lockKey, getChannelName()), LockListener.UNLOCK_MESSAGE);
if(result == 1L) {
LOGGER.info("强制解锁成功, key: {}, value: {}", lockKey);
}else {
LOGGER.info("强制解锁失败, key: {}, value: {}", lockKey);
}
}
/**
* 获取重入次数
*/
public int getHoldCount() {
return Integer.parseInt(redisTemplate.opsForHash().get(lockKey, lockValue).toString());
}
/**
* 是否被锁住
*/
public boolean isLocked() {
return redisTemplate.hasKey(lockKey);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
/**
* 申请锁
*/
private Long tryAcquire() {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(LOCK_SCRIPT);
redisScript.setResultType(Long.class);
Long result = redisTemplate.execute(redisScript, Collections.<String>singletonList(lockKey), expireTime, lockValue);
if(result == null) {
renewalExpiration();
}
return result;
}
/**
* 延长器
*/
private void renewalExpiration() {
timer.schedule(new TimerTask() {
@Override
public void run() {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(RENEWAL_EXPIRETION_SCRIPT);
redisScript.setResultType(Long.class);
RedisSupport.getRedisTemplate().execute(redisScript, Collections.<String>singletonList(lockKey), expireTime, lockValue);
}
}, (expireTime / 3) * 1000, (expireTime / 3) * 1000);
}
/**
* 取消延长定时器
*/
private void cancelExpiration() {
timer.cancel();
}
private String getChannelName() {
return CHANNEL_PREFIX + ":" + lockKey;
}
public String getLockKey() {
return lockKey;
}
public String getLockValue() {
return lockValue;
}
public int getExpireTime() {
return expireTime;
}
public long getTimeOut() {
return timeOut;
}
}
- 使用
模拟机器1(work1)
public class Test1 {
public static void main(String[] args) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(5);
jedisPoolConfig.setMaxWaitMillis(2000);
jedisPoolConfig.setMaxTotal(10);
jedisPoolConfig.setMinIdle(1);
jedisPoolConfig.setTestOnBorrow(true);
jedisPoolConfig.setTestOnReturn(true);
JedisPool jedisPool = new JedisPool(jedisPoolConfig, "你的redis ip", 端口, 连接超时时间, "redis密码");
RedisSupport.setJedisPool(jedisPool);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
RedisDistributedLock lock = new RedisDistributedLock("userId");
lock.setLockTimeoutCallback(new LockCallback() {//超时回调
@Override
public void callback(String key) {
System.out.println(key);
}
});
try {
System.out.println("begin time: " + simpleDateFormat.format(new Date()));
lock.lock();
System.out.println("workid: test1");
Thread.sleep(5000);
System.out.println("end time: " + simpleDateFormat.format(new Date()));
} catch (InterruptedException | TimeoutException e) {
e.printStackTrace();
}finally {
lock.unlock();
System.exit(0);
}
}
}
模拟机器2(work2)
public class Test1 {
public static void main(String[] args) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(5);
jedisPoolConfig.setMaxWaitMillis(2000);
jedisPoolConfig.setMaxTotal(10);
jedisPoolConfig.setMinIdle(1);
jedisPoolConfig.setTestOnBorrow(true);
jedisPoolConfig.setTestOnReturn(true);
JedisPool jedisPool = new JedisPool(jedisPoolConfig, "你的redis ip", 端口, 连接超时时间, "redis密码");
RedisSupport.setJedisPool(jedisPool);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
RedisDistributedLock lock = new RedisDistributedLock("userId", 10000L);
try {
System.out.println("begin time: " + simpleDateFormat.format(new Date()));
lock.lock();
System.out.println("workid: test2");
Thread.sleep(5000);
System.out.println("end time: " + simpleDateFormat.format(new Date()));
} catch (InterruptedException | TimeoutException e) {
e.printStackTrace();
}finally {
lock.unlock();
System.exit(0);
}
}
}
work1先运行,work2后运行,那么work2抢到锁的时间应该是work1开始执行的第5秒后,运行结果如下
begin time: 2019-02-27 10:00:46
workid: test1
end time: 2019-02-27 10:00:52
begin time: 2019-02-27 10:00:49
workid: test2
end time: 2019-02-27 10:00:57
主从过程中,如果从库没有同步主库最新的数据,而主库先挂掉,从库上去时有可能锁会不存在(其实有可能已经存在,只不过没同步到从库主库就挂了)