共计 6644 个字符,预计需要花费 17 分钟才能阅读完成。
本篇内容介绍了“Storm MongoDB 接口怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
整体的 Storm 接口分为以下的几个 class:
1:MongoBolt.java
2 : MongoSpout.java
3 : MongoTailableCursorTopology.java
4 : SimpleMongoBolt.java
看代码说话:
1
package storm.mongo;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
*
* 注意在这里,没有实现批处理的调用,并且只是一个抽象类,对于 Mongo 的 Storm 交互做了一次封装
*
* @author Adrian Petrescu apetresc@gmail.com
*
*/
public abstract class MongoBolt extends BaseRichBolt {
private OutputCollector collector;
// MOngDB 的 DB 对象
private DB mongoDB;
// 记录我们的主机,端口,和 MongoDB 的数据 DB 民粹
private final String mongoHost;
private final int mongoPort;
private final String mongoDbName;
* @param mongoHost The host on which Mongo is running.
* @param mongoPort The port on which Mongo is running.
* @param mongoDbName The Mongo database containing all collections being
* written to.
*/
protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) {
this.mongoHost = mongoHost;
this.mongoPort = mongoPort;
this.mongoDbName = mongoDbName;
@Override
public void prepare(@SuppressWarnings( rawtypes) Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
try {
//prepare 方法目前在初始化的过程之中得到了一个 Mongo 的连接
this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName);
} catch (Exception e) {throw new RuntimeException(e);
@Override
public void execute(Tuple input) {
// 注意我们在这里还有一个判断,判断当前是否该发射
if (shouldActOnInput(input)) {String collectionName = getMongoCollectionForInput(input);
DBObject dbObject = getDBObjectForInput(input);
if (dbObject != null) {
try {mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1));
collector.ack(input);
} catch (MongoException me) {collector.fail(input);
} else {collector.ack(input);
* Decide whether or not this input tuple should trigger a Mongo write.
*
* @param input the input tuple under consideration
* @return {@code true} iff this input tuple should trigger a Mongo write
*/
public abstract boolean shouldActOnInput(Tuple input);
* Returns the Mongo collection which the input tuple should be written to.
*
* @param input the input tuple under consideration
* @return the Mongo collection which the input tuple should be written to
*/
public abstract String getMongoCollectionForInput(Tuple input);
* Returns the DBObject to store in Mongo for the specified input tuple.
*
拿到 DBObject 的一个抽象类
* @param input the input tuple under consideration
* @return the DBObject to be written to Mongo
*/
public abstract DBObject getDBObjectForInput(Tuple input);
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;
import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
* A Spout which consumes documents from a Mongodb tailable cursor.
* Subclasses should simply override two methods:
* ul
* li {@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields}
* li {@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns
* a Mongo document into a Storm tuple matching the declared output fields.
* /ul
** p
* b WARNING: /b You can only use tailable cursors on capped collections.
*
* @author Dan Beaulieu danjacob.beaulieu@gmail.com
// 在这里,抽象的过程中,依旧保持了第一层的 Spout 为一个抽象类,MongoSpout 为 abstract 的一个抽象类,子类在继承这 // 个类的过程之中实现特定的方法即可
// 这里还有一个类似 Cursor 的操作。
private LinkedBlockingQueue DBObject queue;
private final AtomicBoolean opened = new AtomicBoolean(false);
private DB mongoDB;
private final DBObject query;
private final String mongoHost;
private final int mongoPort;
private final String mongoDbName;
private final String mongoCollectionName;
public MongoSpout(String mongoHost, int mongoPort, String mongoDbName, String mongoCollectionName, DBObject query) {
this.mongoHost = mongoHost;
this.mongoPort = mongoPort;
this.mongoDbName = mongoDbName;
this.mongoCollectionName = mongoCollectionName;
this.query = query;
class TailableCursorThread extends Thread {
// 注意在其中我们使用了 LinkedBlockingQueue 的对象,有关 java 高并发的集合类,请参考本 ID 的【Java 集合类型的博文】博文。LinkedBlockingQueue DBObject queue;
String mongoCollectionName;
DB mongoDB;
DBObject query;
public TailableCursorThread(LinkedBlockingQueue DBObject queue, DB mongoDB, String mongoCollectionName, DBObject query) {
this.queue = queue;
this.mongoDB = mongoDB;
this.mongoCollectionName = mongoCollectionName;
this.query = query;
public void run() {while(opened.get()) {
try {
// create the cursor
mongoDB.requestStart();
final DBCursor cursor = mongoDB.getCollection(mongoCollectionName)
.find(query)
.sort(new BasicDBObject( $natural , 1))
.addOption(Bytes.QUERYOPTION_TAILABLE)
.addOption(Bytes.QUERYOPTION_AWAITDATA);
try {while (opened.get() cursor.hasNext()) { final DBObject doc = cursor.next();
if (doc == null) break;
queue.put(doc);
}
} finally {
try {
if (cursor != null) cursor.close(); } catch (final Throwable t) { }
try {
mongoDB.requestDone();
} catch (final Throwable t) { }
}
Utils.sleep(500);
} catch (final MongoException.CursorNotFound cnf) {
// rethrow only if something went wrong while we expect the cursor to be open.
if (opened.get()) {
throw cnf;
}
} catch (InterruptedException e) { break; }
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.queue = new LinkedBlockingQueue DBObject (1000);
try {this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName);
} catch (Exception e) {throw new RuntimeException(e);
TailableCursorThread listener = new TailableCursorThread(this.queue, this.mongoDB, this.mongoCollectionName, this.query);
this.opened.set(true);
listener.start();
@Override
public void close() {this.opened.set(false);
@Override
public void nextTuple() {DBObject dbo = this.queue.poll();
if(dbo == null) { Utils.sleep(50);
} else { this.collector.emit(dbObjectToStormTuple(dbo));
}
@Override
public void ack(Object msgId) {
// TODO Auto-generated method stub
@Override
public void fail(Object msgId) {
// TODO Auto-generated method stub
public abstract List Object dbObjectToStormTuple(DBObject message);
}
“Storm MongoDB 接口怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!
正文完