使用redis实现分布式锁

mac2025-07-20  5

简介: 当高并发访问某个接口的时候,如果这个接口访问的数据库中的资源,并且你的数据库事务级别是可重复读(Repeatable read)的话,确实是没有线程问题的,因为数据库锁的级别就够了;但是如果这个接口需要访问一个静态变量、静态代码块、全局缓存的中的资源或者redis中的资源的时候,就会出现线程安全的问题。

案例: github地址: https://github.com/mzd123/mywy/tree/master/src/main/java/com/mzd/mywy/service

@RestController public class MsController {     @Autowired     private MsService msService;     @RequestMapping("/select_info.do")     public String select_info(String product_id) {         return msService.select_info(product_id);     }     @RequestMapping("/order.do")     public String order(String product_id) throws CongestionException {         return msService.order1(product_id);     } } @Service public class MsService {

    @Autowired     private RedisLock redisLock;     //商品详情     private static HashMap<String, Integer> product = new HashMap();     //订单表     private static HashMap<String, String> orders = new HashMap();     //库存表     private static HashMap<String, Integer> stock = new HashMap();

    static {         product.put("123", 10000);         stock.put("123", 10000);     }

    public String select_info(String product_id) {         return "限量抢购商品XXX共" + product.get(product_id) + ",现在成功下单" + orders.size()                 + ",剩余库存" + stock.get(product_id) + "件";     }

