共计 7414 个字符,预计需要花费 19 分钟才能阅读完成。
本文丸趣 TV 小编为大家详细介绍“Redis 实现分布式锁的五种方法是什么”,内容详细,步骤清晰,细节处理妥当,希望这篇“Redis 实现分布式锁的五种方法是什么”文章能帮助大家解决疑惑,下面跟着丸趣 TV 小编的思路慢慢深入,一起来学习新知识吧。
1. 单机数据一致性
单机数据一致性架构如下图所示:多个可客户访问同一个服务器,连接同一个数据库。
场景描述:客户端模拟购买商品过程,在 Redis 中设定库存总数剩 100 个,多个客户端同时并发购买。
@RestController
public class IndexController1 {
@Autowired
StringRedisTemplate template;
@RequestMapping(/buy1)
public String index(){
// Redis 中存有 goods:001 号商品,数量为 100
String result = template.opsForValue().get( goods:001
// 获取到剩余商品数
int total = result == null ? 0 : Integer.parseInt(result);
if( total 0 ){
// 剩余商品数大于 0 ,则进行扣减
int realTotal = total -1;
// 将商品数回写数据库
template.opsForValue().set( goods:001 ,String.valueOf(realTotal));
System.out.println( 购买商品成功,库存还剩:+realTotal + 件, 服务端口为 8001
return 购买商品成功,库存还剩:+realTotal + 件, 服务端口为 8001
}else{
System.out.println( 购买商品失败,服务端口为 8001
}
return 购买商品失败,服务端口为 8001
}
}
使用 Jmeter 模拟高并发场景,测试结果如下:
测试结果出现多个用户购买同一商品,发生了数据不一致问题!
解决办法:单体应用的情况下,对并发的操作进行加锁操作,保证对数据的操作具有原子性
synchronized
ReentrantLock
@RestController
public class IndexController2 {
// 使用 ReentrantLock 锁解决单体应用的并发问题
Lock lock = new ReentrantLock();
@Autowired
StringRedisTemplate template;
@RequestMapping(/buy2)
public String index() { lock.lock();
try { String result = template.opsForValue().get( goods:001
int total = result == null ? 0 : Integer.parseInt(result);
if (total 0) {
int realTotal = total - 1;
template.opsForValue().set( goods:001 , String.valueOf(realTotal));
System.out.println( 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
return 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
} else {
System.out.println( 购买商品失败,服务端口为 8001
}
} catch (Exception e) { lock.unlock();
} finally { lock.unlock();
}
return 购买商品失败,服务端口为 8001
}
2. 分布式数据一致性
上面解决了单体应用的数据一致性问题,但如果是分布式架构部署呢,架构如下:
提供两个服务,端口分别为 8001、8002,连接同一个 Redis 服务,在服务前面有一台 Nginx 作为负载均衡
两台服务代码相同,只是端口不同
将 8001、8002 两个服务启动,每个服务依然用 ReentrantLock 加锁,用 Jmeter 做并发测试,发现会出现数据一致性问题!
3. Redis 实现分布式锁 3.1 方式一
取消单机锁,下面使用 redis 的 set 命令来实现分布式加锁
SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]
EX seconds 设置指定的到期时间 (以秒为单位)
PX milliseconds 设置指定的到期时间 (以毫秒为单位)
NX 仅在键不存在时设置键
XX 只有在键已存在时才设置
@RestController
public class IndexController4 {
// Redis 分布式锁的 key
public static final String REDIS_LOCK = good_lock
@Autowired
StringRedisTemplate template;
@RequestMapping(/buy4)
public String index(){
// 每个人进来先要进行加锁,key 值为 good_lock,value 随机生成
String value = UUID.randomUUID().toString().replace( - ,
try{
// 加锁
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value);
// 加锁失败
if(!flag){
return 抢锁失败! }
System.out.println( value+ 抢锁成功
String result = template.opsForValue().get( goods:001
int total = result == null ? 0 : Integer.parseInt(result);
if (total 0) {
int realTotal = total - 1;
template.opsForValue().set( goods:001 , String.valueOf(realTotal));
// 如果在抢到所之后,删除锁之前,发生了异常,锁就无法被释放, // 释放锁操作不能在此操作,要在 finally 处理
// template.delete(REDIS_LOCK);
System.out.println( 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
return 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
} else {
System.out.println( 购买商品失败,服务端口为 8001
}
return 购买商品失败,服务端口为 8001
}finally {
// 释放锁
template.delete(REDIS_LOCK);
}
}
}
上面的代码,可以解决分布式架构中数据一致性问题。但再仔细想想,还是会有问题,下面进行改进。
3.2 方式二(改进方式一)
在上面的代码中,如果程序在运行期间,部署了微服务 jar 包的机器突然挂了,代码层面根本就没有走到 finally 代码块,也就是说在宕机前,锁并没有被删除掉,这样的话,就没办法保证解锁
所以,这里需要对这个 key 加一个过期时间,Redis 中设置过期时间有两种方法:
template.expire(REDIS_LOCK,10, TimeUnit.SECONDS)
template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS)
第一种方法需要单独的一行代码,且并没有与加锁放在同一步操作,所以不具备原子性,也会出问题
第二种方法在加锁的同时就进行了设置过期时间,所有没有问题,这里采用这种方式
调整下代码,在加锁的同时,设置过期时间:
// 为 key 加一个过期时间,其余代码不变
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK,value,10L,TimeUnit.SECONDS);
这种方式解决了因服务突然宕机而无法释放锁的问题。但再仔细想想,还是会有问题,下面进行改进。
3.3 方式三(改进方式二)
方式二设置了 key 的过期时间,解决了 key 无法删除的问题,但问题又来了
上面设置了 key 的过期时间为 10 秒,如果业务逻辑比较复杂,需要调用其他微服务,处理时间需要 15 秒(模拟场
景,别较真),而当 10 秒钟过去之后,这个 key 就过期了,其他请求就又可以设置这个 key,此时如果耗时 15 秒
的请求处理完了,回来继续执行程序,就会把别人设置的 key 给删除了,这是个很严重的问题!
所以,谁上的锁,谁才能删除
@RestController
public class IndexController6 {
public static final String REDIS_LOCK = good_lock
@Autowired
StringRedisTemplate template;
@RequestMapping(/buy6)
public String index(){
// 每个人进来先要进行加锁,key 值为 good_lock
String value = UUID.randomUUID().toString().replace( - ,
try{
// 为 key 加一个过期时间
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);
// 加锁失败
if(!flag){
return 抢锁失败! }
System.out.println( value+ 抢锁成功
String result = template.opsForValue().get( goods:001
int total = result == null ? 0 : Integer.parseInt(result);
if (total 0) {
// 如果在此处需要调用其他微服务,处理时间较长。。。 int realTotal = total - 1;
template.opsForValue().set( goods:001 , String.valueOf(realTotal));
System.out.println( 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
return 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
} else {
System.out.println( 购买商品失败,服务端口为 8001
}
return 购买商品失败,服务端口为 8001
}finally {
// 谁加的锁,谁才能删除!!!! if(template.opsForValue().get(REDIS_LOCK).equals(value)){ template.delete(REDIS_LOCK);
}
}
}
}
这种方式解决了因服务处理时间太长而释放了别人锁的问题。这样就没问题了吗?
3.4 方式四(改进方式三)
在上面方式三下,规定了谁上的锁,谁才能删除,但 finally 快的判断和 del 删除操作不是原子操作,并发的时候也会出问题,并发嘛,就是要保证数据的一致性,保证数据的一致性,最好要保证对数据的操作具有原子性。
在 Redis 的 set 命令介绍中,最后推荐 Lua 脚本进行锁的删除,地址
@RestController
public class IndexController7 {
public static final String REDIS_LOCK = good_lock
@Autowired
StringRedisTemplate template;
@RequestMapping(/buy7)
public String index(){
// 每个人进来先要进行加锁,key 值为 good_lock
String value = UUID.randomUUID().toString().replace( - ,
try{
// 为 key 加一个过期时间
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);
// 加锁失败
if(!flag){
return 抢锁失败! }
System.out.println( value+ 抢锁成功
String result = template.opsForValue().get( goods:001
int total = result == null ? 0 : Integer.parseInt(result);
if (total 0) {
// 如果在此处需要调用其他微服务,处理时间较长。。。 int realTotal = total - 1;
template.opsForValue().set( goods:001 , String.valueOf(realTotal));
System.out.println( 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
return 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
} else {
System.out.println( 购买商品失败,服务端口为 8001
}
return 购买商品失败,服务端口为 8001
}finally {
// 谁加的锁,谁才能删除,使用 Lua 脚本,进行锁的删除
Jedis jedis = null;
try{ jedis = RedisUtils.getJedis();
String script = if redis.call(get ,KEYS[1]) == ARGV[1] +
then +
return redis.call(del ,KEYS[1]) +
else +
return 0 +
end
Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
if(1 .equals(eval.toString())){
System.out.println( -----del redis lock ok....
}else{
System.out.println( -----del redis lock error ....
}
}catch (Exception e){ }finally { if(null != jedis){ jedis.close();
}
}
}
}
}
3.5 方式五(改进方式四)
在方式四下,规定了谁上的锁,谁才能删除,并且解决了删除操作没有原子性问题。但还没有考虑缓存续命,以及 Redis 集群部署下,异步复制造成的锁丢失:主节点没来得及把刚刚 set 进来这条数据给从节点,就挂了。所以直接上 RedLock 的 Redisson 落地实现。
@RestController
public class IndexController8 {
public static final String REDIS_LOCK = good_lock
@Autowired
StringRedisTemplate template;
@Autowired
Redisson redisson;
@RequestMapping(/buy8)
public String index(){ RLock lock = redisson.getLock(REDIS_LOCK);
lock.lock();
// 每个人进来先要进行加锁,key 值为 good_lock
String value = UUID.randomUUID().toString().replace( - ,
try{ String result = template.opsForValue().get( goods:001
int total = result == null ? 0 : Integer.parseInt(result);
if (total 0) {
// 如果在此处需要调用其他微服务,处理时间较长。。。 int realTotal = total - 1;
template.opsForValue().set( goods:001 , String.valueOf(realTotal));
System.out.println( 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
return 购买商品成功,库存还剩: + realTotal + 件, 服务端口为 8001
} else {
System.out.println( 购买商品失败,服务端口为 8001
}
return 购买商品失败,服务端口为 8001
}finally { if(lock.isLocked() lock.isHeldByCurrentThread()){ lock.unlock();
}
}
}
}
读到这里,这篇“Redis 实现分布式锁的五种方法是什么”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注丸趣 TV 行业资讯频道。