Redis的分布式锁应该怎么打开

54次阅读
没有评论

共计 6200 个字符,预计需要花费 16 分钟才能阅读完成。

这篇文章主要讲解了“Redis 的分布式锁应该怎么打开”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“Redis 的分布式锁应该怎么打开”吧!

要求

基于 Redis 实现分布式锁需要满足如下几点要求:

在分布式集群中,被分布式锁控制的方法或代码段同一时刻只能被一个客户端上面的一个线程执行,也就是互斥

锁信息需要设置过期时间,避免一个线程长期占有(比如在做解锁操作前异常退出)而导致死锁

加锁与解锁必须一致,谁加的锁,就由谁来解(或过期超时),一个客户端不能解开另一个客户端加的锁

加锁与解锁的过程必须保证原子性

实现 1. 加锁实现

基于 Redis 的分布式锁加锁操作一般使用  SETNX  命令,其含义是“将  key  的值设为  value ,当且仅当  key  不存在。若给定的  key  已经存在,则  SETNX  不做任何动作”。
在 Spring Boot 中,可以使用 StringRedisTemplate 来实现,如下,一行代码即可实现加锁过程。(下列代码给出两种调用形式——立即返回加锁结果与给定超时时间获取加锁结果)

/**
 *  尝试获取锁(立即返回) * @param key  锁的 redis key
 * @param value  锁的 value
 * @param expire  过期时间 / 秒
 * @return  是否获取成功
 */public boolean lock(String key, String value, long expire) { return stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS);
 *  尝试获取锁,并至多等待 timeout 时长
 *
 * @param key  锁的 redis key
 * @param value  锁的 value
 * @param expire  过期时间 / 秒
 * @param timeout  超时时长
 * @param unit  时间单位
 * @return  是否获取成功
 */public boolean lock(String key, String value, long expire, long timeout, TimeUnit unit) { long waitMillis = unit.toMillis(timeout); long waitAlready = 0; while (!stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS)   waitAlready   waitMillis) { try { Thread.sleep(waitMillisPer);
 } catch (InterruptedException e) { log.error( Interrupted when trying to get a lock. key: {} , key, e);
 }
 waitAlready += waitMillisPer;
 } if (waitAlready   waitMillis) { return true;
 }
 log.warn(====== lock {} failed after waiting for {} ms , key, waitAlready); return false;
}

上述实现如何满足前面提到的几点要求:

客户端互斥:可以将 expire 过期时间设置为大于同步代码的执行时间,比如同步代码块执行时间为 1s,则可将 expire 设置为 3s 或 5s。避免同步代码执行过程中 expire 时间到,其它客户端又可以获取锁执行同步代码块。

通过设置过期时间 expire 来避免某个客户端长期占有锁。

通过 value 来控制谁加的锁,由谁解的逻辑,比如可以使用 requestId 作为 value,requestId 唯一标记一次请求。

setIfAbsent 方法 底层通过调用 Redis 的  SETNX  命令,操作具备原子性。

错误示例:

网上有如下实现,

public boolean lock(String key, String value, long expire) { boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value); if(result) { stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS);
 } return result;
}

该实现的问题是如果在 result 为 true,但还没成功设置 expire 时,程序异常退出了,将导致该锁一直被占用而导致死锁,不满足第二点要求。

2. 解锁实现

解锁也需要满足前面所述的四个要求,实现代码如下:

private static final String RELEASE_LOCK_LUA_SCRIPT =  if redis.call(get , KEYS[1]) == ARGV[1] then return redis.call(del , KEYS[1]) else return 0 end private static final Long RELEASE_LOCK_SUCCESS_RESULT = 1L;/**
 *  释放锁
 * @param key  锁的 redis key
 * @param value  锁的 value
 */public boolean unLock(String key, String value) { DefaultRedisScript Long  redisScript = new DefaultRedisScript (RELEASE_LOCK_LUA_SCRIPT, Long.class); long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value); return Objects.equals(result, RELEASE_LOCK_SUCCESS_RESULT);
}

这段实现使用一个 Lua 脚本来实现解锁操作,保证操作的原子性。传入的 value 值需与该线程加锁时的 value 一致,可以使用 requestId(具体实现下面给出)。

