老熟女激烈的高潮_日韩一级黄色录像_亚洲1区2区3区视频_精品少妇一区二区三区在线播放_国产欧美日产久久_午夜福利精品导航凹凸

重慶分公司,新征程啟航

為企業提供網站建設、域名注冊、服務器等服務

SparkStreaming與Kafka整合遇到的問題及解決方案是什么

今天就跟大家聊聊有關SparkStreaming與Kafka整合遇到的問題及解決方案是什么,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

創新互聯是一家專業提供雨花企業網站建設,專注與網站建設、成都網站建設H5技術、小程序制作等業務。10年已為雨花眾多企業、政府機構等服務。創新互聯專業網站制作公司優惠進行中。

前言

最近工作中是做日志分析的平臺,采用了sparkstreaming+kafka,采用kafka主要是看中了它對大數據量處理的高性能,處理日志類應用再好不過了,采用了sparkstreaming的流處理框架  主要是考慮到它本身是基于spark核心的,以后的批處理可以一站式服務,并且可以提供準實時服務到elasticsearch中,可以實現準實時定位系統日志。

實現

Spark-Streaming獲取kafka數據的兩種方式-Receiver與Direct的方式。

一. 基于Receiver方式

這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer  API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark  Streaming啟動的job會去處理那些數據。代碼如下:

SparkConf sparkConf = new SparkConf().setAppName("log-etl").setMaster("local[4]");     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));     int numThreads = Integer.parseInt("4");     Map topicMap = new HashMap();     topicMap.put("group-45", numThreads);      //接收的參數分別是JavaStreamingConetxt,zookeeper連接地址,groupId,kafak的topic      JavaPairReceiverInputDStream messages =     KafkaUtils.createStream(jssc, "172.16.206.27:2181,172.16.206.28:2181,172.16.206.29:2181", "1", topicMap);

剛開始的時候系統正常運行,沒有發現問題,但是如果系統異常重新啟動sparkstreaming程序后,發現程序會重復處理已經處理過的數據,這種基于receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合著WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。官方現在也已經不推薦這種整合方式,官網相關地址  http://spark.apache.org/docs/latest/streaming-kafka-integration.html  ,下面我們使用官網推薦的第二種方式kafkaUtils的createDirectStream()方式。

二.基于Direct的方式

這種新的不基于Receiver的直接方式,是在Spark  1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數據后,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的***的offset,從而定義每個batch的offset的范圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer  api來獲取Kafka指定offset范圍的數據。

代碼如下:

SparkConf sparkConf = new SparkConf().setAppName("log-etl"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));  HashSet topicsSet = new HashSet(Arrays.asList(topics.split(","))); HashMap kafkaParams = new HashMap(); kafkaParams.put("metadata.broker.list", brokers); // Create direct kafka stream with brokers and topics JavaPairInputDStream messages = KafkaUtils.createDirectStream(     jssc,     String.class,     String.class,     StringDecoder.class,     StringDecoder.class,     kafkaParams,     topicsSet );

這種direct方式的優點如下:

1.簡化并行讀取:如果要讀取多個partition,不需要創建多個輸入DStream然后對它們進行union操作。Spark會創建跟Kafka  partition一樣多的RDD partition,并且會并行從Kafka中讀取數據。所以在Kafka partition和RDD  partition之間,有一個一對一的映射關系。

2.一次且僅一次的事務機制:基于receiver的方式,在spark和zk中通信,很有可能導致數據的不一致。

3.高效率:在receiver的情況下,如果要保證數據的不丟失,需要開啟wal機制,這種方式下,為、數據實際上被復制了兩份,一份在kafka自身的副本中,另外一份要復制到wal中,  direct方式下是不需要副本的。

三.基于Direct方式丟失消息的問題

貌似這種方式很***,但是還是有問題的,當業務需要重啟sparkstreaming程序的時候,業務日志依然會打入到kafka中,當job重啟后只能從***的offset開始消費消息,造成重啟過程中的消息丟失。kafka中的offset如下圖(使用kafkaManager實時監控隊列中的消息):

SparkStreaming與Kafka整合遇到的問題及解決方案是什么

當停止業務日志的接受后,先重啟spark程序,但是發現job并沒有將先前打入到kafka中的數據消費掉。這是因為消息沒有經過zk,topic的offset也就沒有保存

四.解決消息丟失的處理方案

一般有兩種方式處理這種問題,可以先spark streaming 保存offset,使用spark  checkpoint機制,第二種是程序中自己實現保存offset邏輯,我比較喜歡第二種方式,以為這種方式可控,所有主動權都在自己手中。

先看下大體流程圖,

SparkStreaming與Kafka整合遇到的問題及解決方案是什么

SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("log-etl");  Set topicSet = new HashSet();         topicSet.add("group-45");         kafkaParam.put("metadata.broker.list", "172.16.206.17:9092,172.16.206.31:9092,172.16.206.32:9092");         kafkaParam.put("group.id", "simple1");          // transform java Map to scala immutable.map         scala.collection.mutable.Map testMap = JavaConversions.mapAsScalaMap(kafkaParam);         scala.collection.immutable.Map scalaKafkaParam =                 testMap.toMap(new Predef.$less$colon$less, Tuple2>() {                     public Tuple2 apply(Tuple2 v1) {                         return v1;                     }                 });          // init KafkaCluster         kafkaCluster = new KafkaCluster(scalaKafkaParam);          scala.collection.mutable.Set mutableTopics = JavaConversions.asScalaSet(topicSet);         immutableTopics = mutableTopics.toSet();         scala.collection.immutable.Set topicAndPartitionSet2 = kafkaCluster.getPartitions(immutableTopics).right().get();          // kafka direct stream 初始化時使用的offset數據         Map consumerOffsetsLong = new HashMap();          // 沒有保存offset時(該group***消費時), 各個partition offset 默認為0         if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).isLeft()) {              System.out.println(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).left().get());              Set topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2);              for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {                 consumerOffsetsLong.put(topicAndPartition, 0L);             }          }         // offset已存在, 使用保存的offset         else {              scala.collection.immutable.Map consumerOffsetsTemp = kafkaCluster.getConsumerOffsets("simple1", topicAndPartitionSet2).right().get();              Map consumerOffsets = JavaConversions.mapAsJavaMap((scala.collection.immutable.Map)consumerOffsetsTemp);              Set topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2);              for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {                 Long offset = (Long)consumerOffsets.get(topicAndPartition);                 consumerOffsetsLong.put(topicAndPartition, offset);             }          }          JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));         kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);          // create direct stream         JavaInputDStream message = KafkaUtils.createDirectStream(                 jssc,                 String.class,                 String.class,                 StringDecoder.class,                 StringDecoder.class,                 String.class,                 kafkaParam,                 consumerOffsetsLong,                 new Function, String>() {                     public String call(MessageAndMetadata v1) throws Exception {                         System.out.println("接收到的數據《《==="+v1.message());                         return v1.message();                     }                 }         );          // 得到rdd各個分區對應的offset, 并保存在offsetRanges中         final AtomicReference offsetRanges = new AtomicReference();          JavaDStream javaDStream = message.transform(new Function, JavaRDD>() {             public JavaRDD call(JavaRDD rdd) throws Exception {                 OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();                 offsetRanges.set(offsets);                 return rdd;             }         });          // output         javaDStream.foreachRDD(new Function, Void>() {              public Void call(JavaRDD v1) throws Exception {                 if (v1.isEmpty()) return null;                  List list = v1.collect();                 for(String s:list){                     System.out.println("數據==="+s);                 }                  for (OffsetRange o : offsetRanges.get()) {                      // 封裝topic.partition 與 offset對應關系 java Map                     TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());                     Map topicAndPartitionObjectMap = new HashMap();                     topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());                      // 轉換java map to scala immutable.map                     scala.collection.mutable.Map testMap =                             JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap);                     scala.collection.immutable.Map scalatopicAndPartitionObjectMap =                             testMap.toMap(new Predef.$less$colon$less, Tuple2>() {                                 public Tuple2 apply(Tuple2 v1) {                                     return v1;                                 }                             });                      // 更新offset到kafkaCluster                     kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap);                        System.out.println("原數據====》"+o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()                     );                 }                 return null;             }         });          jssc.start();         jssc.awaitTermination();     }

基本使用這種方式就可以解決數據丟失的問題。

看完上述內容,你們對SparkStreaming與Kafka整合遇到的問題及解決方案是什么有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注創新互聯行業資訊頻道,感謝大家的支持。


分享名稱:SparkStreaming與Kafka整合遇到的問題及解決方案是什么
當前網址:http://www.xueling.net.cn/article/jghihc.html

其他資訊

在線咨詢
服務熱線
服務熱線:028-86922220
TOP
主站蜘蛛池模板: 久久九九国产视频 | 亚洲经典一区二区 | 少妇被猛男粗大的猛进出 | 人妻少妇看a片偷人精品视频 | 天天摸天天看 | 成人在线国产 | 亚洲丁香婷婷综合久久 | 6080YYY午夜理论片 | 色网站视频在线观看 | 91视频ww| 麻豆精品久久久久久中文字幕无码 | 成人免费毛片内射美女-百度 | 欧美色图bt | 制服诱惑一区 | 少妇把腿扒开让我舔18 | 你懂得在线观看 | 黄色片大全在线观看 | 精品国产品香蕉在线 | 午夜影视啪啪免费体验区入口 | 国产aⅴ无码久久丝袜美腿 国产麻豆精品一区二区 | 伊人久久狼人 | 另类免费视频 | 成年福利片在线观看 | 亚洲国产精品成人女人久久 | 人妻少妇精品一区二区三区 | 欧美xxxx色视频在线观看免费 | 免费羞羞视频无遮挡噼啪男男 | 久久综合在线观看 | 99视频国产精品免费观看 | V一区无码内射国产 | 亚洲拍宾馆视频播放 | 在线观看一区三区 | 日韩视频一区二区三区在线观看 | 2021av在线| 日韩亚洲欧美中文在线 | 中文字幕在线观看 | 精品久久综合 | 亚洲av无码专区国产乱码4se | 国产精品美女久久久浪潮软件 | 欧美日韩黄色大片 | 亚洲欧美日本在线观看 |