老熟女激烈的高潮_日韩一级黄色录像_亚洲1区2区3区视频_精品少妇一区二区三区在线播放_国产欧美日产久久_午夜福利精品导航凹凸

重慶分公司,新征程啟航

為企業(yè)提供網(wǎng)站建設(shè)、域名注冊、服務(wù)器等服務(wù)

DataStreamReader和DataStreamWriter怎么使用

這篇文章主要介紹“DataStreamReader和DataStreamWriter怎么使用”,在日常操作中,相信很多人在DataStreamReader和DataStreamWriter怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”DataStreamReader和DataStreamWriter怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

按需規(guī)劃網(wǎng)站可以根據(jù)自己的需求進行定制,網(wǎng)站建設(shè)、成都網(wǎng)站建設(shè)構(gòu)思過程中功能建設(shè)理應(yīng)排到主要部位公司網(wǎng)站建設(shè)、成都網(wǎng)站建設(shè)的運用實際效果公司網(wǎng)站制作網(wǎng)站建立與制做的實際意義

流的讀取是從DataStreamReader和DataStreamWriter開始的。

DataStreamReader

DataStreamReader是生成流讀取者的入口所在,關(guān)鍵方法是load。這段代碼很關(guān)鍵,所以把全部代碼先貼出來,慢慢分析。

def load(): DataFrame = {
    
    val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).

      getConstructor().newInstance()
   
    val v1DataSource = DataSource(

      sparkSession,

      userSpecifiedSchema = userSpecifiedSchema,

      className = source,

      options = extraOptions.toMap)

    val v1Relation = ds match {

      case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))

      case _ => None

    }

    ds match {

      case provider: TableProvider =>

        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(

          source = provider, conf = sparkSession.sessionState.conf)

        val options = sessionOptions ++ extraOptions

        val dsOptions = new CaseInsensitiveStringMap(options.asJava)

        val table = userSpecifiedSchema match {

          case Some(schema) => provider.getTable(dsOptions, schema)

          case _ => provider.getTable(dsOptions)

        }

        import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._

        table match {

          case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>

            Dataset.ofRows(

              sparkSession,

              StreamingRelationV2(

                provider, source, table, dsOptions, table.schema.toAttributes, v1Relation)(

                sparkSession))

          // fallback to v1

          // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.

          case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))

        }

      case _ =>

        // Code path for data source v1.

        Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))

    }

  }

有好多分支,重要的是區(qū)分開V1和V2。

V1用的邏輯關(guān)系是StreamingRelation;而V2用的邏輯關(guān)系是StreamingRelationV2。這里先看看他們對應(yīng)的物理計劃是什么?

在SparkStrategies.scala文件中,定義了物理計劃:

/**

   * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`.

   * It won't affect the execution, because `StreamingRelation` will be replaced with

   * `StreamingExecutionRelation` in `StreamingQueryManager` and `StreamingExecutionRelation` will

   * be replaced with the real relation using the `Source` in `StreamExecution`.

   */

object StreamingRelationStrategy extends Strategy {

    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      case s: StreamingRelation =>

        StreamingRelationExec(s.sourceName, s.output) :: Nil

      case s: StreamingExecutionRelation =>

        StreamingRelationExec(s.toString, s.output) :: Nil

      case s: StreamingRelationV2 =>

        StreamingRelationExec(s.sourceName, s.output) :: Nil

      case _ => Nil

    }

  }

物理計劃都是StreamingRelationExec,StreamingRelationExec的代碼其實啥都沒實現(xiàn),所以最后其實看代碼注釋StreamingRelationExec也不是真正的物理計劃。

這里先記得相關(guān)的類ContinuousExecution和MicroBatchExecution。一時找不到怎么執(zhí)行到具體的物理計劃ContinuousExecution和MicroBatchExecution的,我們就試試反推把。先看看ContinuousExecution的代碼。

StreamExecution

StreamExecution是抽象類。其抽象方法runActivatedStream是執(zhí)行具體的連續(xù)流讀取任務(wù)的,子類會重寫該函數(shù)。

runStream方法封裝了runActivatedStream方法,額外加了些事件通知等處理機制,知道這一點就行了。

StreamingQueryManager

這里先嘗試看看StreamingQueryManager是干什么用的,看注釋應(yīng)該是管理所有的StreamingQuery的。

 private def createQuery(...): StreamingQueryWrapper ={

   (sink, trigger) match {

      case (table: SupportsWrite, trigger: ContinuousTrigger) =>
       

        new StreamingQueryWrapper(new ContinuousExecution(

          sparkSession,

          userSpecifiedName.orNull,

          checkpointLocation,

          analyzedPlan,

          table,

          trigger,

          triggerClock,

          outputMode,

          extraOptions,

          deleteCheckpointOnStop))

      case _ =>

        if (operationCheckEnabled) {

          UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)

        }

        new StreamingQueryWrapper(new MicroBatchExecution(

          sparkSession,

          userSpecifiedName.orNull,

          checkpointLocation,

          analyzedPlan,

          sink,

          trigger,

          triggerClock,

          outputMode,

          extraOptions,

          deleteCheckpointOnStop))

    }

}

