当前位置:首页 > 技术文章 > 正文内容

用大白话的方式,带你搞懂Redis分布式锁!

arlanguage3个月前 (01-31)技术文章33

小李接到一个小需求,开发一个秒杀功能,功能很简单,我一共有五个库存,某一时间开启秒杀,购买一次,库存减一,直至库存为零。小李一顿需求分析,觉得很简单,然后就开始一顿CV,完事,功能上线,然后就发现了问题。

public class Demo {
  //库存
    private static Long a=5L;
	//购买
    public static void buy(){
        if (a > 0) {
            a-=1;
            System.out.println(Thread.currentThread().getName()+"秒杀成功,商品剩余:"+a);
        }else {
            System.out.println(Thread.currentThread().getName()+"秒杀失败,商品库存不足");
        }
    }
		//开启秒杀
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(Demo::buy).start();
        }
    }
}

秒杀当晚,就出现了商品超卖的情况,一共五件商品,被卖出6次,老板气的当场送进了ICU。第二天小李因为右脚先迈进办公室被开除了。

然后这个需求就交到了老王手里,老王是个经验丰富的程序员,他一眼就看出了问题,原来,在多线程并发的情况下,对同一数据操作,可能会出现数据错乱的问题,只需要对购买操作做原子限制,同一时间只能有一个线程做购买操作,于是老王使用了锁:

public class Demo {
    private static Long a = 5L;
    public static void buy() {
        synchronized (a) {
            if (a > 0) {
                a -= 1;
                System.out.println(Thread.currentThread().getName() + "秒杀成功,商品剩余:" + a);
            } else {
                System.out.println(Thread.currentThread().getName() + "秒杀失败,商品库存不足");
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(Demo::buy).start();
        }

    }
}

开发完老王还做了压力测试,非常的ok,然后功能就上线了。

老板听说老王把功能修复好了,立马从ICU里抢救了回来,并升职老王做了项目经理。

后来在老王的带领下,项目越做越好,用户量越来越大,慢慢的老王发现服务器的性能已经达到了瓶颈,单个服务已经无法满足现阶段的需求了。经验丰富的老王立马安排人使用nginx对项目进行分布式集群部署,将项目部署到多个服务器中,使用nginx进行负载均衡。

然后进行压测,发现吞吐量确实上来了,单个服务器压力也没有那么大了,但是依然出现了超卖的现象。老王一分析发现:锁是jvm级别的,单个锁只能在一个服务器上生效,在多个服务器的情况下,使用这种锁已经不能满足需求了。

这时新来的代替小李位置的小王对老王说:王哥,我知道有一种分布式锁,可以解决咱们项目目前的问题。老王一听觉得小王挺聪明啊,行,这块交给你来办吧,办好了今年年底给你提加薪。

小王是个爱学习的孩子,他研究发现可以使用redis或者zookeeper在实现分布式锁,因为redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系,并且redis在项目中已经有了广泛使用,于是小王决定使用redis来实现分布式锁。

@Component
public class RedisBuy {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public void buy(){
      	//如果有值返回false,没值返回ture
        Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("wangpeng", "wangpeng");
        if (ifAbsent) {
            System.out.println("库存减一");
          	//减完库存删除锁
          	redisTemplate.delete("wangpeng");
        }else {
            System.out.println("未获取到锁...");
        }
    }

}

ok,小王写完后开始压测,就在觉得一切顺利的时候,突然测试环境有个服务器挂掉了,然后就出现了当前功能一直提示未获取到锁。然后小王发现原来是挂掉的那台机器上在设置完锁还没有来得及释放就挂掉了,导致锁一直没有释放,从而导致的问题。

于是小王在原来基础上设置了过期时间以防止死锁。

@Component
public class RedisBuy {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public  void buy() throws InterruptedException {
        if (lock("wangpeng")) {
            System.out.println("库存减一");
            redisTemplate.delete("wangpeng");
        }
    }
    public boolean lock(String lockKey) throws InterruptedException {
        //60s
        int expireMsecs = 30 * 1000;
        //wait time
        int timeout = 10 * 1000;
        //重试间隔
        int sleepMsecs = 100;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); // 锁到期时间
            if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
                return true;
            }
            // redis里key的时间
            String currentValue = redisTemplate.opsForValue().get(lockKey);
            // 判断锁是否已经过期,过期则重新设置并获取
            if (currentValue != null && Long.parseLong(currentValue) < System.currentTimeMillis()) {
                // 设置锁并返回旧值
                String oldValue = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
                // 比较锁的时间,如果不一致则可能是其他锁已经修改了值并获取
                if (oldValue != null && oldValue.equals(currentValue)) {
                    return true;
                }
            }
            timeout -= sleepMsecs;
            // 延时
            Thread.sleep(sleepMsecs);
        }
        return false;
    }

}

设置完后就不会出现服务宕机导致的redis锁无法释放的问题了,但是紧接着小王发现了另一个问题,就是如果设置的过期时间过短就会导致当前线程还没有处理完,redis的key就过期了,从而导致问题依然发生。另外还有一个问题就是即使线程一的rediskey过期了,等线程二设置上key后,线程一等业务处理完依然会删除redis的key,而此时可能删除的就是线程二的redis的key了,从而导致线程二也会出现问题。

