共计 10768 个字符,预计需要花费 27 分钟才能阅读完成。
本篇内容介绍了“基于 redis 的分布式锁怎么实现”的有关知识。在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
基于 redis 的分布式锁
/**
* 分布式锁工厂类
public class RedisLockUtil {private static final Logger logger = Logger.getLogger(RedisLockUtil.class);
private static Object schemeLock = new Object();
private static Map String,RedisLockUtil instances = new ConcurrentHashMap();
public static RedisLockUtil getInstance(String schema){RedisLockUtil u = instances.get(schema);
if(u==null){synchronized(schemeLock){u = instances.get(schema);
if(u == null){LockObserver lo = new LockObserver(schema);
u = new RedisLockUtil(schema,lo);
instances.put(schema, u);
return u;
private Object mutexLock = new Object();
private Map String,Object mutexLockMap = new ConcurrentHashMap();
private Map String,RedisReentrantLock cache = new ConcurrentHashMap String,RedisReentrantLock
private DelayQueue RedisReentrantLock dq = new DelayQueue RedisReentrantLock
private AbstractLockObserver lo;
public RedisLockUtil(String schema, AbstractLockObserver lo){Thread th = new Thread(lo);
th.setDaemon(false);
th.setName(Lock Observer: +schema);
th.start();
clearUselessLocks(schema);
this.lo = lo;
public void clearUselessLocks(String schema){Thread th = new Thread(new Runnable(){
@Override
public void run() {while(!SystemExitListener.isOver()){
try {RedisReentrantLock t = dq.take();
if(t.clear()){String key = t.getKey();
synchronized(getMutex(key)){cache.remove(key);
t.resetCleartime();} catch (InterruptedException e) {th.setDaemon(true);
th.setName(Lock cleaner: +schema);
th.start();
private Object getMutex(String key){Object mx = mutexLockMap.get(key);
if(mx == null){synchronized(mutexLock){mx = mutexLockMap.get(key);
if(mx==null){mx = new Object();
mutexLockMap.put(key,mx);
return mx;
private RedisReentrantLock getLock(String key,boolean addref){RedisReentrantLock lock = cache.get(key);
if(lock == null){synchronized(getMutex(key)){lock = cache.get(key);
if(lock == null){lock = new RedisReentrantLock(key,lo);
cache.put(key, lock);
if(addref){if(!lock.incRef()){synchronized(getMutex(key)){lock = cache.get(key);
if(!lock.incRef()){lock = new RedisReentrantLock(key,lo);
cache.put(key, lock);
return lock;
public void reset(){for(String s : cache.keySet()){getLock(s,false).unlock();
* 尝试加锁
* 如果当前线程已经拥有该锁的话, 直接返回, 表示不用再次加锁, 此时不应该再调用 unlock 进行解锁
*
* @param key
* @return
* @throws Exception
* @throws InterruptedException
* @throws KeeperException
*/
public LockStat lock(String key) {return lock(key,-1);
public LockStat lock(String key,int timeout) {RedisReentrantLock ll = getLock(key,true);
ll.incRef();
try{if(ll.isOwner(false)){ll.descrRef();
return LockStat.NONEED;
if(ll.lock(timeout)){return LockStat.SUCCESS;}else{ll.descrRef();
if(ll.setCleartime()){dq.put(ll);
return null;
}catch(LockNotExistsException e){ll.descrRef();
return lock(key,timeout);
}catch(RuntimeException e){ll.descrRef();
throw e;
public void unlock(String key,LockStat stat) {unlock(key,stat,false);
public void unlock(String key,LockStat stat,boolean keepalive){if(stat == null) return;
if(LockStat.SUCCESS.equals(stat)){RedisReentrantLock lock = getLock(key,false);
boolean candestroy = lock.unlock();
if(candestroy !keepalive){if(lock.setCleartime()){dq.put(lock);
public static enum LockStat{
NONEED,
SUCCESS
}
/**
* 分布式锁本地代理类
*/
public class RedisReentrantLock implements Delayed{
private static final Logger logger = Logger.getLogger(RedisReentrantLock.class);
private ReentrantLock reentrantLock = new ReentrantLock();
private RedisLock redisLock;
private long timeout = 3*60;
private CountDownLatch lockcount = new CountDownLatch(1);
private String key;
private AbstractLockObserver observer;
private int ref = 0;
private Object refLock = new Object();
private boolean destroyed = false;
private long cleartime = -1;
public RedisReentrantLock(String key,AbstractLockObserver observer) {
this.key = key;
this.observer = observer;
initWriteLock();
}
public boolean isDestroyed() {
return destroyed;
}
private synchronized void initWriteLock(){
redisLock = new RedisLock(key,new LockListener(){
@Override
public void lockAcquired() {
lockcount.countDown();
}
@Override
public long getExpire() {
return 0;
}
@Override
public void lockError() {
/*synchronized(mutex){
mutex.notify();
}*/
lockcount.countDown();
}
},observer);
}
public boolean incRef(){
synchronized(refLock){
if(destroyed) return false;
ref ++;
}
return true;
}
public void descrRef(){
synchronized(refLock){
ref --;
}
}
public boolean clear() {
if(destroyed) return true;
synchronized(refLock){
if(ref 0){
return false;
}
destroyed = true;
redisLock.clear();
redisLock = null;
return true;
}
}
public boolean lock(long timeout) throws LockNotExistsException{
if(timeout = 0) timeout = this.timeout;
//incRef();
reentrantLock.lock();// 多线程竞争时,先拿到第一层锁
if(redisLock == null){
reentrantLock.unlock();
//descrRef();
throw new LockNotExistsException();
}
try{
lockcount = new CountDownLatch(1);
boolean res = redisLock.trylock(timeout);
if(!res){
lockcount.await(timeout, TimeUnit.SECONDS);
//mutex.wait(timeout*1000);
if(!redisLock.doExpire()){
reentrantLock.unlock();
return false;
}
}
return true;
}catch(InterruptedException e){
reentrantLock.unlock();
return false;
}
}
public boolean lock() throws LockNotExistsException {
return lock(timeout);
}
public boolean unlock(){
if(!isOwner(true)) {
try{
throw new RuntimeException(big ================================================ error.key: +key);
}catch(Exception e){
logger.error(err: +e,e);
}
return false;
}
try{
redisLock.unlock();
reentrantLock.unlock();// 多线程竞争时,释放最外层锁
}catch(RuntimeException e){
reentrantLock.unlock();// 多线程竞争时,释放最外层锁
throw e;
}finally{
descrRef();
}
return canDestroy();
}
public boolean canDestroy(){
synchronized(refLock){
return ref
}
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public boolean isOwner(boolean check) {
synchronized(refLock){
if(redisLock == null) {
logger.error(reidsLock is null:key= +key);
return false;
}
boolean a = reentrantLock.isHeldByCurrentThread();
boolean b = redisLock.isOwner();
if(check){
if(!a || !b){
logger.error(key+ a: +a+ b: +b);
}
}
return a
}
}
public boolean setCleartime() {
synchronized(this){
if(cleartime 0) return false;
this.cleartime = System.currentTimeMillis() + 10*1000;
return true;
}
}
public void resetCleartime(){
synchronized(this){
this.cleartime = -1;
}
}
@Override
public int compareTo(Delayed object) {
if(object instanceof RedisReentrantLock){
RedisReentrantLock t = (RedisReentrantLock)object;
long l = this.cleartime - t.cleartime;
if(l 0) return 1 ; // 比当前的小则返回 1,比当前的大则返回 -1,否则为 0
else if(l 0) return -1;
else return 0;
}
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return d;
}
}
/**
* 使用 Redis 实现的分布式锁
* 基本工作原理如下:*1. 使用 setnx(key, 时间戮 + 超时),如果设置成功,则直接拿到锁
*2. 如果设置不成功,获取 key 的值 v1(它的到期时间戮), 跟当前时间对比,看是否已经超时
*3. 如果超时(说明拿到锁的结点已经挂掉 ),v2=getset(key, 时间戮 + 超时 +1),判断 v2 是否等于 v1,如果相等,加锁成功,否则加锁失败,等过段时间再重试 (200MS)
public class RedisLock implements LockListener{
private String key;
private boolean owner = false;
private AbstractLockObserver observer = null;
private LockListener lockListener = null;
private boolean waiting = false;
private long expire;// 锁超时时间,以秒为单位
private boolean expired = false;
public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) {
this.key = key;
this.lockListener = lockListener;
this.observer = observer;
public boolean trylock(long expire) {synchronized(this){if(owner){
return true;
this.expire = expire;
this.expired = false;
if(!waiting){owner = observer.tryLock(key,expire);
if(!owner){
waiting = true;
observer.addLockListener(key, this);
return owner;
public boolean isOwner() {
return owner;
public void unlock() {synchronized(this){observer.unLock(key);
owner = false;
public void clear() {synchronized(this){if(waiting) {observer.removeLockListener(key);
waiting = false;
public boolean doExpire(){synchronized(this){if(owner) return true;
if(expired) return false;
expired = true;
clear();
return false;
@Override
public void lockAcquired() {synchronized(this){if(expired){unlock();
return;
owner = true;
waiting = false;
lockListener.lockAcquired();
@Override
public long getExpire() {
return this.expire;
@Override
public void lockError() {synchronized(this){
owner = false;
waiting = false;
lockListener.lockError();}
public class LockObserver extends AbstractLockObserver implements Runnable{
private CacheRedisClient client;
private Object mutex = new Object();
private Map String,LockListener lockMap = new ConcurrentHashMap();
private boolean stoped = false;
private long interval = 500;
private boolean terminated = false;
private CountDownLatch doneSignal = new CountDownLatch(1);
public LockObserver(String schema){client = new CacheRedisClient(schema);
SystemExitListener.addTerminateListener(new ExitHandler(){public void run() {
stoped = true;
try {doneSignal.await();
} catch (InterruptedException e) {
public void addLockListener(String key,LockListener listener){if(terminated){listener.lockError();
return;
synchronized(mutex){lockMap.put(key, listener);
public void removeLockListener(String key){synchronized(mutex){lockMap.remove(key);
@Override
public void run() {while(!terminated){long p1 = System.currentTimeMillis();
Map String,LockListener clone = new HashMap();
synchronized(mutex){clone.putAll(lockMap);
Set String keyset = clone.keySet();
if(keyset.size() 0){ConnectionFactory.setSingleConnectionPerThread(keyset.size());
for(String key : keyset){LockListener ll = clone.get(key);
try{ if(tryLock(key,ll.getExpire())) { ll.lockAcquired();
removeLockListener(key);
}
}catch(Exception e){ll.lockError();
removeLockListener(key);
ConnectionFactory.releaseThreadConnection();}else{if(stoped){
terminated = true;
doneSignal.countDown();
return;
try {long p2 = System.currentTimeMillis();
long cost = p2 - p1;
if(cost = interval){Thread.sleep(interval - cost);
}else{Thread.sleep(interval*2);
} catch (InterruptedException e) {
*/
public boolean tryLock(final String key,final long expireInSecond){if(terminated) return false;
final long tt = System.currentTimeMillis();
final long expire = expireInSecond * 1000;
final Long ne = tt + expire;
List Object mm = client.multi(key, new MultiBlock(){
@Override
public void execute() {transaction.setnxObject(key, ne);
transaction.get(SafeEncoder.encode(key));
Long res = (Long)mm.get(0);
if(new Long(1).equals(res)) {
return true;
}else{ byte[] bb = (byte[])mm.get(1);
Long ex = client.deserialize(bb);
if(ex == null || tt ex){ Long old = client.getSet(key, new Long(ne+1));
if(old == null || (ex == null old==null) || (ex!=null ex.equals(old))){
return true;
}
}
}
return false;
public void unLock(String key){client.del(key);
}
使用本方案实现的分布式锁,可以解决锁重入问题;通过引入超时机制,也避免了持锁节点宕机导致的死锁问题。性能方面,笔者的自测结果如下:
500 线程 tps = 35000
[root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
线程总时间:6553466;平均:13.106932
实际总时间:13609; 平均:0.027218
TPS 达到 35000,比方案 1 强了整整一个数量级
“基于 redis 的分布式锁怎么实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!
正文完