Storm MongoDB接口怎么使用

67次阅读
没有评论

共计 6644 个字符,预计需要花费 17 分钟才能阅读完成。

本篇内容介绍了“Storm MongoDB 接口怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

整体的 Storn 接口分为以下的几个 class

1:MongoBolt.java

2 : MongoSpout.java

3 : MongoTailableCursorTopology.java

4 : SimpleMongoBolt.java

看代码说话:

package storm.mongo;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
 *
 *  注意在这里,没有实现批处理的调用,并且只是一个抽象类,对于 Mongo 的 Storm 交互做了一次封装
 *
 * @author Adrian Petrescu  apetresc@gmail.com 
 *
 */
public abstract class MongoBolt extends BaseRichBolt {
 private OutputCollector collector;
 // MOngDB 的 DB 对象
 private DB mongoDB;

 // 记录我们的主机,端口,和 MongoDB 的数据 DB 民粹 private final String mongoHost; private final int mongoPort; private final String mongoDbName;  * @param mongoHost The host on which Mongo is running.  * @param mongoPort The port on which Mongo is running.  * @param mongoDbName The Mongo database containing all collections being  * written to.  */ protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; @Override public void prepare(@SuppressWarnings( rawtypes) Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; try {  //prepare 方法目前在初始化的过程之中得到了一个 Mongo 的连接 this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName); } catch (Exception e) {throw new RuntimeException(e); @Override public void execute(Tuple input) {    // 注意我们在这里还有一个判断,判断当前是否该发射 if (shouldActOnInput(input)) {String collectionName = getMongoCollectionForInput(input); DBObject dbObject = getDBObjectForInput(input); if (dbObject != null) { try {mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1)); collector.ack(input); } catch (MongoException me) {collector.fail(input); } else {collector.ack(input);  * Decide whether or not this input tuple should trigger a Mongo write.  *  * @param input the input tuple under consideration  * @return {@code true} iff this input tuple should trigger a Mongo write  */ public abstract boolean shouldActOnInput(Tuple input);  * Returns the Mongo collection which the input tuple should be written to.  *  * @param input the input tuple under consideration  * @return the Mongo collection which the input tuple should be written to  */ public abstract String getMongoCollectionForInput(Tuple input);  * Returns the DBObject to store in Mongo for the specified input tuple.  *      拿到 DBObject 的一个抽象类      * @param input the input tuple under consideration  * @return the DBObject to be written to Mongo  */ public abstract DBObject getDBObjectForInput(Tuple input);
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.utils.Utils; import com.mongodb.BasicDBObject; import com.mongodb.Bytes; import com.mongodb.DB; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoException; * A Spout which consumes documents from a Mongodb tailable cursor. * Subclasses should simply override two methods: *  ul *  li {@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields} *  li {@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns * a Mongo document into a Storm tuple matching the declared output fields. *  /ul **  p *  b WARNING: /b  You can only use tailable cursors on capped collections. *  * @author Dan Beaulieu  danjacob.beaulieu@gmail.com
//  在这里,抽象的过程中,依旧保持了第一层的 Spout 为一个抽象类,MongoSpout 为 abstract 的一个抽象类,子类在继承这 //  个类的过程之中实现特定的方法即可 //  这里还有一个类似 Cursor 的操作。
private LinkedBlockingQueue DBObject  queue; private final AtomicBoolean opened = new AtomicBoolean(false); private DB mongoDB; private final DBObject query; private final String mongoHost; private final int mongoPort; private final String mongoDbName; private final String mongoCollectionName;
public MongoSpout(String mongoHost, int mongoPort, String mongoDbName, String mongoCollectionName, DBObject query) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; this.mongoCollectionName = mongoCollectionName; this.query = query; class TailableCursorThread extends Thread {
// 注意在其中我们使用了 LinkedBlockingQueue 的对象,有关 java 高并发的集合类,请参考本 ID 的【Java 集合类型的博文】博文。LinkedBlockingQueue DBObject  queue; String mongoCollectionName; DB mongoDB; DBObject query; public TailableCursorThread(LinkedBlockingQueue DBObject  queue, DB mongoDB, String mongoCollectionName, DBObject query) { this.queue = queue; this.mongoDB = mongoDB; this.mongoCollectionName = mongoCollectionName; this.query = query; public void run() {while(opened.get()) { try { // create the cursor mongoDB.requestStart(); final DBCursor cursor = mongoDB.getCollection(mongoCollectionName) .find(query) .sort(new BasicDBObject( $natural , 1)) .addOption(Bytes.QUERYOPTION_TAILABLE) .addOption(Bytes.QUERYOPTION_AWAITDATA); try {while (opened.get()   cursor.hasNext()) { final DBObject doc = cursor.next();  if (doc == null) break;  queue.put(doc);  } } finally { try {  if (cursor != null) cursor.close(); } catch (final Throwable t) { }  try {    mongoDB.requestDone();    } catch (final Throwable t) { }  } Utils.sleep(500); } catch (final MongoException.CursorNotFound cnf) { // rethrow only if something went wrong while we expect the cursor to be open.  if (opened.get()) {   throw cnf;  }  } catch (InterruptedException e) { break; } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.queue = new LinkedBlockingQueue DBObject (1000); try {this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName); } catch (Exception e) {throw new RuntimeException(e); TailableCursorThread listener = new TailableCursorThread(this.queue, this.mongoDB, this.mongoCollectionName, this.query); this.opened.set(true); listener.start(); @Override public void close() {this.opened.set(false); @Override public void nextTuple() {DBObject dbo = this.queue.poll(); if(dbo == null) { Utils.sleep(50);  } else { this.collector.emit(dbObjectToStormTuple(dbo));  } @Override public void ack(Object msgId) { // TODO Auto-generated method stub @Override public void fail(Object msgId) { // TODO Auto-generated method stub public abstract List Object  dbObjectToStormTuple(DBObject message); }

“Storm MongoDB 接口怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计6644字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)