重慶分公司,新征程啟航
為企業提供網站建設、域名注冊、服務器等服務
為企業提供網站建設、域名注冊、服務器等服務
本篇內容介紹了“Storm MongoDB接口怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
成都創新互聯-專業網站定制、快速模板網站建設、高性價比廣德網站開發、企業建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式廣德網站制作公司更省心,省錢,快速模板網站建設找我們,業務覆蓋廣德地區。費用合理售后完善,10多年實體公司更值得信賴。
整體的Storn接口分為以下的幾個class
1:MongoBolt.java
2 : MongoSpout.java
3 : MongoTailableCursorTopology.java
4 : SimpleMongoBolt.java
看代碼說話:
1
package storm.mongo; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import com.mongodb.DB; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoException; import com.mongodb.WriteConcern; /** * * 注意在這里,沒有實現批處理的調用,并且只是一個抽象類,對于Mongo的Storm交互做了一次封裝 * * @author Adrian Petrescu* */ public abstract class MongoBolt extends BaseRichBolt { private OutputCollector collector; // MOngDB的DB對象 private DB mongoDB; //記錄我們的主機,端口,和MongoDB的數據DB民粹 private final String mongoHost; private final int mongoPort; private final String mongoDbName; /** * @param mongoHost The host on which Mongo is running. * @param mongoPort The port on which Mongo is running. * @param mongoDbName The Mongo database containing all collections being * written to. */ protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; } @Override public void prepare( @SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; try { //prepare方法目前在初始化的過程之中得到了一個Mongo的連接 this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName); } catch (Exception e) { throw new RuntimeException(e); } } @Override public void execute(Tuple input) { //注意我們在這里還有一個判斷,判斷當前是否該發射 if (shouldActOnInput(input)) { String collectionName = getMongoCollectionForInput(input); DBObject dbObject = getDBObjectForInput(input); if (dbObject != null) { try { mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1)); collector.ack(input); } catch (MongoException me) { collector.fail(input); } } } else { collector.ack(input); } } /** * Decide whether or not this input tuple should trigger a Mongo write. * * @param input the input tuple under consideration * @return {@code true} iff this input tuple should trigger a Mongo write */ public abstract boolean shouldActOnInput(Tuple input); /** * Returns the Mongo collection which the input tuple should be written to. * * @param input the input tuple under consideration * @return the Mongo collection which the input tuple should be written to */ public abstract String getMongoCollectionForInput(Tuple input); /** * Returns the DBObject to store in Mongo for the specified input tuple. * 拿到DBObject的一個抽象類 * @param input the input tuple under consideration * @return the DBObject to be written to Mongo */ public abstract DBObject getDBObjectForInput(Tuple input); //注意這里隨著計算的終結被關閉了。 @Override public void cleanup() { this.mongoDB.getMongo().close(); } }
2 :
package storm.mongo; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.utils.Utils; import com.mongodb.BasicDBObject; import com.mongodb.Bytes; import com.mongodb.DB; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoException; /** * A Spout which consumes documents from a Mongodb tailable cursor. * * Subclasses should simply override two methods: *
* WARNING: You can only use tailable cursors on capped collections.
*
* @author Dan Beaulieu