当多个线程请求数据库查询库存余量时,显示有余量,但是当进行扣减库存时,库存已经用完了,但那个线程并不知道,依旧去扣减库存,造成库存为负数的情况,于是乎就出现了超发现象。
发生超卖现象的根本原因是共享数据被多个线程所修改,无法保证其执行顺序,如果一个数据库事务读取到一个产品后,就将数据直接锁定,不允许其他线程进行读写操作,直至当前数据库事务完成才释放这条数据的锁,就不会发生超发现象,但是执行效率性能将大大下降。
@Mapper public interface ProductMapper { @Select("SELECT id, product_name as productName, stock, price, version, note FROM t_product where id=#{id} for update") ProductPo getProduct(Long id); @Update("UPDATE t_product SET stock = stock - #{quantity} WHERE id = #{id}") int decreaseProduct(@Param("id") Long id, @Param("quantity") int quantity); }在select语句末尾添加了for update,这样,在数据库事务执行的过程中,就会锁定查询出来的数据,其他事务将不能再对其进行读写,单个请求直至数据库事务完成,才会释放这个锁,下图可见其stock为0,没有发生超发现象,但执行效率下降了,通过购买记录可以得知,相比之前没加锁慢了1/5。
为了解决悲观锁带来的性能下降的问题,我们来讨论一下乐观锁的原理:
乐观锁是一种不使用数据库锁和不阻塞线程并发的方案,下图是以本Demo为例的乐观锁流程: 这种方案就是多线程的概念CAS(Compare and Swap),然而这样的方案会引发一种ABA问题:
T1时刻:线程1读取商品库存为A
T2时刻:线程2读取商品库存为A
T3时刻:线程2计算购买商品总价格
T4时刻:当前库存为A,与线程2保存的旧值一致,因此线程2可减库存(当前库存A—>B),此时线程1在当前库存为B的情况下计算剩余商品价格(单价*B)。
T5时刻:线程2取消购买,线程2回退(当前库存B—>A),线程1计算的剩余商品价格错误。
T6时刻:线程1比较旧值与当前数据库库存,发现都为A,返回之前计算好的(单价*B)结果,造成了错误。 从上面的分析中看到一个现象A—>B—>A的过程,就是所谓的ABA问题,解决此问题的方法为加入版本号的限制,只要在操作过程中修改共享值,无论业务正常,回退,还是异常,版本号只能递增,不能回退递减。每次通过比较数据的版本号来查看此数据是否被修改过。
@Mapper public interface ProductMapper { @Select("SELECT id, product_name as productName, stock, price, version, note FROM t_product where id=#{id}") ProductPo getProduct(Long id); //********************change****************************** @Update("UPDATE t_product SET stock = stock - #{quantity}, version = version + 1 WHERE id = #{id} and version = #{version}") int decreaseProduct(@Param("id") Long id, @Param("quantity") int quantity, @Param("version") int version); } @Override // 启动Spring数据库事务机制 @Transactional public boolean purchase(Long userId, Long productId, int quantity) { // 获取产品 ProductPo product = productMapper.getProduct(productId); // 比较库存和购买数量 if (product.getStock() < quantity) { // 库存不足 return false; } //**************************change******************************* // 扣减库存,加入了version productMapper.decreaseProduct(productId, quantity, product.getVersion()); //*************************************************************** // 初始化购买记录 PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity); // 插入购买记录 purchaseRecordMapper.insertPurchaseRecord(pr); return true; }发现stock并没有降为0,原因是加入了版本号的判断,所以大量的请求得到了失败的结果,而且失败率有点高。要解决这个方法,就设定为如果失败,就重试,直至成功,但是这样又会造成大量SQL执行,影响性能,所以一般可以使用限制时间或者重入次数的方法来克服。
时间戳限制重入的乐观锁:
将一个请求限制在100ms的生存期,如果在100ms内发生版本号冲突而导致不能更新的,则会重新尝试请求,否则请求失败。
修改service下PurchaseserviceImpl的purchase方法
@Override // 启动Spring数据库事务机制 @Transactional public boolean purchase(Long userId, Long productId, int quantity) { long start = System.currentTimeMillis(); while (true){ long end = System.currentTimeMillis(); if (end - start >100){ return false; } // 获取产品 ProductPo product = productMapper.getProduct(productId); // 比较库存和购买数量 if (product.getStock() < quantity) { // 库存不足 return false; } // 扣减库存 int result = productMapper.decreaseProduct(productId, quantity, product.getVersion()); // 如果数据更新失败,说明数据在多线程中被其他线程修改 // 导致失败,着通过循环重入尝试购买商品 if (result == 0){ continue; } // 初始化购买记录 PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity); // 插入购买记录 purchaseRecordMapper.insertPurchaseRecord(pr); return true; } }这种方法在测试中效果并不是很好,执行速度很慢,冲突现象并没有减少,反而增多,可能是我测试方法并不好,只开了三个网页来模拟并发,不太懂JS,Demo用的JS是发送异步请求的,但用单窗口测试了好多次都没出现超发现象,只能人肉模拟并发。 限定次数重入的乐观锁:
@Override // 启动Spring数据库事务机制 @Transactional public boolean purchase(Long userId, Long productId, int quantity) { long start = System.currentTimeMillis(); for (int i=0; i<3; i++){ // 获取产品 ProductPo product = productMapper.getProduct(productId); // 比较库存和购买数量 if (product.getStock() < quantity) { // 库存不足 return false; } // 扣减库存 int result = productMapper.decreaseProduct(productId, quantity, product.getVersion()); // 如果数据更新失败,说明数据在多线程中被其他线程修改 // 导致失败,着通过循环重入尝试购买商品 if (result == 0){ continue; } // 初始化购买记录 PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity); // 插入购买记录 purchaseRecordMapper.insertPurchaseRecord(pr); return true; } return false; }这种方式比上一种限定时间好,速度和单纯使用乐观锁差不多,并且消除了冲突。
在高并发环境中,直接操作数据库的方式过于缓慢,因为数据库是一个写入磁盘的过程,这个速度没有写入内存的Redis快,Redis的机制也能够帮助我们克服超发现象,但是,因为其命令方式运算能力比较薄弱,所以往往采用Redis Lua去代替它原有的命令方式。Redis Lua在Redis的执行中是局内原子性的,但他被执行时不会被其他客户端发送过来的命令打断,通过这样一种机制,可以在需要高并发的环境下考虑使用Redis去代替数据库作为响应用户的数据载体。但是Redis存储具有不稳定性,所以还需要有一定的机制将Redis存储的数据刷入数据库。
下面先来配置一下Redis:
application.properties
#配置redis spring.redis.jedis.pool.min-idle=5 spring.redis.jedis.pool.max-active=10 spring.redis.jedis.pool.max-idle=10 spring.redis.jedis.pool.max-wait=2000 spring.redis.port=6379 spring.redis.host=127.0.0.1 #我的Redis没有密码 #spring.redis.password=123456 spring.redis.timeout=1000 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.1.4.RELEASE</version> <exclusions> <!--不依赖Redis的异步客户端lettuce --> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <!--引入Redis的客户端驱动jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>PurchaseServiceImpl.java,使用Redis Lua响应请求
@Service public class PurchaseServiceImpl implements PurchaseService { @Autowired private ProductMapper productMapper = null; @Autowired private PurchaseRecordMapper purchaseRecordMapper = null; private PurchaseRecordPo initPurchaseRecord(Long userId, ProductPo product, int quantity) { PurchaseRecordPo pr = new PurchaseRecordPo(); pr.setNote("购买日志,时间:" + System.currentTimeMillis()); pr.setPrice(product.getPrice()); pr.setProductId(product.getId()); pr.setQuantity(quantity); double sum = product.getPrice() * quantity; pr.setSum(sum); pr.setUserId(userId); return pr; } @Autowired StringRedisTemplate stringRedisTemplate = null; String purchaseScript = // 先将产品编号保存到集合中 " redis.call('sadd', KEYS[1], ARGV[2]) \n" // 购买列表 + "local productPurchaseList = KEYS[2]..ARGV[2] \n" // 用户编号 + "local userId = ARGV[1] \n" // 产品key + "local product = 'product_'..ARGV[2] \n" // 购买数量 + "local quantity = tonumber(ARGV[3]) \n" // 当前库存 + "local stock = tonumber(redis.call('hget', product, 'stock')) \n" // 价格 + "local price = tonumber(redis.call('hget', product, 'price')) \n" // 购买时间 + "local purchase_date = ARGV[4] \n" // 库存不足,返回0 + "if stock < quantity then return 0 end \n" // 减库存 + "stock = stock - quantity \n" + "redis.call('hset', product, 'stock', tostring(stock)) \n" // 计算价格 + "local sum = price * quantity \n" // 合并购买记录数据 + "local purchaseRecord = userId..','..quantity..','" + "..sum..','..price..','..purchase_date \n" // 保存到将购买记录保存到list里 + "redis.call('rpush', productPurchaseList, purchaseRecord) \n" // 返回成功 + "return 1 \n"; // Redis购买记录集合前缀 private static final String PURCHASE_PRODUCT_LIST = "purchase_list_"; // 抢购商品集合 private static final String PRODUCT_SCHEDULE_SET = "product_schedule_set"; // 32位SHA1编码,第一次执行的时候先让Redis进行缓存脚本返回 private String sha1 = null; @Override public boolean purchaseRedis(Long userId, Long productId, int quantity) { // 购买时间 Long purchaseDate = System.currentTimeMillis(); Jedis jedis = null; try { // 获取原始连接 jedis = (Jedis) stringRedisTemplate .getConnectionFactory().getConnection().getNativeConnection(); // 如果没有加载过,则先将脚本加载到Redis服务器,让其返回sha1 if (sha1 == null) { sha1 = jedis.scriptLoad(purchaseScript); } // 执行脚本,返回结果 Object res = jedis.evalsha(sha1, 2, PRODUCT_SCHEDULE_SET, PURCHASE_PRODUCT_LIST, userId + "", productId + "", quantity + "", purchaseDate + ""); Long result = (Long) res; return result == 1; } finally { // 关闭jedis连接 if (jedis != null && jedis.isConnected()) { jedis.close(); } } } @Override // 当运行方法启用新的独立事务运行 @Transactional(propagation = Propagation.REQUIRES_NEW) // 保存购买记录,持久化到数据库 public boolean dealRedisPurchase(List<PurchaseRecordPo> prpList) { for (PurchaseRecordPo prp : prpList) { purchaseRecordMapper.insertPurchaseRecord(prp); productMapper.decreaseProduct(prp.getProductId(), prp.getQuantity()); } return true; } }使用定时机制,定时将数据持久化到数据库:
首先设置启动文件:
@SpringBootApplication(scanBasePackages = "com.wayne.springboot") @MapperScan(annotationClass = Mapper.class, basePackages = "com.wayne.springboot") // 启动springboot的定时机制,为此需要一个定时的方法来提供服务 // 把Redis的数据导入到数据库 @EnableScheduling public class SpringBootShoppingApplication{ public static void main(String[] args) { SpringApplication.run(SpringBootShoppingApplication.class, args); } }一个定时的方法来提供服务把Redis的数据导入到数据库:
import com.wayne.springboot.pojo.PurchaseRecordPo; import com.wayne.springboot.service.PurchaseService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundListOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; import java.util.Set; @Service public class TaskServiceImpl implements TaskService { @Autowired private StringRedisTemplate stringRedisTemplate = null; @Autowired private PurchaseService purchaseService = null; private static final String PRODUCT_SCHEDULE_SET = "product_schedule_set"; private static final String PURCHASE_PRODUCT_LIST = "purchase_list_"; // 每次取出1000条,避免一次取出消耗太多内存 private static final int ONE_TIME_SIZE = 1000; @Override // 每天半夜1点钟开始执行任务 // @Scheduled(cron = "0 0 1 * * ?") // 下面是用于测试的配置,每分钟执行一次任务 @Scheduled(fixedRate = 1000 * 30) public void purchaseTask() { System.out.println("定时任务开始......"); Set<String> productIdList = stringRedisTemplate.opsForSet().members(PRODUCT_SCHEDULE_SET); List<PurchaseRecordPo> prpList =new ArrayList<>(); for (String productIdStr : productIdList) { Long productId = Long.parseLong(productIdStr); String purchaseKey = PURCHASE_PRODUCT_LIST + productId; BoundListOperations<String, String> ops = stringRedisTemplate.boundListOps(purchaseKey); // 计算记录数 long size = stringRedisTemplate.opsForList().size(purchaseKey); Long times = size % ONE_TIME_SIZE == 0 ? size / ONE_TIME_SIZE : size / ONE_TIME_SIZE + 1; for (int i = 0; i < times; i++) { // 获取至多TIME_SIZE个抢红包信息 List<String> prList = null; if (i == 0) { prList = ops.range(i * ONE_TIME_SIZE, (i + 1) * ONE_TIME_SIZE); } else { prList = ops.range(i * ONE_TIME_SIZE + 1, (i + 1) * ONE_TIME_SIZE); } for (String prStr : prList) { PurchaseRecordPo prp = this.createPurchaseRecord(productId, prStr); prpList.add(prp); } try { // 采用该方法采用新建事务的方式,这样不会导致全局事务回滚 purchaseService.dealRedisPurchase(prpList); } catch(Exception ex) { ex.printStackTrace(); } // 清除列表为空,等待重新写入数据 prpList.clear(); } // 删除购买列表 stringRedisTemplate.delete(purchaseKey); // 从商品集合中删除商品 stringRedisTemplate.opsForSet() .remove(PRODUCT_SCHEDULE_SET, productIdStr); } System.out.println("定时任务结束......"); } private PurchaseRecordPo createPurchaseRecord( Long productId, String prStr) { String[] arr = prStr.split(","); Long userId = Long.parseLong(arr[0]); int quantity = Integer.parseInt(arr[1]); double sum = Double.valueOf(arr[2]); double price = Double.valueOf(arr[3]); Long time = Long.parseLong(arr[4]); Timestamp purchaseTime = new Timestamp(time); PurchaseRecordPo pr = new PurchaseRecordPo(); pr.setProductId(productId); pr.setPurchaseTime(purchaseTime); pr.setPrice(price); pr.setQuantity(quantity); pr.setSum(sum); pr.setUserId(userId); pr.setNote("购买日志,时间:" + purchaseTime.getTime()); return pr; } }到这里基本完成,启动项目前先启动redis服务器,并初始化Redis:
hmset product_1 id 1 stock 10000 price 2.00
然后启动并访问浏览器localhost:8080/test,因为设定的间隔为30s,所以等30s去查看数据库。性能相比之前要快上数倍。
https://github.com/waynedream/SpringBootNotes