针对第二个问题,于是小王在value中加入了线程id,删除的时候判断是不是当前线程拥有的key,是的话再删除。

@Component
public class RedisBuy {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public  void buy() throws InterruptedException {
        if (lock("wangpeng")) {
            System.out.println("库存减一");
            unlLock("wangpeng");
        }
    }
    public void unlLock(String lockKey){
        String value = redisTemplate.opsForValue().get(lockKey);
        if (StringUtils.isEmpty(value)) {
            String id = value.split(":")[1];
            if ((Thread.currentThread().getId() + "").equals(id)) {
                redisTemplate.delete("wangpeng");
            }
        }
    }

        public boolean lock(String lockKey) throws InterruptedException {
        long id = Thread.currentThread().getId();
        //60s
        int expireMsecs = 60 * 1000;
        //wait time
        int timeout = 10 * 1000;
        /**
         * 	失败重试间隔时间
         */
        int sleepMsecs = 100;

        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = expires+":"+id; // 锁到期时间
            if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
                return true;
            }
            // redis里key的时间
            String currentValue = redisTemplate.opsForValue().get(lockKey);
            // 判断锁是否已经过期,过期则重新设置并获取
            if (currentValue != null && Long.parseLong(currentValue) < System.currentTimeMillis()) {
                // 设置锁并返回旧值
                String oldValue = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
                // 比较锁的时间,如果不一致则可能是其他锁已经修改了值并获取
                if (oldValue != null && oldValue.equals(currentValue)) {
                    return true;
                }
            }
            timeout -= sleepMsecs;
            // 延时
            Thread.sleep(sleepMsecs);
        }
        return false;
    }

}

但即使这样,也不能保证业务在redis的key失效之前处理完成,只能适当延长失效时间,但是适当是多少呢,这是个问题,那怎么办呢?

小王不愧是写代码的一把好手,对于第一个问题,除了过期时间,他还在业务代码中加入了一个子线程,在子线程中每10秒确认一下主线程是否在线,如果在线,对锁进行续命,从而保证key的过期时间在业务处理时不会失效。

但是这么多过程,稍不留意就会出现问题,从而导致小王因为右脚迈进公司大门而被开除。聪明的小王开始寻找是否有现成的工具组件来完成这一功能呢,于是他发现了redisson。

然后小王开始学习redisson,重构了这一功能,保证了项目平稳运行。

秒杀项目已经上线广受好评,老板高兴的直接从ICU里出来去操场跑了五公里,年底小王薪资直接涨到了50k。

扫描二维码推送至手机访问。

版权声明:本文由AR编程网发布,如需转载请注明出处。

本文链接:http://www.arlanguage.com/post/1765.html

标签: zookeeper nginx
分享给朋友:

“用大白话的方式,带你搞懂Redis分布式锁!” 的相关文章

使用nginx部署前端html等静态页面

一、前言最近想要部署一个纯前端的静态页面,项目的内容很简单,也就是一些简单的html、css、js、jpg、mp3等静态资源,不涉及后端开发。之前一直都是使用Tomcat来部署项目的,因为涉及后端接口等方面的内容,这次再用它来部署纯前端的东西,显得大材小用,过于笨重。此时,使用nginx,便是最合适...

Nginx 全面攻略:动静分离、压缩、缓存、黑白名单、跨域、高可用

Nginx 作为一款高性能的 HTTP 服务器和反向代理服务器,被广泛应用于各类互联网项目中。它不仅能够提供静态文件服务,还支持动静分离、压缩、缓存、黑白名单、跨域、高可用等多种高级功能。本文将带你全面了解和掌握 Nginx 的这些强大功能。一、动静分离动静分离是指将动态请求与静态资源请求分离开来,...

nginx监控与调优(三)

nginx监控通常有两种方法:一是status监控;二是ngxtop监控。一、status监控使用status监控的步骤:1.确定nginx中status模块是否已安装[root@localhost sbin]# nginx -V nginx version: nginx/1.13.7 built...

面试常问知识点:Nginx设置代理的一个注意点

前几天,重启了下Nginx代理服务,发现报错了,以下是本次的思考。1:先解决问题查看Nginx错误日志:40 SSL_do_handshake() failed (SSL: error:14094410:SSL routines:ssl3_read_bytes:sslv3 alert handsha...

Nginx系列:图片过滤处理

http_image_filter_module是Nginx提供的集成图片处理模块,支持nginx-0.7.54以后的版本,在网站访问量不是很高;磁盘有限不想生成多余的图片文件的前提下,就可以用它实时缩放图片,旋转图片,验证图片有效性以及获取图片宽高以及图片类型信息,由于是实时计算的结果,所以网站...

Nginx负载均衡安全配置说明2 nginx负载均衡配置文件

上一节,我们对Nginx安全配置的几个知识点做了一个说明,例如限制IP访问、文件目录禁止访问限制、需要防止DOS攻击、请求方法的限制和限制文件上传的大小这个进行了一个分析说明,详细的文章请关注我的头条号:一点热,在里面看回之前的文章,快速入口Nginx负载均衡的一些安全配置说明其实,配置Nginx的...