共计 6775 个字符,预计需要花费 17 分钟才能阅读完成。
自动写代码机器人,免费开通
Redis 分布式锁有哪些?这个问题可能是我们日常学习或工作经常见到的。希望通过这个问题能让你收获颇深。下面是丸趣 TV 小编给大家带来的参考内容,让我们一起来看看吧!
我们通常使用的 synchronized 或者 Lock 都是线程锁,对同一个 JVM 进程内的多个线程有效。因为锁的本质 是内存中存放一个标记,记录获取锁的线程是谁,这个标记对每个线程都可见。然而我们启动的多个订单服务,就是多个 JVM,内存中的锁显然是不共享的,每个 JVM 进程都有自己的 锁,自然无法保证线程的互斥了,这个时候我们就需要使用到分布式锁了。常用的有三种解决方案:1. 基于数据库实现 2. 基于 zookeeper 的临时序列化节点实现 3.redis 实现。本文我们介绍的就是 redis 的实现方式。
实现分布式锁要满足 3 点:多进程可见,互斥,可重入。
1)多进程可见
redis 本身就是基于 JVM 之外的,因此满足多进程可见的要求。
2)互斥
即同一时间只能有一个进程获取锁标记,我们可以通过 redis 的 setnx 实现,只有第一次执行的才会成功并返回 1,其它情况返回 0。
释放锁
释放锁其实只需要把锁的 key 删除即可,使用 del xxx 指令。不过,如果在我们执行 del 之前,服务突然宕机,那么锁就永远无法删除了。所以我们可以通过 setex 命令设置过期时间即可。
import java.util.UUID;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;/**
* 第一种分布式锁 */@Componentpublic class RedisService {private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
JedisPool jedisPool;
// 获取锁之前的超时时间 (获取锁的等待重试时间)
private long acquireTimeout = 5000; // 获取锁之后的超时时间 (防止死锁)
private int timeOut = 10000;
* 获取分布式锁
* @return 锁标识 */
public boolean getRedisLock(String lockName,String val) {
Jedis jedis = null; try {jedis = jedisPool.getResource(); // 1. 计算获取锁的时间
Long endTime = System.currentTimeMillis() + acquireTimeout; // 2. 尝试获取锁
while (System.currentTimeMillis() endTime) { // 3. 获取锁成功就设置过期时间
if (jedis.setnx(lockName, val) == 1) {jedis.expire(lockName, timeOut/1000); return true;
} catch (Exception e) {log.error(e.getMessage());
} finally {returnResource(jedis);
} return false;
} /**
* 释放分布式锁
* @param lockName 锁名称 */
public void unRedisLock(String lockName) {
Jedis jedis = null; try {jedis = jedisPool.getResource(); // 释放锁 jedis.del(lockName);
} catch (Exception e) {log.error(e.getMessage());
} finally {returnResource(jedis);
}// ===============================================
public String get(String key) {
Jedis jedis = null;
String value = null; try {jedis = jedisPool.getResource();
value = jedis.get(key);
log.info(value);
} catch (Exception e) {log.error(e.getMessage());
} finally {returnResource(jedis);
} return value;
public void set(String key, String value) {
Jedis jedis = null; try {jedis = jedisPool.getResource();
jedis.set(key, value);
} catch (Exception e) {log.error(e.getMessage());
} finally {returnResource(jedis);
} /**
* 关闭连接 */
public void returnResource(Jedis jedis) {try { if(jedis!=null) jedis.close();} catch (Exception e) {}
上面的分布式锁实现了,但是这时候还可能出现另外 2 个问题:
一:获取锁时
setnx 获取锁成功了,还没来得及 setex 服务就宕机了,由于这种非原子性的操作,死锁又发生了。其实 redis 提供了 nx 与 ex 连用的命令。
二:释放锁时
1. 3 个进程:A 和 B 和 C,在执行任务,并争抢锁,此时 A 获取了锁,并设置自动过期时间为 10s
2. A 开始执行业务,因为某种原因,业务阻塞,耗时超过了 10 秒,此时锁自动释放了
3. B 恰好此时开始尝试获取锁,因为锁已经自动释放,成功获取锁
4. A 此时业务执行完毕,执行释放锁逻辑(删除 key),于是 B 的锁被释放了,而 B 其实还在执行业务
5. 此时进程 C 尝试获取锁,也成功了,因为 A 把 B 的锁删除了。
问题出现了:B 和 C 同时获取了锁,违反了互斥性!如何解决这个问题呢?我们应该在删除锁之前,判断这个锁是否是自己设置的锁,如果不是(例如自己 的锁已经超时释放),那么就不要删除了。所以我们可以在 set 锁时,存入当前线程的唯一标识!删除锁前,判断下里面的值是不是与自己标识释放一 致,如果不一致,说明不是自己的锁,就不要删除了。
/**
* 第二种分布式锁 */public class RedisTool { private static final String LOCK_SUCCESS = OK
private static final Long RELEASE_SUCCESS = 1L; /**
* 尝试获取分布式锁
* @param jedis Redis 客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功 */
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {String result = jedis.set(lockKey, requestId, NX , PX , expireTime); if (LOCK_SUCCESS.equals(result)) {return true;} return false;
} /**
* 释放分布式锁
* @param jedis Redis 客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功 */
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {if (jedis.get(lockKey).equals(requestId)) {System.out.println( 释放锁... + Thread.currentThread().getName() + ,identifierValue: + requestId);
jedis.del(lockKey); return true;
} return false;
}
按照上面方式实现分布式锁之后,就可以轻松解决大部分问题了。网上很多博客也都是这么实现的,但是仍然有些场景是不满足的,例如一个方法获取到锁之后,可能在方法内调这个方法此时就获取不到锁了。这个时候我们就需要把锁改进成可重入式锁了。
3)重入锁:
也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。换一种说法:同一个线程再次进入同步代码时,可以使用自己已获取到的锁。可重入锁可以避免因同一线程中多次获取锁而导致死锁发生。像 synchronized 就是一个重入锁,它是通过 moniter 函数记录当前线程信息来实现的。实现可重入锁需要考虑两点:
获取锁:首先尝试获取锁,如果获取失败,判断这个锁是否是自己的,如果是则允许再次获取,而且必须记录重复获取锁的次数。
释放锁:释放锁不能直接删除了,因为锁是可重入的,如果锁进入了多次,在内层直接删除锁,导致外部的业务在没有锁的情况下执行,会有安全问题。因此必须获取锁时累计重入的次数,释放时则减去重入次数,如果减到 0,则可以删除锁。
下面我们假设锁的 key 为“lock”,hashKey 是当前线程的 id:“threadId”,锁自动释放时间假设为 20
获取锁的步骤:1、判断 lock 是否存在 EXISTS lock
2、不存在,则自己获取锁,记录重入层数为 1. 2、存在,说明有人获取锁了,下面判断是不是自己的锁, 即判断当前线程 id 作为 hashKey 是否存在:HEXISTS lock threadId
3、不存在,说明锁已经有了,且不是自己获取的,锁获取失败. 3、存在,说明是自己获取的锁,重入次数 +1:HINCRBY lock threadId 1,最后更新锁自动释放时间,EXPIRE lock 20
释放锁的步骤:1、判断当前线程 id 作为 hashKey 是否存在:HEXISTS lock threadId
2、不存在,说明锁已经失效,不用管了
2、存在,说明锁还在,重入次数减 1:HINCRBY lock threadId -1,3、获取新的重入次数,判断重入次数是否为 0,为 0 说明锁全部释放,删除 key:DEL lock
因此,存储在锁中的信息就必须包含:key、线程标识、重入次数。不能再使用简单的 key-value 结构,这里推荐使用 hash 结构。
获取锁的脚本 (注释删掉, 不然运行报错)
local key = KEYS[1]; -- 第 1 个参数, 锁的 keylocal threadId = ARGV[1]; -- 第 2 个参数, 线程唯一标识 local releaseTime = ARGV[2]; -- 第 3 个参数, 锁的自动释放时间 if(redis.call( exists , key) == 0) then -- 判断锁是否已存在
redis.call( hset , key, threadId, 1 -- 不存在, 则获取锁
redis.call(expire , key, releaseTime); -- 设置有效期
return 1; -- 返回结果 end;if(redis.call( hexists , key, threadId) == 1) then -- 锁已经存在,判断 threadId 是否是自己
redis.call( hincrby , key, threadId, 1 -- 如果是自己,则重入次数 +1
redis.call(expire , key, releaseTime); -- 设置有效期
return 1; -- 返回结果 end;return 0; -- 代码走到这里, 说明获取锁的不是自己,获取锁失败
释放锁的脚本 (注释删掉, 不然运行报错)
local key = KEYS[1]; -- 第 1 个参数, 锁的 keylocal threadId = ARGV[1]; -- 第 2 个参数, 线程唯一标识 if (redis.call( HEXISTS , key, threadId) == 0) then -- 判断当前锁是否还是被自己持有
return nil; -- 如果已经不是自己,则直接返回 end;local count = redis.call(HINCRBY , key, threadId, -1); -- 是自己的锁,则重入次数 -1if (count == 0) then -- 判断是否重入次数是否已经为 0
redis.call(DEL , key); -- 等于 0 说明可以释放锁,直接删除
return nil;
end;
完整代码
import java.util.Collections;import java.util.UUID;import org.springframework.core.io.ClassPathResource;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.scripting.support.ResourceScriptSource;/**
* Redis 可重入锁 */public class RedisLock {private static final StringRedisTemplate redisTemplate = SpringUtil.getBean(StringRedisTemplate.class); private static final DefaultRedisScript Long LOCK_SCRIPT; private static final DefaultRedisScript Object UNLOCK_SCRIPT; static { // 加载释放锁的脚本
LOCK_SCRIPT = new DefaultRedisScript ();
LOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource( lock.lua)));
LOCK_SCRIPT.setResultType(Long.class); // 加载释放锁的脚本
UNLOCK_SCRIPT = new DefaultRedisScript ();
UNLOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource( unlock.lua)));
} /**
* 获取锁
* @param lockName 锁名称
* @param releaseTime 超时时间 (单位: 秒)
* @return key 解锁标识 */
public static String tryLock(String lockName,String releaseTime) { // 存入的线程信息的前缀,防止与其它 JVM 中线程信息冲突
String key = UUID.randomUUID().toString(); // 执行脚本
Long result = redisTemplate.execute(
LOCK_SCRIPT,
Collections.singletonList(lockName),
key + Thread.currentThread().getId(), releaseTime); // 判断结果
if(result != null result.intValue() == 1) {return key;}else {return null;} /**
* 释放锁
* @param lockName 锁名称
* @param key 解锁标识 */
public static void unlock(String lockName,String key) { // 执行脚本 redisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(lockName),
key + Thread.currentThread().getId(), null);
}
感谢各位的阅读!看完上述内容,你们对 Redis 分布式锁有哪些大概了解了吗?希望文章内容对大家有所帮助。如果想了解更多相关文章内容,欢迎关注丸趣 TV 行业资讯频道。
向 AI 问一下细节
丸趣 TV 网 – 提供最优质的资源集合!