共计 2678 个字符,预计需要花费 7 分钟才能阅读完成。
这篇文章主要介绍“IBatchSpout API 怎么使用”,在日常操作中,相信很多人在 IBatchSpout API 怎么使用问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”IBatchSpout API 怎么使用”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!
IBatchSpout 是 storm trident 推出的一种可以批量发射的 Spout。非事务性,基本的 spout
1:Map getComponentConfiguration(); 定义配置,可以用 backtype.storm.Config。
2:void open(Map conf, TopologyContext context); Spout 的初始化方法,参数 conf 即是 getComponentConfiguration 定义的配置
3:Fields getOutputFields(); 声明输出的 fields
4:void emitBatch(long batchId, TridentCollector collector); 批量发射 tuple,本次的批次号为 batchId
5:void ack(long batchId); 批次号为 batchId 的数据处理成功
6: void close();
一个例子
package storm.projectA;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class MySpout implements IBatchSpout{
/**
*
*/
private static final long serialVersionUID = 1L;
private long maxBatchSize;// 每批次最大的数量
private BufferedReader br;// 源文件流
HashMap Long, List List Object batches = new HashMap Long, List List Object // 保存发送过的所有数据,以便于重复发送
/**
* @param conf 配置
* @param context
*/
@Override
public void open(Map conf, TopologyContext context) { String filePath = (String)conf.get( filePath
maxBatchSize = (Long)conf.get( maxBatchSize
try { br = new BufferedReader(new FileReader(filePath));
} catch (FileNotFoundException e) { e.printStackTrace();
}
}
/*** spout 的发送方法
* @param batchId 批次 id
* @param collector 批量发射器
*/
@Override
public void emitBatch(long batchId, TridentCollector collector) { List List Object batch = batches.get(batchId);
if (batch == null) { batch = new ArrayList List Object ();
for (int i = 0; i maxBatchSize; i++) {
try { String line = br.readLine();
if(line == null){
break;
}
batch.add(new Values(line));
} catch (IOException e) { e.printStackTrace();
}
}
}
for(List Object list : batch){ collector.emit(list);
}
}
@Override
public void ack(long batchId) { batches.remove(batchId);
}
/**
* close 方法
*/
@Override
public void close() { if(br!=null){
try { br.close();
} catch (IOException e) { e.printStackTrace();
}
}
}
@Override
public Map getComponentConfiguration() { Config conf = new Config();
// 最大并行度 本地模式设置为 1
conf.setMaxTaskParallelism(1);
conf.put( filePath , D:\\aaa.txt
conf.put(maxBatchSize , 2);
return conf;
}
/**
* 输出的 fileds
*/
@Override
public Fields getOutputFields() {
return new Fields( sentence
}
}
到此,关于“IBatchSpout API 怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!