    /**      * 下单      *      * @param product_id      * @return      */     public String order1(String product_id) {         if (stock.get(product_id) == 0) {             return "活动已经结束了";             //已近买完了         } else {             //还没有卖完             try {                 //模拟操作数据库                 Thread.sleep(100);             } catch (InterruptedException e) {                 e.printStackTrace();             }             orders.put(MyStringUtils.getuuid(), product_id);             stock.put(product_id, stock.get(product_id) - 1);         }         return select_info(product_id);     } } 如上图所述,我现在需要限购10000个商品id为123的商品,如果什么操作也不做,直接访问静态资源你们觉得会有问题吗?我们使用apache_ab来模拟一下高并发情况,下面是发起100个请,并发量是50的情况:

问题: 可以看到,下单数和库存加起来明显超过了商品总数,这是一种超卖现象,在java角度来说就是线程不安全现象。

解决1: 学过javase的小伙伴应该都能想到使用synchronized关键字,强行同步。

 /**      * 下单      *      * @param product_id      * @return      */     public synchronized String order2(String product_id) {         if (stock.get(product_id) == 0) {             return "活动已经结束了";             //已近买完了         } else {             //还没有卖完             try {                 //模拟操作数据库                 Thread.sleep(100);             } catch (InterruptedException e) {                 e.printStackTrace();             }             orders.put(MyStringUtils.getuuid(), product_id);             stock.put(product_id, stock.get(product_id) - 1);         }         return select_info(product_id);     }

缺点: 1、我们可以明显的看到速度变慢了,从原来的0.535秒变到了10.956秒,那是因为synchronized放这个方法只允许单线程访问了。 2、synchronized是粗粒度的控制了线程安全,即:如果我这个商品id不一样的线程,理论上是可以同时访问这个方法的,但是加上了synchronized之后,无论商品id是否一样,两个线程都是没法同时访问这个方法的。

解决2: 使用redis分布式锁(主要使用了redis中的setnx和getset方法,这两个方法在redisTemplate分别是setIfAbsent和getAndSet方法)实现线程安全,因为redis是单线程,能保证线程的安全性,而且redis强大的读写能力能提高效率。

   /**      * 高并发没问题,效率还行      *      * @param product_id      * @return      */     public String order3(String product_id) throws CongestionException {         /**          * redis加锁          */         String value = System.currentTimeMillis() + 10000 + "";         if (!redisLock.lock1(product_id, value)) {            //系统繁忙,请稍后再试             throw new CongestionException();         }         //##############################业务逻辑#################################//         if (stock.get(product_id) == 0) {             return "活动已经结束了";             //已近买完了         } else {             //还没有卖完             try {                 //模拟操作数据库                 Thread.sleep(100);             } catch (InterruptedException e) {                 e.printStackTrace();             }             orders.put(MyStringUtils.getuuid(), product_id);             stock.put(product_id, stock.get(product_id) - 1);         }        //##############################业务逻辑#################################//         /**          * redis解锁          */         redisLock.unlock(product_id, value);         return select_info(product_id);     } /**  * 用redis实现分布式锁  */ @Component public class RedisLock {     @Autowired     private StringRedisTemplate redisTemplate;    //加锁     public boolean lock1(String key, String value) {         //setIfAbsent相当于jedis中的setnx,如果能赋值就返回true,如果已经有值了,就返回false         //即:在判断这个key是不是第一次进入这个方法         if (redisTemplate.opsForValue().setIfAbsent(key, value)) {             //第一次,即:这个key还没有被赋值的时候             return true;         }         return false;     }     //解锁     public void unlock(String key, String value) {         try {             if (MyStringUtils.Object2String(redisTemplate.opsForValue().get(key)).equals(value)) {                 redisTemplate.opsForValue().getOperations().delete(key);             }         } catch (Exception e) {             e.printStackTrace();         }     } } 缺点: 这个方法看上去没什么问题,而且只有商品id相同的两个线程同时访问这个方法的时候才会出现线程问题,这似乎是很完美了。但是有没有想过,万一处理业务逻辑的代码块中出现了异常,直接抛了出去,那解锁的代码就再也不会被执行了,也就是出现了死锁现象。

改进1:

    public boolean lock2(String key, String value) {         //setIfAbsent相当于jedis中的setnx,如果能赋值就返回true,如果已经有值了,就返回false         //即:在判断这个key是不是第一次进入这个方法         if (redisTemplate.opsForValue().setIfAbsent(key, value)) {             //第一次,即:这个key还没有被赋值的时候             return true;         }         String current_value = redisTemplate.opsForValue().get(key);//①         if (!MyStringUtils.Object2String(current_value).equals("")                 //超时了                 && Long.parseLong(current_value) < System.currentTimeMillis()) {;//②             //返回true就能解决死锁             return true;         }         return false;     } 缺点: 使用超时时间来解决死锁问题,但是又出现新的问题,就是当有两个商品id相同的线程同时执行到了②这一行代码,这时候两个线程同时获取锁,这样一来任然存在线程安全问题了。。。

改进2:

 /**      * 加锁      */     public boolean lock3(String key, String value) {         //setIfAbsent相当于jedis中的setnx,如果能赋值就返回true,如果已经有值了,就返回false         //即:在判断这个key是不是第一次进入这个方法         if (redisTemplate.opsForValue().setIfAbsent(key, value)) {             //第一次,即:这个key还没有被赋值的时候             return true;         }         String current_value = redisTemplate.opsForValue().get(key);         if (!MyStringUtils.Object2String(current_value).equals("")                 //超时了                 && Long.parseLong(current_value) < System.currentTimeMillis()) {//①             String old_value = redisTemplate.opsForValue().getAndSet(key, value);//②             if (!MyStringUtils.Object2String(old_value).equals("")                     && old_value.equals(current_value)) {                 return true;             }         }         return false;     } 解释: 如果两个线程同时调用这个方法,当同时走到①的时候,无论怎么样都有一个线程会先执行②这一行,假设线程1先执行②这行代码,那redis中key对应的value就变成了value,然后线程2再执行②这行代码的时候,获取到的old_value就是value,那么value显然和他上面获取的current_value是不一样的,则线程2是没法获取锁的。

说明: 虽然100个请求只有2个成功下单的,但是耗时却明显变小了,而且线程也是安全的,只是绝大部分因为没有拿到锁而没有抢到限购的商品,但也做了人性化的提醒,个人觉得还是可以接受的! ———————————————— 版权声明:本文为博主「一只仰望天空的菜鸟」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/tuesdayma/article/details/82751790

最新回复(0)