共计 8849 个字符,预计需要花费 23 分钟才能阅读完成。
本篇内容介绍了“zookeeper 分布式锁实现的方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
一。为何使用分布式锁?
当应用服务器数量超过 1 台,对相同数据的访问可能造成访问冲突(特别是写冲突)。单纯使用关系数据库比如 MYSQL 的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,最大的缺陷就是可用性降低(性能差)。对于 GLEASY 这种满足大规模并发访问请求的应用来说,使用数据库事务来实现数据库就有些捉襟见肘了。另外对于一些不依赖数据库的应用,比如分布式文件系统,为了保证同一文件在大量读写操作情况下的正确性,必须引入分布式锁来约束对同一文件的并发操作。
二。对分布式锁的要求
1. 高性能(分布式锁不能成为系统的性能瓶颈)
2. 避免死锁(拿到锁的结点挂掉不会导致其它结点永远无法继续)
3. 支持锁重入
三。方案 1,基于 zookeeper 的分布式锁
/**
* DistributedLockUtil.java
* 分布式锁工厂类,所有分布式请求都由该工厂类负责
public class DistributedLockUtil {private static Object schemeLock = new Object();
private static Object mutexLock = new Object();
private static Map String,Object mutexLockMap = new ConcurrentHashMap();
private String schema;
private Map String,DistributedReentrantLock cache = new ConcurrentHashMap String,DistributedReentrantLock
private static Map String,DistributedLockUtil instances = new ConcurrentHashMap();
public static DistributedLockUtil getInstance(String schema){DistributedLockUtil u = instances.get(schema);
if(u==null){synchronized(schemeLock){u = instances.get(schema);
if(u == null){u = new DistributedLockUtil(schema);
instances.put(schema, u);
return u;
private DistributedLockUtil(String schema){
this.schema = schema;
private Object getMutex(String key){Object mx = mutexLockMap.get(key);
if(mx == null){synchronized(mutexLock){mx = mutexLockMap.get(key);
if(mx==null){mx = new Object();
mutexLockMap.put(key,mx);
return mx;
private DistributedReentrantLock getLock(String key){DistributedReentrantLock lock = cache.get(key);
if(lock == null){synchronized(getMutex(key)){lock = cache.get(key);
if(lock == null){lock = new DistributedReentrantLock(key,schema);
cache.put(key, lock);
return lock;
public void reset(){for(String s : cache.keySet()){getLock(s).unlock();
* 尝试加锁
* 如果当前线程已经拥有该锁的话, 直接返回 false, 表示不用再次加锁, 此时不应该再调用 unlock 进行解锁
*
* @param key
* @return
* @throws InterruptedException
* @throws KeeperException
*/
public LockStat lock(String key) throws InterruptedException, KeeperException{if(getLock(key).isOwner()){
return LockStat.NONEED;
getLock(key).lock();
return LockStat.SUCCESS;
public void clearLock(String key) throws InterruptedException, KeeperException{synchronized(getMutex(key)){DistributedReentrantLock l = cache.get(key);
l.clear();
cache.remove(key);
public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{unlock(key,stat,false);
public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{if(stat == null) return;
if(LockStat.SUCCESS.equals(stat)){DistributedReentrantLock lock = getLock(key);
boolean hasWaiter = lock.unlock();
if(!hasWaiter !keepalive){synchronized(getMutex(key)){lock.clear();
cache.remove(key);
public static enum LockStat{
NONEED,
SUCCESS
}
/**
*DistributedReentrantLock.java
* 本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销
public class DistributedReentrantLock {private static final Logger logger = Logger.getLogger(DistributedReentrantLock.class);
private ReentrantLock reentrantLock = new ReentrantLock();
private WriteLock writeLock;
private long timeout = 3*60*1000;
private final Object mutex = new Object();
private String dir;
private String schema;
private final ExitListener exitListener = new ExitListener(){
@Override
public void execute() {initWriteLock();
private synchronized void initWriteLock(){
logger.debug( 初始化 writeLock
writeLock = new WriteLock(dir,new LockListener(){
@Override
public void lockAcquired() {synchronized(mutex){mutex.notify();
@Override
public void lockReleased() {
},schema);
if(writeLock != null writeLock.zk != null){writeLock.zk.addExitListener(exitListener);
synchronized(mutex){mutex.notify();
public DistributedReentrantLock(String dir,String schema) {
this.dir = dir;
this.schema = schema;
initWriteLock();
}
public void lock(long timeout) throws InterruptedException, KeeperException { reentrantLock.lock();// 多线程竞争时,先拿到第一层锁
try{ boolean res = writeLock.trylock();
if(!res){ synchronized(mutex){mutex.wait(timeout);
if(writeLock == null || !writeLock.isOwner()){
throw new InterruptedException( 锁超时
}
}
}catch(InterruptedException e){ reentrantLock.unlock();
throw e;
}catch(KeeperException e){ reentrantLock.unlock();
throw e;
}
}
public void lock() throws InterruptedException, KeeperException { lock(timeout);
}
public void destroy() throws KeeperException { writeLock.unlock();
}
public boolean unlock(){ if(!isOwner()) return false;
try{ writeLock.unlock();
reentrantLock.unlock();// 多线程竞争时,释放最外层锁
}catch(RuntimeException e){ reentrantLock.unlock();// 多线程竞争时,释放最外层锁
throw e;
}
return reentrantLock.hasQueuedThreads();
}
public boolean isOwner() { return reentrantLock.isHeldByCurrentThread() writeLock.isOwner();
}
public void clear() {writeLock.clear();
}
/**
*WriteLock.java
* 基于 zk 的锁实现
* 一个最简单的场景如下:*1. 结点 A 请求加锁,在特定路径下注册自己(会话自增结点 ),得到一个 ID 号 1
*2. 结点 B 请求加锁,在特定路径下注册自己(会话自增结点 ),得到一个 ID 号 2
*3. 结点 A 获取所有结点 ID,判断出来自己是最小结点号,于是获得锁
*4. 结点 B 获取所有结点 ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点 A)变更事件
*5. 结点 A 拿到锁,处理业务,处理完,释放锁(删除自己)*6. 结点 B 收到结点 A 变更事件,判断出来自己已经是最小结点号,于是获得锁。public class WriteLock extends ZkPrimative { private static final Logger LOG = Logger.getLogger(WriteLock.class);
private final String dir;
private String id;
private LockNode idName;
private String ownerId;
private String lastChildId;
private byte[] data = {0x12, 0x34};
private LockListener callback;
public WriteLock(String dir,String schema) { super(schema,true);
this.dir = dir;
}
public WriteLock(String dir,LockListener callback,String schema) { this(dir,schema);
this.callback = callback;
}
public LockListener getLockListener() {
return this.callback;
}
public void setLockListener(LockListener callback) {
this.callback = callback;
}
public synchronized void unlock() throws RuntimeException { if(zk == null || zk.isClosed()){
return;
}
if (id != null) {
try { zk.delete(id, -1);
} catch (InterruptedException e) { LOG.warn( Caught: + e, e);
//set that we have been interrupted.
Thread.currentThread().interrupt();
} catch (KeeperException.NoNodeException e) {
// do nothing
} catch (KeeperException e) { LOG.warn( Caught: + e, e);
throw (RuntimeException) new RuntimeException(e.getMessage()).
initCause(e);
}finally { if (callback != null) { callback.lockReleased();
}
id = null;
}
}
}
private class LockWatcher implements Watcher { public void process(WatchedEvent event) { LOG.debug( Watcher fired on path: + event.getPath() + state: +
event.getState() + type + event.getType());
try { trylock();
} catch (Exception e) { LOG.warn( Failed to acquire lock: + e, e);
}
}
}
private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
throws KeeperException, InterruptedException { List String names = zookeeper.getChildren(dir, false);
for (String name : names) { if (name.startsWith(prefix)) {
id = dir + / + name;
if (LOG.isDebugEnabled()) { LOG.debug( Found id created last time: + id);
}
break;
}
}
if (id == null) {
id = zookeeper.create(dir + / + prefix, data,
acl, EPHEMERAL_SEQUENTIAL);
if (LOG.isDebugEnabled()) { LOG.debug( Created id: + id);
}
}
}
public void clear() {if(zk == null || zk.isClosed()){
return;
}
try {zk.delete(dir, -1);
} catch (Exception e) { LOG.error( clear error: + e,e);
}
public synchronized boolean trylock() throws KeeperException, InterruptedException { if(zk == null){
LOG.info( zk 是空
return false;
}
if (zk.isClosed()) {
LOG.info( zk 已经关闭
return false;
}
ensurePathExists(dir);
LOG.debug(id: +id);
do { if (id == null) { long sessionId = zk.getSessionId();
String prefix = x- + sessionId + -
idName = new LockNode(id);
LOG.debug(idName: +idName);
}
if (id != null) { List String names = zk.getChildren(dir, false);
if (names.isEmpty()) {
LOG.warn( No children in: + dir + when we ve just +
created one! Lets recreate it...
id = null;
} else {
SortedSet LockNode sortedNames = new TreeSet LockNode
for (String name : names) { sortedNames.add(new LockNode(dir + / + name));
}
ownerId = sortedNames.first().getName();
LOG.debug(all: +sortedNames);
SortedSet LockNode lessThanMe = sortedNames.headSet(idName);
LOG.debug(less than me: +lessThanMe);
if (!lessThanMe.isEmpty()) { LockNode lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
if (LOG.isDebugEnabled()) { LOG.debug( watching less than me node: + lastChildId);
}
Stat stat = zk.exists(lastChildId, new LockWatcher());
if (stat != null) {
return Boolean.FALSE;
} else {
LOG.warn( Could not find the +
stats for less than me: + lastChildName.getName());
}
} else { if (isOwner()) { if (callback != null) { callback.lockAcquired();
}
return Boolean.TRUE;
}
}
}
}
}
while (id == null);
return Boolean.FALSE;
}
public String getDir() {
return dir;
}
public boolean isOwner() { return id != null ownerId != null id.equals(ownerId);
}
public String getId() {
return this.id;
}
}
使用本方案实现的分布式锁,可以很好地解决锁重入的问题,而且使用会话结点来避免死锁;性能方面,根据笔者自测结果,加锁解锁各一次算是一个操作,本方案实现的分布式锁,TPS 大概为 2000-3000,性能比较一般
“zookeeper 分布式锁实现的方法是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!