错误示例:

 public boolean unLock(String key, String value) { String oldValue = stringRedisTemplate.opsForValue().get(key); if(value.equals(oldValue)) { stringRedisTemplate.delete(key);
 }
}

该实现先获取锁的当前值,判断两值相等则删除。考虑一种极端情况,如果在判断为 true 时,刚好该锁过期时间到,另一个客户端加锁成功,则接下来的 delete 将不管三七二十一将别人加的锁直接删掉了,不满足第三点要求。该示例主要是因为没有保证解锁操作的原子性导致。

3. 注解支持

为了方便使用,添加一个注解,可以放于方法上控制方法在分布式环境中的同步执行。

/**
*  标注在方法上的分布式锁注解
*/@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.METHOD)public @interface DistributedLockable { String key(); String prefix() default  disLock:  long expire() default 10L; //  默认 10s 过期 }

添加一个切面来解析注解的处理,

/**
*  分布式锁注解处理切面
*/@Aspect@Slf4jpublic class DistributedLockAspect { private DistributedLock lock; public DistributedLockAspect(DistributedLock lock) { this.lock = lock;
 } /**
 *  在方法上执行同步锁
 */
 @Around(value =  @annotation(lockable) ) public Object distLock(ProceedingJoinPoint point, DistributedLockable lockable) throws Throwable { boolean locked = false;
 String key = lockable.prefix() + lockable.key(); try { locked = lock.lock(key, WebUtil.getRequestId(), lockable.expire()); if(locked) { return point.proceed();
 } else { log.info( Did not get a lock for key {} , key); return null;
 }
 } catch (Exception e) { throw e;
 } finally { if(locked) { if(!lock.unLock(key, WebUtil.getRequestId())){ log.warn( Unlock {} failed, maybe locked by another client already.  , lockable.key());
 }
 }
 }
 }
}

RequestId 的实现如下,通过注册一个 Filter,在请求开始时生成一个 uuid 存于 ThreadLocal 中,在请求返回时清除。

public class WebUtil { public static final String REQ_ID_HEADER =  Req-Id  private static final ThreadLocal String  reqIdThreadLocal = new ThreadLocal (); public static void setRequestId(String requestId) { reqIdThreadLocal.set(requestId);
 } public static String getRequestId(){ String requestId = reqIdThreadLocal.get(); if(requestId == null) { requestId = ObjectId.next();
 reqIdThreadLocal.set(requestId);
 } return requestId;
 } public static void removeRequestId() { reqIdThreadLocal.remove();
 }
}public class RequestIdFilter implements Filter { @Override
 public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
 String reqId = httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER); // 没有则生成一个
 if (StringUtils.isEmpty(reqId)) { reqId = ObjectId.next();
 }
 WebUtil.setRequestId(reqId); try { filterChain.doFilter(servletRequest, servletResponse);
 } finally { WebUtil.removeRequestId();
 }
 }
}// 在配置类中注册 Filter/**
*  添加 RequestId
* @return*/@Beanpublic FilterRegistrationBean requestIdFilter() { RequestIdFilter reqestIdFilter = new RequestIdFilter();
 FilterRegistrationBean registrationBean = new FilterRegistrationBean();
 registrationBean.setFilter(reqestIdFilter);
 List String  urlPatterns = Collections.singletonList( /* 
 registrationBean.setUrlPatterns(urlPatterns);
 registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE + 1); return registrationBean;
}

4. 使用注解

@DistributedLockable(key =  test , expire = 10)public void test(){ System.out.println( 线程 - +Thread.currentThread().getName()+ 开始执行...  + LocalDateTime.now()); try { Thread.sleep(2000);
 } catch (InterruptedException e) { e.printStackTrace();
 }
 System.out.println(线程 - +Thread.currentThread().getName()+ 结束执行...  + LocalDateTime.now());
}

总结

本文给出了基于 Redis 的分布式锁的实现方案与常见的错误示例。要保障分布式锁的正确运行,需满足本文所提的四个要求,尤其注意保证加锁解锁操作的原子性,设置过期时间,及对同一个锁的加锁解锁线程一致。

感谢各位的阅读,以上就是“Redis 的分布式锁应该怎么打开”的内容了,经过本文的学习后,相信大家对 Redis 的分布式锁应该怎么打开这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-07-28发表,共计6200字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)