共计 4536 个字符,预计需要花费 12 分钟才能阅读完成。
本篇内容介绍了“Transactional topology 怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
你可以通过使用 TransactionalTopologyBuilder 来创建 transactional topology. 下面就是一个 transactional topology 的定义,它的作用是计算输入流里面的 tuple 的个数。这段代码来自 storm-starter 里面的 TransactionalGlobalCount。
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields( word), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(global-count , spout , spout, 3);
builder.setBolt(partial-count , new BatchCount(), 5)
.shuffleGrouping(spout
builder.setBolt(sum , new UpdateGlobalCount())
.globalGrouping(partial-count
TransactionalTopologyBuilder 构造器中接受如下的参数:
一个 transaction topology 的 id
spout 在整个 topology 里面的 id。
一个 transactional spout。
一个可选的这个 transactional spout 的并行度。
topology 的 id 是用来在 zookeeper 里面保存这个 topology 的当前进度状态的,所以如果你重启这个 topology,它可以接着前面的进度继续执行。
一个 transaction topology 里面有一个唯一的 TransactionalSpout, 这个 spout 是通过 TransactionalTopologyBuilder 的构造函数来指定的。在这个例子里面,MemoryTransactionalSpout 被用来从一个内存变量里面读取数据 (DATA)。第二个参数指定 spout 发送的 tuple 的字段,第三个参数指定每个 batch 的最大 tuple 数量。关于如何自定义 TransactionalSpout 我们会在后面介绍。
现在说说 bolts。这个 topology 并行地计算 tuple 的总数量。第一个 bolt:BatchBolt,随机地把输入 tuple 分给各个 task,然后各个 task 各自统计局部数量。第二个 bolt:UpdateGlobalCount, 用全局 grouping 来汇总这个 batch 中 tuple 的数量,然后再更新到数据库里面的全局数量。
下面是 BatchCount 的定义:
public static class BatchCount extends BaseBatchBolt {
Object _id;
BatchOutputCollector _collector;
int _count = 0;
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count++;
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields( id , count));
}
}
storm 会为每个正在处理的 batch 创建一个 BatchCount 对象,这个 BatchCount 是运行在 BatchBoltExecutor 里面的。而 BatchBoltExecutor 负责创建以及清理这个对象的实例。
BatchCount 对象的 prepare 方法接收如下参数:
Storm config
Topology context
Output collector
这个 batch 的 id (txid),在 Transactional Topology 中,这个 id 则是一个 TransactionAttempt 对象。
这个 batch bolt 的抽象在 DRPC 里面也可以用,只是 txid 的类型不一样而已。实际上,BatchBolt 可以接收一个 txid 类型的参数,所以如果你只是想在 transactioinal topology 里面使用这个 BatchBolt,你可以去继承 BaseTransactionalBolt 类,如下定义:
public abstract class BaseTransactionalBolt extends BaseBatchBolt TransactionAttempt {
}
在 transaction topology 里面发射的所有的 tuple 都必须以 TransactionAttempt 作为第一个 field,然后 storm 可以根据这个 field 来判断哪些 tuple 属于一个 batch。所以你在发射 tuple 的时候需要满足这个条件。
TransactionAttempt 包含两个值:一个 transaction id,一个 attempt id。transaction id 的作用就是我们上面介绍的对于每个 batch 是唯一的,而且不管这个 batch 被 replay 多少次都是一样的。attempt id 是对于每个 batch 唯一的一个 id,但是对于同一个 batch,它 replay 之后的 attempt id 跟 replay 之前就不一样了,我们可以把 attempt id 理解成 replay-times, storm 利用这个 id 来区别一个 batch 发射的 tuple 的不同版本。
transaction id 对于每个 batch 加一,所以第一个 batch 的 transaction id 是”1″, 第二个 batch 是”2″,依次类推。
每收到一个 batch 中的 tuple,execute 方法便被调用一次。每次当该方法被调用时,你应该把这个 batch 里面的状态保持在一个本地变量里面。对于这个例子来说,它在 execute 方法里面递增 tuple 的个数。
最后,当这个 bolt 接收到某个 batch 的所有的 tuple 之后,finishBatch 方法会被调用。这个例子里面的 BatchCount 类会在这个时候发射它的局部数量到它的输出流里面去。
下面是 UpdateGlobalCount 类的定义:
public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
TransactionAttempt _attempt;
BatchOutputCollector _collector;
int _sum = 0;
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
_collector = collector;
_attempt = attempt;
}
@Override
public void execute(Tuple tuple) {
_sum+=tuple.getInteger(1);
}
@Override
public void finishBatch() {
Value val = DATABASE.get(GLOBAL_COUNT_KEY);
Value newval;
if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
newval = new Value();
newval.txid = _attempt.getTransactionId();
if(val==null) {
newval.count = _sum;
} else {
newval.count = _sum + val.count;
}
DATABASE.put(GLOBAL_COUNT_KEY, newval);
} else {
newval = val;
}
_collector.emit(new Values(_attempt, newval.count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields( id , sum));
}
}
UpdateGlobalCount 是 Transactional Topologies 相关的类,所以它继承自 BaseTransactionalBolt。在 execute 方法里面,UpdateGlobalCount 累积这个 batch 的计数,比较有趣的是 finishBatch 方法。
首先,注意这个 bolt 实现了 ICommitter 接口,这告诉 storm 要在这个事务的 commit 阶段调用 finishBatch 方法,所以对于 finishBatch 的调用会保证强顺序性(顺序就是 transaction id 的升序 ),另一方面 execute 方法在 processing 或者 commit 阶段都可以执行。另外一种把 bolt 标识为 commiter 的方法是调用 TransactionalTopologyBuilder 的 setCommiterBolt 来添加 Bolt(而不是 setBolt)。
UpdateGlobalCount 里面 finishBatch 方法的逻辑是首先从数据库中获取当前的值,并且把数据库里面的 transaction id 与当前这个 batch 的 transaction id 进行比较。如果他们一样,那么忽略这个 batch。否则把这个 batch 的结果加到总结果里面去,并且更新数据库。
“Transactional topology 怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!