老熟女激烈的高潮_日韩一级黄色录像_亚洲1区2区3区视频_精品少妇一区二区三区在线播放_国产欧美日产久久_午夜福利精品导航凹凸

重慶分公司,新征程啟航

為企業提供網站建設、域名注冊、服務器等服務

StormMongoDB接口怎么使用

本篇內容介紹了“Storm MongoDB接口怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

成都創新互聯-專業網站定制、快速模板網站建設、高性價比廣德網站開發、企業建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式廣德網站制作公司更省心,省錢,快速模板網站建設找我們,業務覆蓋廣德地區。費用合理售后完善,10多年實體公司更值得信賴。

整體的Storn接口分為以下的幾個class

1:MongoBolt.java

2 : MongoSpout.java

3 : MongoTailableCursorTopology.java

4 : SimpleMongoBolt.java

看代碼說話:

package storm.mongo;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;

/**
 *
 * 注意在這里,沒有實現批處理的調用,并且只是一個抽象類,對于Mongo的Storm交互做了一次封裝
 *
 * @author Adrian Petrescu 
 *
 */
public abstract class MongoBolt extends BaseRichBolt {
	private OutputCollector collector;
	
	// MOngDB的DB對象
	private DB mongoDB;
	
	
        //記錄我們的主機,端口,和MongoDB的數據DB民粹
	private final String mongoHost;
	private final int mongoPort;
	private final String mongoDbName;

	/**
	 * @param mongoHost The host on which Mongo is running.
	 * @param mongoPort The port on which Mongo is running.
	 * @param mongoDbName The Mongo database containing all collections being
	 * written to.
	 */
	protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) {
		this.mongoHost = mongoHost;
		this.mongoPort = mongoPort;
		this.mongoDbName = mongoDbName;
	}
	
	@Override
	public void prepare(
			@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
		
		this.collector = collector;
		try {
		
		        //prepare方法目前在初始化的過程之中得到了一個Mongo的連接
			this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public void execute(Tuple input) {
	
	    
	        //注意我們在這里還有一個判斷,判斷當前是否該發射
	
		if (shouldActOnInput(input)) {
			String collectionName = getMongoCollectionForInput(input);
			DBObject dbObject = getDBObjectForInput(input);
			if (dbObject != null) {
				try {
					mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1));
					collector.ack(input);
				} catch (MongoException me) {
					collector.fail(input);
				}
			}
		} else {
			collector.ack(input);
		}
	}

	/**
	 * Decide whether or not this input tuple should trigger a Mongo write.
	 *
	 * @param input the input tuple under consideration
	 * @return {@code true} iff this input tuple should trigger a Mongo write
	 */
	public abstract boolean shouldActOnInput(Tuple input);
	
	/**
	 * Returns the Mongo collection which the input tuple should be written to.
	 *
	 * @param input the input tuple under consideration
	 * @return the Mongo collection which the input tuple should be written to
	 */
	public abstract String getMongoCollectionForInput(Tuple input);
	
	/**
	 * Returns the DBObject to store in Mongo for the specified input tuple.
	 * 
	 
	 拿到DBObject的一個抽象類
	 
	 
	 * @param input the input tuple under consideration
	 * @return the DBObject to be written to Mongo
	 */
	public abstract DBObject getDBObjectForInput(Tuple input);
	
	
	//注意這里隨著計算的終結被關閉了。
	@Override
	public void cleanup() {
		this.mongoDB.getMongo().close();
	}

}

2 :

package storm.mongo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;

import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;