對于連續(xù)流,返回一個:

  new StreamingQueryWrapper(new ContinuousExecution))

StreamingQueryWrapper的作用,就是將StreamingQuery封裝成可序列化的,別的和StreamingQuery沒什么區(qū)別。這里對于連續(xù)流就是包裝了ContinuousExecution。

ContinuousExecution

ContinuousExecution看名稱應(yīng)該是對應(yīng)連續(xù)流的物理執(zhí)行計劃的,繼承自StreamExecution(抽象類)。看看主要代碼其實就是重寫了runActivatedStream方法。

 override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {

    val stateUpdate = new UnaryOperator[State] {

      override def apply(s: State) = s match {

        // If we ended the query to reconfigure, reset the state to active.

        case RECONFIGURING => ACTIVE

        case _ => s

      }

    }

    do {

      runContinuous(sparkSessionForStream)

    } while (state.updateAndGet(stateUpdate) == ACTIVE)

    stopSources()

  }

真正的執(zhí)行邏輯代碼在私有方法runContinuous中,這里就不詳細展開了,知道了主要流程就可以了。

下面就是要看看ContinuousExecution到底是在哪里被從邏輯計劃轉(zhuǎn)換到物理計劃的。

搜索全文,找到了StreamingQueryManager.scala這個文件。對了,就是從上面的StreamingQueryManager找到這個ContinuousExecution。

DataStreamWriter

DataStreamWriter是真正觸發(fā)流計算開始啟動執(zhí)行的地方。

start()方法得到要給StreamingQuery,方法里的關(guān)鍵代碼片段:

 df.sparkSession.sessionState.streamingQueryManager.startQuery(

        extraOptions.get("queryName"),

        extraOptions.get("checkpointLocation"),

        df,

        extraOptions.toMap,

        sink,

        outputMode,

        useTempCheckpointLocation = source == "console" || source == "noop",

        recoverFromCheckpointLocation = true,

        trigger = trigger)

跟蹤進去到了StreamingQueryManager,看它的startQuery方法。

startQuery方法分為幾步:

  1. 調(diào)用createQuery方法返回StreamingQuery。

val query = createQuery(

      userSpecifiedName,

      userSpecifiedCheckpointLocation,

      df,

      extraOptions,

      sink,

      outputMode,

      useTempCheckpointLocation,

      recoverFromCheckpointLocation,

      trigger,

      triggerClock)

query就是StreamingQueryWrapper,就是類似這樣的代碼:

new StreamingQueryWrapper(new ContinuousExecution))

2、啟動上一步的query 

try {     

      query.streamingQuery.start()

    } catch {     

    }

這里的代碼直接調(diào)用到StreamingQuery的父類StreamExecution的start方法。代碼定義:

def start(): Unit = {

    logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")

    queryExecutionThread.setDaemon(true)

    queryExecutionThread.start()

    startLatch.await()  // Wait until thread started and QueryStart event has been posted

  }

queryExecutionThread線程的定義又是這樣的:

val queryExecutionThread: QueryExecutionThread =

    new QueryExecutionThread(s"stream execution thread for $prettyIdString") {

      override def run(): Unit = {

        sparkSession.sparkContext.setCallSite(callSite)

        runStream()

      }

    }

最后在線程中啟動runStream這個私有方法。

3、返回query

最后返回query,注意這里的query在上面的代碼中已經(jīng)start運行了。

到此,關(guān)于“DataStreamReader和DataStreamWriter怎么使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
當(dāng)前題目:DataStreamReader和DataStreamWriter怎么使用
網(wǎng)址分享:http://www.xueling.net.cn/article/gcgsjd.html

其他資訊

在線咨詢
服務(wù)熱線
服務(wù)熱線:028-86922220
TOP
主站蜘蛛池模板: 老司机精品成人无码AV | 69午夜免费福利 | 人人爽人人片人人片av | 天天操天天要 | 亚洲综合区夜夜久久久 | 精品精品国产毛片在线看 | 国产天堂| 扒开粉嫩的小缝喷白浆h | 日韩欧美精 | 日本一本在线观看 | 未满十八18勿进黄网站 | 人妻少妇看a片偷人精品视频 | 亚洲国产精品久久无人区 | 91久久香蕉国产熟女线看 | 性淫视频 | 欧洲vs亚洲vs国产 | 中文在线免费看视频 | 在线欧美色 | 亚洲第一黄色网址 | 看真人视频a级毛片 | 毛茸茸av| 免费一看一级毛片 | 国产91xxx在线观看大全 | 青草99| 福利在线观看1000集 | 日本久久99| 男人操女人视频网站 | 啊轻点灬大ji巴太粗太长了日本 | 午夜成人毛片免费观看蜜桔视频 | 免费高清网站在线播放的注意事项 | 国产一级大片 | 国产精品99久久久久久一二区 | 97视频免费| 97久久精品午夜一区二区 | 中文字幕另类日韩欧美亚洲嫩草 | 国产极品一区二区 | 国产乱轮在线视频 | 黄色影院国产 | www久久久久久久 | 免费国产自线拍一欧美视频 | 看成年全黄大色黄大片 |