用大白话的方式,带你搞懂Redis分布式锁!
小李接到一个小需求,开发一个秒杀功能,功能很简单,我一共有五个库存,某一时间开启秒杀,购买一次,库存减一,直至库存为零。小李一顿需求分析,觉得很简单,然后就开始一顿CV,完事,功能上线,然后就发现了问题。
public class Demo {
//库存
private static Long a=5L;
//购买
public static void buy(){
if (a > 0) {
a-=1;
System.out.println(Thread.currentThread().getName()+"秒杀成功,商品剩余:"+a);
}else {
System.out.println(Thread.currentThread().getName()+"秒杀失败,商品库存不足");
}
}
//开启秒杀
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(Demo::buy).start();
}
}
}
秒杀当晚,就出现了商品超卖的情况,一共五件商品,被卖出6次,老板气的当场送进了ICU。第二天小李因为右脚先迈进办公室被开除了。
然后这个需求就交到了老王手里,老王是个经验丰富的程序员,他一眼就看出了问题,原来,在多线程并发的情况下,对同一数据操作,可能会出现数据错乱的问题,只需要对购买操作做原子限制,同一时间只能有一个线程做购买操作,于是老王使用了锁:
public class Demo {
private static Long a = 5L;
public static void buy() {
synchronized (a) {
if (a > 0) {
a -= 1;
System.out.println(Thread.currentThread().getName() + "秒杀成功,商品剩余:" + a);
} else {
System.out.println(Thread.currentThread().getName() + "秒杀失败,商品库存不足");
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(Demo::buy).start();
}
}
}
开发完老王还做了压力测试,非常的ok,然后功能就上线了。
老板听说老王把功能修复好了,立马从ICU里抢救了回来,并升职老王做了项目经理。
后来在老王的带领下,项目越做越好,用户量越来越大,慢慢的老王发现服务器的性能已经达到了瓶颈,单个服务已经无法满足现阶段的需求了。经验丰富的老王立马安排人使用nginx对项目进行分布式集群部署,将项目部署到多个服务器中,使用nginx进行负载均衡。
然后进行压测,发现吞吐量确实上来了,单个服务器压力也没有那么大了,但是依然出现了超卖的现象。老王一分析发现:锁是jvm级别的,单个锁只能在一个服务器上生效,在多个服务器的情况下,使用这种锁已经不能满足需求了。
这时新来的代替小李位置的小王对老王说:王哥,我知道有一种分布式锁,可以解决咱们项目目前的问题。老王一听觉得小王挺聪明啊,行,这块交给你来办吧,办好了今年年底给你提加薪。
小王是个爱学习的孩子,他研究发现可以使用redis或者zookeeper在实现分布式锁,因为redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系,并且redis在项目中已经有了广泛使用,于是小王决定使用redis来实现分布式锁。
@Component
public class RedisBuy {
@Autowired
private StringRedisTemplate redisTemplate;
public void buy(){
//如果有值返回false,没值返回ture
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("wangpeng", "wangpeng");
if (ifAbsent) {
System.out.println("库存减一");
//减完库存删除锁
redisTemplate.delete("wangpeng");
}else {
System.out.println("未获取到锁...");
}
}
}
ok,小王写完后开始压测,就在觉得一切顺利的时候,突然测试环境有个服务器挂掉了,然后就出现了当前功能一直提示未获取到锁。然后小王发现原来是挂掉的那台机器上在设置完锁还没有来得及释放就挂掉了,导致锁一直没有释放,从而导致的问题。
于是小王在原来基础上设置了过期时间以防止死锁。
@Component
public class RedisBuy {
@Autowired
private StringRedisTemplate redisTemplate;
public void buy() throws InterruptedException {
if (lock("wangpeng")) {
System.out.println("库存减一");
redisTemplate.delete("wangpeng");
}
}
public boolean lock(String lockKey) throws InterruptedException {
//60s
int expireMsecs = 30 * 1000;
//wait time
int timeout = 10 * 1000;
//重试间隔
int sleepMsecs = 100;
while (timeout >= 0) {
long expires = System.currentTimeMillis() + expireMsecs + 1;
String expiresStr = String.valueOf(expires); // 锁到期时间
if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
return true;
}
// redis里key的时间
String currentValue = redisTemplate.opsForValue().get(lockKey);
// 判断锁是否已经过期,过期则重新设置并获取
if (currentValue != null && Long.parseLong(currentValue) < System.currentTimeMillis()) {
// 设置锁并返回旧值
String oldValue = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
// 比较锁的时间,如果不一致则可能是其他锁已经修改了值并获取
if (oldValue != null && oldValue.equals(currentValue)) {
return true;
}
}
timeout -= sleepMsecs;
// 延时
Thread.sleep(sleepMsecs);
}
return false;
}
}
设置完后就不会出现服务宕机导致的redis锁无法释放的问题了,但是紧接着小王发现了另一个问题,就是如果设置的过期时间过短就会导致当前线程还没有处理完,redis的key就过期了,从而导致问题依然发生。另外还有一个问题就是即使线程一的rediskey过期了,等线程二设置上key后,线程一等业务处理完依然会删除redis的key,而此时可能删除的就是线程二的redis的key了,从而导致线程二也会出现问题。
针对第二个问题,于是小王在value中加入了线程id,删除的时候判断是不是当前线程拥有的key,是的话再删除。
@Component
public class RedisBuy {
@Autowired
private StringRedisTemplate redisTemplate;
public void buy() throws InterruptedException {
if (lock("wangpeng")) {
System.out.println("库存减一");
unlLock("wangpeng");
}
}
public void unlLock(String lockKey){
String value = redisTemplate.opsForValue().get(lockKey);
if (StringUtils.isEmpty(value)) {
String id = value.split(":")[1];
if ((Thread.currentThread().getId() + "").equals(id)) {
redisTemplate.delete("wangpeng");
}
}
}
public boolean lock(String lockKey) throws InterruptedException {
long id = Thread.currentThread().getId();
//60s
int expireMsecs = 60 * 1000;
//wait time
int timeout = 10 * 1000;
/**
* 失败重试间隔时间
*/
int sleepMsecs = 100;
while (timeout >= 0) {
long expires = System.currentTimeMillis() + expireMsecs + 1;
String expiresStr = expires+":"+id; // 锁到期时间
if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
return true;
}
// redis里key的时间
String currentValue = redisTemplate.opsForValue().get(lockKey);
// 判断锁是否已经过期,过期则重新设置并获取
if (currentValue != null && Long.parseLong(currentValue) < System.currentTimeMillis()) {
// 设置锁并返回旧值
String oldValue = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
// 比较锁的时间,如果不一致则可能是其他锁已经修改了值并获取
if (oldValue != null && oldValue.equals(currentValue)) {
return true;
}
}
timeout -= sleepMsecs;
// 延时
Thread.sleep(sleepMsecs);
}
return false;
}
}
但即使这样,也不能保证业务在redis的key失效之前处理完成,只能适当延长失效时间,但是适当是多少呢,这是个问题,那怎么办呢?
小王不愧是写代码的一把好手,对于第一个问题,除了过期时间,他还在业务代码中加入了一个子线程,在子线程中每10秒确认一下主线程是否在线,如果在线,对锁进行续命,从而保证key的过期时间在业务处理时不会失效。
但是这么多过程,稍不留意就会出现问题,从而导致小王因为右脚迈进公司大门而被开除。聪明的小王开始寻找是否有现成的工具组件来完成这一功能呢,于是他发现了redisson。
然后小王开始学习redisson,重构了这一功能,保证了项目平稳运行。
秒杀项目已经上线广受好评,老板高兴的直接从ICU里出来去操场跑了五公里,年底小王薪资直接涨到了50k。