/**
* A Spout which consumes documents from a Mongodb tailable cursor.
*
* Subclasses should simply override two methods:
* 
  • {@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields} * 
  • {@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns * a Mongo document into a Storm tuple matching the declared output fields. * 
* ** 

WARNING: You can only use tailable cursors on capped collections. *  * @author Dan Beaulieu  * */ // 在這里,抽象的過程中,依舊保持了第一層的Spout為一個抽象類,MongoSpout為abstract的一個抽象類,子類在繼承這// 個類的過程之中實現特定的方法即可 // 這里還有一個類似Cursor的操作。 public abstract class MongoSpout extends BaseRichSpout { private SpoutOutputCollector collector; private LinkedBlockingQueue queue; private final AtomicBoolean opened = new AtomicBoolean(false); private DB mongoDB; private final DBObject query; private final String mongoHost; private final int mongoPort; private final String mongoDbName; private final String mongoCollectionName; public MongoSpout(String mongoHost, int mongoPort, String mongoDbName, String mongoCollectionName, DBObject query) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; this.mongoCollectionName = mongoCollectionName; this.query = query; } class TailableCursorThread extends Thread { // 內部類 TailableCursorThread線程 //注意在其中我們使用了LinkedBlockingQueue的對象,有關java高并發的集合類,請參考本ID的【Java集合類型的博文】博文。 LinkedBlockingQueue queue; String mongoCollectionName; DB mongoDB; DBObject query; public TailableCursorThread(LinkedBlockingQueue queue, DB mongoDB, String mongoCollectionName, DBObject query) { this.queue = queue; this.mongoDB = mongoDB; this.mongoCollectionName = mongoCollectionName; this.query = query; } public void run() { while(opened.get()) { try { // create the cursor mongoDB.requestStart(); final DBCursor cursor = mongoDB.getCollection(mongoCollectionName) .find(query) .sort(new BasicDBObject("$natural", 1)) .addOption(Bytes.QUERYOPTION_TAILABLE) .addOption(Bytes.QUERYOPTION_AWAITDATA); try { while (opened.get() && cursor.hasNext()) {                     final DBObject doc = cursor.next();                     if (doc == null) break;                     queue.put(doc);                 } } finally { try {  if (cursor != null) cursor.close();  } catch (final Throwable t) { }                     try {                       mongoDB.requestDone();                       } catch (final Throwable t) { }                 } Utils.sleep(500); } catch (final MongoException.CursorNotFound cnf) { // rethrow only if something went wrong while we expect the cursor to be open.                     if (opened.get()) {                      throw cnf;                     }                 } catch (InterruptedException e) { break; } } }; } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.queue = new LinkedBlockingQueue(1000); try { this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName); } catch (Exception e) { throw new RuntimeException(e); } TailableCursorThread listener = new TailableCursorThread(this.queue, this.mongoDB, this.mongoCollectionName, this.query); this.opened.set(true); listener.start(); } @Override public void close() { this.opened.set(false); } @Override public void nextTuple() { DBObject dbo = this.queue.poll(); if(dbo == null) {             Utils.sleep(50);         } else {             this.collector.emit(dbObjectToStormTuple(dbo));         } } @Override public void ack(Object msgId) { // TODO Auto-generated method stub } @Override public void fail(Object msgId) { // TODO Auto-generated method stub } public abstract List dbObjectToStormTuple(DBObject message); }

“Storm MongoDB接口怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注創新互聯網站,小編將為大家輸出更多高質量的實用文章!


新聞標題:StormMongoDB接口怎么使用
文章路徑:http://www.xueling.net.cn/article/gphccg.html 主站蜘蛛池模板: 欧美熟妇与小伙性欧美交 | 呻吟翘臀后进爆白浆 | 亚洲美女视频一区二区 | 91九色视频网站 | 高潮喷水在线观看免费 | 国产精品免费久久影 | a级黄色片免费观看 | 伊人wwwyiren22| av毛片| 天堂网www天堂网最新版 | 40集全部免费观看 | 欧美成人免费草草影院视频 | 97免费人做人爱在线看视频 | 国产大片免费看 | 男男高肉H视频无码网址 | 成人性生交大片免费看中文带字幕 | 无码H黄肉动漫在线观看 | 99国产精品久久久久久久久久 | 在线a亚洲v天堂网2018 | 桃子视频在线观看免费视频网 | 久久乱码卡一卡2卡三卡四 亚洲影视在线观看 | 亚州中文 | 二区精品视频 | 国产www| 国产在线榴莲视频导航 | 8av国产精品爽爽va在线观看 | 无码欧精品亜州日韩一区夜夜嗨 | 天天做天天爱夜夜爽导航 | 中文字幕在线精品不卡 | 国产精在线 | 丁香五月网久久综合 | 免费A级毛片中文字幕 | 亚洲国产av无码精品 | 激情欧美成人久久综合 | 2018av天堂在线视频精品观看 | 精品国产一区二区三区在线 | 国产精品人妻一区二区网站 | 国产欧美日韩另类在线专区 | 美女亚洲网 | av网站免费在线观看 | 无遮挡裸体免费视频尤物 |