共计 5479 个字符,预计需要花费 14 分钟才能阅读完成。
本篇内容主要讲解“ZooKeeper 同步框架怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“ZooKeeper 同步框架怎么实现”吧!
首先,定义一个同步接口,它有一个 execute 方法,主要负责同步任务的实现。
Path 参数是任务节点 (用户),只有相同的节点才会同步工作。想象一下,去银行取钱,如果每个人都有一个专属的柜台,那效率是明显的。
SynchronousProcessor 参数用来处理具体的业务。
Synchronous.java
package org.bigmouth.nvwa.zookeeper.concurrent;
* 同步,支持分布式
*
* @author Allen Hu
* 2015-4-17
*/
public interface Synchronous {
/**
* 同步执行,根据 path 标识来区分同步工作。不同的 path 将不会同步进行。 *
* @param 处理结果类型
* @param path 任务节点
* e.g. /project/synchronous/0000001
* @param processor 业务处理器
* @return 处理结果
*/T execute(String path, SynchronousProcessorprocessor);
}
MutexLockSynchronous.java
Synchronous 的实现类,基于普通排它锁的方式实现。
package org.bigmouth.nvwa.zookeeper.concurrent;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.common.PathUtils;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;
* 基于普通排他锁的方式实现同步
*
* @author Allen Hu
* 2015-4-17
*/
public class MutexLockSynchronous implements Synchronous {
private final ZkClientHolder zkClientHolder;
public MutexLockSynchronous(ZkClientHolder zkClientHolder) {
this.zkClientHolder = zkClientHolder;
}
@Override
publicT execute(String path, SynchronousProcessorprocessor) { PathUtils.validatePath(path);
InterProcessLock lock = new InterProcessMutex(zkClientHolder.get(), path);
try { lock.acquire();
if (null != processor)
return processor.process();
}
catch (Exception e) { if (null != processor)
processor.exceptionCaught(e);
}
finally {
try { lock.release();
}
catch (Exception e) { }
}
return null;
}
}
SynchronousProcessor.java
任务处理器接口,实现它来完成具体的业务工作
package org.bigmouth.nvwa.zookeeper.concurrent;
* 同步业务处理器
*
* @author Allen Hu
* 2015-4-17
*/
public interface SynchronousProcessor{
/**
* 处理具体的业务
*
* @return
*/
T process();
/**
* 异常捕获
*
* @param throwable
*/
void exceptionCaught(Throwable throwable);
}
ZkClientHolder.java
当然少不了这个了,继承的父类可以不需要了解,就是定义了两个抽象方法:doInit 和 doDestroy 方法。
package org.bigmouth.nvwa.zookeeper;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.bigmouth.nvwa.utils.BaseLifeCycleSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
* ZooKeeper client holder
*
* @author Allen Hu
* 2015-4-16
*/
public class ZkClientHolder extends BaseLifeCycleSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(ZkClientHolder.class);
public static final int MAX_RETRIES = 3;
public static final int BASE_SLEEP_TIMEMS = 3000;
private CuratorFramework zkClient;
private final String connectString;
private final int sessionTimeout;
public ZkClientHolder(String connectString, int sessionTimeout) { Preconditions.checkArgument(StringUtils.isNotBlank(connectString), connectString cannot be blank
Preconditions.checkArgument(sessionTimeout = 10000, sessionTimeout must be greater than 10000
this.connectString = connectString;
this.sessionTimeout = sessionTimeout;
}
public CuratorFramework get() {
return zkClient;
}
@Override
protected void doInit() { zkClient = CuratorFrameworkFactory.builder()
.sessionTimeoutMs(sessionTimeout)
.connectString(connectString)
.retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES))
.build();
zkClient.start();
if (LOGGER.isInfoEnabled()) { LOGGER.info( Connected to ZooKepper server: {} , connectString);
}
}
@Override
protected void doDestroy() { if (null != zkClient)
zkClient.close();
}
}
最后来个测试类,模拟多个用户多线程处理任务的过程,我们达到了相同用户间同步的目的。
package org.bigmouth.nvwa.zookeeper.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.utils.ZKPaths;
import org.bigmouth.nvwa.zookeeper.ZkClientHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
* @author Allen Hu
* 2015-4-17
*/
public class ConcurrentTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentTest.class);
private ZkClientHolder zkClientHolder = new ZkClientHolder(172.16.3.24:2181 , 60000);
private Synchronous synchronous;
public ConcurrentTest() { zkClientHolder.init();
synchronous = new MutexLockSynchronous(zkClientHolder);
}
public class Service implements Runnable {
private final String id;
private final long sleepInMillis;
public Service(String id, long sleepInMillis) {
this.id = id;
this.sleepInMillis = sleepInMillis;
}
@Override
public void run() { synchronous.execute(ZKPaths.makePath( /nvwa/zookeeper/concurrent , id), new SynchronousProcessor() {
@Override
public String process() {
LOGGER.info(id + star...!
try { Thread.sleep(sleepInMillis);
}
catch (InterruptedException e) { e.printStackTrace();
}
LOGGER.info(id + has execution!
return id;
}
@Override
public void exceptionCaught(Throwable throwable) { throwable.printStackTrace();
}
});
}
}
static ExecutorService executor = Executors.newCachedThreadPool();
public static void main(String[] args) { ConcurrentTest ct = new ConcurrentTest();
executor.submit(ct.new Service( 1 , 5000)); // 1 号 处理 5 秒
executor.submit(ct.new Service( 1 , 2000)); // 1 号 处理 2 秒
executor.submit(ct.new Service( 2 , 5000)); // 2 号 处理 5 秒
executor.submit(ct.new Service( 3 , 10000)); // 3 号 处理 10 秒
executor.submit(ct.new Service( 3 , 500)); // 3 号 处理 0.5 秒
}
}
输出结果,1、2、3 任务并行,而相同的任务串行。如:第二个 1 号等第一个 1 号执行完才开始。
到此,相信大家对“ZooKeeper 同步框架怎么实现”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!