基于redis的分布式锁怎么实现

47次阅读
没有评论

共计 10768 个字符,预计需要花费 27 分钟才能阅读完成。

本篇内容介绍了“基于 redis 的分布式锁怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

基于 redis 的分布式锁

/**
* 分布式锁工厂类
public class RedisLockUtil {private static final Logger logger = Logger.getLogger(RedisLockUtil.class);
 private static Object schemeLock = new Object();
 private static Map String,RedisLockUtil  instances = new ConcurrentHashMap();
 public static RedisLockUtil getInstance(String schema){RedisLockUtil u = instances.get(schema);
 if(u==null){synchronized(schemeLock){u = instances.get(schema);
 if(u == null){LockObserver lo = new LockObserver(schema); 
 u = new RedisLockUtil(schema,lo);
 instances.put(schema, u);
 return u;
 private Object mutexLock = new Object();
 private Map String,Object  mutexLockMap = new ConcurrentHashMap();
 private Map String,RedisReentrantLock  cache = new ConcurrentHashMap String,RedisReentrantLock 
 private DelayQueue RedisReentrantLock  dq = new DelayQueue RedisReentrantLock 
 private AbstractLockObserver lo;
 public RedisLockUtil(String schema, AbstractLockObserver lo){Thread th = new Thread(lo);
 th.setDaemon(false);
 th.setName(Lock Observer: +schema);
 th.start();
 clearUselessLocks(schema);
 this.lo = lo;
 public void clearUselessLocks(String schema){Thread th = new Thread(new Runnable(){
 @Override
 public void run() {while(!SystemExitListener.isOver()){
 try {RedisReentrantLock t = dq.take(); 
 if(t.clear()){String key = t.getKey();
 synchronized(getMutex(key)){cache.remove(key);
 t.resetCleartime();} catch (InterruptedException e) {th.setDaemon(true);
 th.setName(Lock cleaner: +schema);
 th.start();
 private Object getMutex(String key){Object mx = mutexLockMap.get(key);
 if(mx == null){synchronized(mutexLock){mx = mutexLockMap.get(key);
 if(mx==null){mx = new Object();
 mutexLockMap.put(key,mx);
 return mx;
 private RedisReentrantLock getLock(String key,boolean addref){RedisReentrantLock lock = cache.get(key);
 if(lock == null){synchronized(getMutex(key)){lock = cache.get(key);
 if(lock == null){lock = new RedisReentrantLock(key,lo);
 cache.put(key, lock);
 if(addref){if(!lock.incRef()){synchronized(getMutex(key)){lock = cache.get(key);
 if(!lock.incRef()){lock = new RedisReentrantLock(key,lo);
 cache.put(key, lock);
 return lock;
 public void reset(){for(String s : cache.keySet()){getLock(s,false).unlock();
  *  尝试加锁
  *  如果当前线程已经拥有该锁的话, 直接返回, 表示不用再次加锁, 此时不应该再调用 unlock 进行解锁
  * 
  * @param key
  * @return
  * @throws Exception 
  * @throws InterruptedException
  * @throws KeeperException
  */
 public LockStat lock(String key) {return lock(key,-1);
 public LockStat lock(String key,int timeout) {RedisReentrantLock ll = getLock(key,true);
 ll.incRef();
 try{if(ll.isOwner(false)){ll.descrRef();
 return LockStat.NONEED;
 if(ll.lock(timeout)){return LockStat.SUCCESS;}else{ll.descrRef();
 if(ll.setCleartime()){dq.put(ll);
 return null;
 }catch(LockNotExistsException e){ll.descrRef();
 return lock(key,timeout);
 }catch(RuntimeException e){ll.descrRef();
 throw e;
 public void unlock(String key,LockStat stat) {unlock(key,stat,false);
 public void unlock(String key,LockStat stat,boolean keepalive){if(stat == null) return;
 if(LockStat.SUCCESS.equals(stat)){RedisReentrantLock lock = getLock(key,false);
 boolean candestroy = lock.unlock();
 if(candestroy   !keepalive){if(lock.setCleartime()){dq.put(lock);
 public static enum LockStat{
 NONEED,
 SUCCESS
}

/**
* 分布式锁本地代理类
*/
public class RedisReentrantLock implements Delayed{
private static final Logger logger =  Logger.getLogger(RedisReentrantLock.class);
   private ReentrantLock reentrantLock = new ReentrantLock();

   private RedisLock redisLock;
   private long timeout = 3*60;
   private CountDownLatch lockcount = new CountDownLatch(1);

   private String key;
   private AbstractLockObserver observer;

   private int ref = 0;
   private Object refLock = new Object();
   private boolean destroyed = false;

   private long cleartime = -1;

   public RedisReentrantLock(String key,AbstractLockObserver observer) {
    this.key = key;
    this.observer = observer;
    initWriteLock();
   }

public boolean isDestroyed() {
return destroyed;
}

private synchronized void initWriteLock(){
redisLock = new RedisLock(key,new LockListener(){
@Override
public void lockAcquired() {
lockcount.countDown();
}
@Override
public long getExpire() {
return 0;
}

@Override
public void lockError() {
/*synchronized(mutex){
mutex.notify();
}*/
lockcount.countDown();
}
    },observer);
}

public boolean incRef(){
synchronized(refLock){
if(destroyed) return false;
    ref ++;
    }
return true;
}

public void descrRef(){
synchronized(refLock){
    ref --;
    }
}

public boolean clear() {
if(destroyed) return true;
synchronized(refLock){
    if(ref 0){
    return false;
    }
    destroyed = true;
    redisLock.clear();
    redisLock = null;
    return true;
    }
}

   public boolean lock(long timeout) throws LockNotExistsException{
    if(timeout = 0) timeout = this.timeout;
    //incRef();
       reentrantLock.lock();// 多线程竞争时,先拿到第一层锁
       if(redisLock == null){
        reentrantLock.unlock();
        //descrRef();
        throw new LockNotExistsException();
       }
       try{
        lockcount = new CountDownLatch(1);
        boolean res = redisLock.trylock(timeout);
        if(!res){ 
        lockcount.await(timeout, TimeUnit.SECONDS);
//mutex.wait(timeout*1000);
        if(!redisLock.doExpire()){
        reentrantLock.unlock();
return false;
}
        }
        return true;
       }catch(InterruptedException e){
        reentrantLock.unlock();
        return false;
       }
   }

   public boolean lock() throws LockNotExistsException {
    return lock(timeout);
   }

   public boolean unlock(){
    if(!isOwner(true)) {
    try{
    throw new RuntimeException(big ================================================ error.key: +key);
    }catch(Exception e){
    logger.error(err: +e,e);
    }
    return false;
    }
       try{
        redisLock.unlock();
        reentrantLock.unlock();// 多线程竞争时,释放最外层锁
       }catch(RuntimeException e){
        reentrantLock.unlock();// 多线程竞争时,释放最外层锁
        throw e;
       }finally{
        descrRef();
       }
       return canDestroy();
   }

   public boolean canDestroy(){
    synchronized(refLock){
    return ref
    }
   }

   public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}

public boolean isOwner(boolean check) {
    synchronized(refLock){
    if(redisLock == null) {
    logger.error(reidsLock is null:key= +key);
    return false;
    }
    boolean a = reentrantLock.isHeldByCurrentThread();
    boolean b = redisLock.isOwner();
    if(check){
    if(!a || !b){
    logger.error(key+ a: +a+ b: +b);
    }
    }
    return a
    }
   }

public boolean setCleartime() {
synchronized(this){
if(cleartime 0) return false;
this.cleartime = System.currentTimeMillis() + 10*1000;
return true;
}
}

public void resetCleartime(){
synchronized(this){
this.cleartime = -1;
}
}

@Override
public int compareTo(Delayed object) {
if(object instanceof RedisReentrantLock){
RedisReentrantLock t = (RedisReentrantLock)object;
       long l = this.cleartime - t.cleartime;

if(l 0) return 1 ; // 比当前的小则返回 1,比当前的大则返回 -1,否则为 0
else if(l 0) return -1;
else return 0;
}
return 0;
}

@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    return d;
}

}

/**
* 使用 Redis 实现的分布式锁
* 基本工作原理如下:*1.  使用 setnx(key, 时间戮 + 超时),如果设置成功,则直接拿到锁
*2.  如果设置不成功,获取 key 的值 v1(它的到期时间戮), 跟当前时间对比,看是否已经超时
*3.  如果超时(说明拿到锁的结点已经挂掉 ),v2=getset(key, 时间戮 + 超时 +1),判断 v2 是否等于 v1,如果相等,加锁成功,否则加锁失败,等过段时间再重试 (200MS)
public class RedisLock implements LockListener{
 private String key;
 private boolean owner = false;
 private AbstractLockObserver observer = null;
 private LockListener lockListener = null;
 private boolean waiting = false;
 private long expire;// 锁超时时间,以秒为单位
 private boolean expired = false;
 public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) {
 this.key = key;
 this.lockListener = lockListener;
 this.observer = observer;
 public boolean trylock(long expire) {synchronized(this){if(owner){
 return true;
 this.expire = expire;
 this.expired = false;
 if(!waiting){owner = observer.tryLock(key,expire);
 if(!owner){
 waiting = true;
 observer.addLockListener(key, this);
 return owner;
 public boolean isOwner() {
 return owner;
 public void unlock() {synchronized(this){observer.unLock(key);
 owner = false;
 public void clear() {synchronized(this){if(waiting) {observer.removeLockListener(key);
 waiting = false;
 public boolean doExpire(){synchronized(this){if(owner) return true;
 if(expired) return false;
 expired = true;
 clear();
 return false;
 @Override
 public void lockAcquired() {synchronized(this){if(expired){unlock();
 return;
 owner = true;
 waiting = false;
 lockListener.lockAcquired();
 @Override
 public long getExpire() {
 return this.expire;
 @Override
 public void lockError() {synchronized(this){
 owner = false;
 waiting = false;
 lockListener.lockError();}

public class LockObserver extends AbstractLockObserver implements Runnable{
 private CacheRedisClient client;
 private Object mutex = new Object();
 private Map String,LockListener  lockMap = new ConcurrentHashMap();
 private boolean stoped = false;
 private long interval = 500;
 private boolean terminated = false;
 private CountDownLatch doneSignal = new CountDownLatch(1);
 public LockObserver(String schema){client = new CacheRedisClient(schema);
 SystemExitListener.addTerminateListener(new ExitHandler(){public void run() {
 stoped = true;
 try {doneSignal.await();
 } catch (InterruptedException e) {
public void addLockListener(String key,LockListener listener){if(terminated){listener.lockError(); return; synchronized(mutex){lockMap.put(key, listener); public void removeLockListener(String key){synchronized(mutex){lockMap.remove(key); @Override public void run() {while(!terminated){long p1 = System.currentTimeMillis(); Map String,LockListener  clone = new HashMap(); synchronized(mutex){clone.putAll(lockMap); Set String  keyset = clone.keySet(); if(keyset.size()   0){ConnectionFactory.setSingleConnectionPerThread(keyset.size()); for(String key : keyset){LockListener ll = clone.get(key); try{ if(tryLock(key,ll.getExpire())) {  ll.lockAcquired();   removeLockListener(key);  } }catch(Exception e){ll.lockError(); removeLockListener(key); ConnectionFactory.releaseThreadConnection();}else{if(stoped){ terminated = true; doneSignal.countDown(); return; try {long p2 = System.currentTimeMillis(); long cost = p2 - p1; if(cost  = interval){Thread.sleep(interval - cost); }else{Thread.sleep(interval*2); } catch (InterruptedException e) {
 */ public boolean tryLock(final String key,final long expireInSecond){if(terminated) return false; final long tt = System.currentTimeMillis(); final long expire = expireInSecond * 1000; final Long ne = tt + expire; List Object  mm = client.multi(key, new MultiBlock(){ @Override public void execute() {transaction.setnxObject(key, ne); transaction.get(SafeEncoder.encode(key)); Long res = (Long)mm.get(0);  if(new Long(1).equals(res)) {   return true;  }else{  byte[] bb = (byte[])mm.get(1); Long ex = client.deserialize(bb);   if(ex == null || tt   ex){  Long old = client.getSet(key, new Long(ne+1));   if(old == null || (ex == null old==null) || (ex!=null ex.equals(old))){   return true;   }   }  }  return false; public void unLock(String key){client.del(key); }

使用本方案实现的分布式锁,可以完美地解决锁重入问题;通过引入超时也避免了死锁问题;性能方面,笔者自测试结果如下:

500 线程 tps = 35000
[root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
线程总时间:6553466;平均:13.106932
实际总时间:13609; 平均:0.027218

TPS 达到 35000,比方案 1 强了整整一个数量级

“基于 redis 的分布式锁怎么实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计10768字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)