老熟女激烈的高潮_日韩一级黄色录像_亚洲1区2区3区视频_精品少妇一区二区三区在线播放_国产欧美日产久久_午夜福利精品导航凹凸

重慶分公司,新征程啟航

為企業(yè)提供網(wǎng)站建設(shè)、域名注冊(cè)、服務(wù)器等服務(wù)

Spark筆記整理(四):SparkRDD算子實(shí)戰(zhàn)

[TOC]

我們提供的服務(wù)有:成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作、微信公眾號(hào)開(kāi)發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、許昌ssl等。為上千家企事業(yè)單位解決了網(wǎng)站和推廣的問(wèn)題。提供周到的售前咨詢(xún)和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的許昌網(wǎng)站制作公司


Spark算子概述

RDD:彈性分布式數(shù)據(jù)集,是一種特殊集合、支持多種來(lái)源、有容錯(cuò)機(jī)制、可以被緩存、支持并行操作,一個(gè)RDD代表多個(gè)分區(qū)里的數(shù)據(jù)集。

RDD有兩種操作算子:

  • Transformation(轉(zhuǎn)換):Transformation屬于延遲計(jì)算,當(dāng)一個(gè)RDD轉(zhuǎn)換成另一個(gè)RDD時(shí)并沒(méi)有立即進(jìn)行轉(zhuǎn)換,僅僅是記住了數(shù)據(jù)集的邏輯操作
  • Action(執(zhí)行):觸發(fā)Spark作業(yè)的運(yùn)行,真正觸發(fā)轉(zhuǎn)換算子的計(jì)算

需要說(shuō)明的是,下面寫(xiě)的scala代碼,其實(shí)都是可以簡(jiǎn)寫(xiě)的,但是為了方便理解,我都沒(méi)有簡(jiǎn)寫(xiě),因?yàn)橐?jiǎn)寫(xiě)的話對(duì)于scala來(lái)說(shuō)真的就是一句話的事情了。

另外如果是在本地環(huán)境進(jìn)行開(kāi)發(fā),那么需要添加相關(guān)依賴(lài):


    org.scala-lang
    scala-library
    2.10.5


    org.apache.spark
    spark-core_2.10
    1.6.2

Transformation算子

概述

需要操作的Transformation算子說(shuō)明如下:

  • map(func)

    返回一個(gè)新的分布式數(shù)據(jù)集,由每個(gè)原元素經(jīng)過(guò)func函數(shù)轉(zhuǎn)換后組成

  • filter(func)

    返回一個(gè)新的數(shù)據(jù)集,由經(jīng)過(guò)func函數(shù)后返回值為true的原元素組成

  • flatMap(func)

    類(lèi)似于map,但是每一個(gè)輸入元素,會(huì)被映射為0到多個(gè)輸出元素(因此,func函數(shù)的返回值是一個(gè)Seq,而不是單一元素)

  • sample(withReplacement, frac, seed)

    根據(jù)給定的隨機(jī)種子seed,隨機(jī)抽樣出數(shù)量為frac的數(shù)據(jù)

  • union(otherDataset)

    返回一個(gè)新的數(shù)據(jù)集,由原數(shù)據(jù)集和參數(shù)聯(lián)合而成

  • groupByKey([numTasks])

    在一個(gè)由(K,V)對(duì)組成的數(shù)據(jù)集上調(diào)用,返回一個(gè)(K,Seq[V])對(duì)的數(shù)據(jù)集。注意:默認(rèn)情況下,使用8個(gè)并行任務(wù)進(jìn)行分組,你可以傳入numTask可選參數(shù),根據(jù)數(shù)據(jù)量設(shè)置不同數(shù)目的Task

  • reduceByKey(func, [numTasks])

    在一個(gè)(K,V)對(duì)的數(shù)據(jù)集上使用,返回一個(gè)(K,V)對(duì)的數(shù)據(jù)集,key相同的值,都被使用指定的reduce函數(shù)聚合到一起。和groupbykey類(lèi)似,任務(wù)的個(gè)數(shù)是可以通過(guò)第二個(gè)可選參數(shù)來(lái)配置的。

  • join(otherDataset, [numTasks])

    在類(lèi)型為(K,V)和(K,W)類(lèi)型的數(shù)據(jù)集上調(diào)用,返回一個(gè)(K,(V,W))對(duì),每個(gè)key中的所有元素都在一起的數(shù)據(jù)集

map

測(cè)試代碼如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps1(sc)

        sc.stop()
    }
    /**
      * 1、map:將集合中每個(gè)元素乘以7
      * map(func):返回一個(gè)新的分布式數(shù)據(jù)集,由每個(gè)原元素經(jīng)過(guò)func函數(shù)轉(zhuǎn)換后組成
      */
    def transformationOps1(sc:SparkContext): Unit = {
        val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val listRDD = sc.parallelize(list)
        val retRDD = listRDD.map(num => num * 7)
        retRDD.foreach(num => println(num))
    }
}

執(zhí)行結(jié)果如下:

42
7
49
14
56
21
63
28
70
35

filter

測(cè)試代碼如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps2(sc)

        sc.stop()
    }
    /**
      * 2、filter:過(guò)濾出集合中的奇數(shù)
      * filter(func): 返回一個(gè)新的數(shù)據(jù)集,由經(jīng)過(guò)func函數(shù)后返回值為true的原元素組成
      *
      * 一般在filter操作之后都要做重新分區(qū)(因?yàn)榭赡軘?shù)據(jù)量減少了很多)
      */
    def transformationOps2(sc:SparkContext): Unit = {
        val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val listRDD = sc.parallelize(list)
        val retRDD = listRDD.filter(num => num % 2 == 0)
        retRDD.foreach(println)
    }
}

輸出結(jié)果如下:

6
2
8
4
10

flatMap

測(cè)試代碼如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps3(sc)

        sc.stop()
    }
    /**
      * 3、flatMap:將行拆分為單詞
      * flatMap(func):類(lèi)似于map,但是每一個(gè)輸入元素,
      * 會(huì)被映射為0到多個(gè)輸出元素(因此,func函數(shù)的返回值是一個(gè)Seq,而不是單一元素)
      */
    def transformationOps3(sc:SparkContext): Unit = {
        val list = List("hello you", "hello he", "hello me")
        val listRDD = sc.parallelize(list)
        val wordsRDD = listRDD.flatMap(line => line.split(" "))
        wordsRDD.foreach(println)
    }
}

輸出結(jié)果如下:

hello
hello
he
you
hello
me

sample

測(cè)試代碼如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps4(sc)

        sc.stop()
    }
    /**
      * 4、sample:根據(jù)給定的隨機(jī)種子seed,隨機(jī)抽樣出數(shù)量為frac的數(shù)據(jù)
      * sample(withReplacement, frac, seed): 根據(jù)給定的隨機(jī)種子seed,隨機(jī)抽樣出數(shù)量為frac的數(shù)據(jù)
      * 抽樣的目的:就是以樣本評(píng)估整體
      * withReplacement:
      *     true:有放回的抽樣
      *     false:無(wú)放回的抽樣
      * frac:就是樣本空間的大小,以百分比小數(shù)的形式出現(xiàn),比如20%,就是0.2
      *
      * 使用sample算子計(jì)算出來(lái)的結(jié)果可能不是很準(zhǔn)確,1000個(gè)數(shù),20%,樣本數(shù)量在200個(gè)左右,不一定為200
      *
      * 一般情況下,使用sample算子在做spark優(yōu)化(數(shù)據(jù)傾斜)的方面應(yīng)用最廣泛
      */
    def transformationOps4(sc:SparkContext): Unit = {
        val list = 1 to 1000
        val listRDD = sc.parallelize(list)
        val sampleRDD = listRDD.sample(false, 0.2)

        sampleRDD.foreach(num => print(num + " "))
        println
        println("sampleRDD count: " + sampleRDD.count())
        println("Another sampleRDD count: " + sc.parallelize(list).sample(false, 0.2).count())
    }
}

輸出結(jié)果如下:

sampleRDD count: 219
Another sampleRDD count: 203

union

測(cè)試代碼如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps5(sc)

        sc.stop()
    }
    /**
      * 5、union:返回一個(gè)新的數(shù)據(jù)集,由原數(shù)據(jù)集和參數(shù)聯(lián)合而成
      * union(otherDataset): 返回一個(gè)新的數(shù)據(jù)集,由原數(shù)據(jù)集和參數(shù)聯(lián)合而成
      * 類(lèi)似數(shù)學(xué)中的并集,就是sql中的union操作,將兩個(gè)集合的所有元素整合在一塊,包括重復(fù)元素
      */
    def transformationOps5(sc:SparkContext): Unit = {
        val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val list2 = List(7, 8, 9, 10, 11, 12)
        val listRDD1 = sc.parallelize(list1)
        val listRDD2 = sc.parallelize(list2)
        val unionRDD = listRDD1.union(listRDD2)

        unionRDD.foreach(println)
    }
}

輸出結(jié)果如下:

1
6
2
7
3
8
4
9
5
10
7
8
9
10
11
12

groupByKey

測(cè)試代碼如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps6(sc)

        sc.stop()
    }
    /**
      * 6、groupByKey:對(duì)數(shù)組進(jìn)行 group by key操作
      * groupByKey([numTasks]): 在一個(gè)由(K,V)對(duì)組成的數(shù)據(jù)集上調(diào)用,返回一個(gè)(K,Seq[V])對(duì)的數(shù)據(jù)集。
      * 注意:默認(rèn)情況下,使用8個(gè)并行任務(wù)進(jìn)行分組,你可以傳入numTask可選參數(shù),根據(jù)數(shù)據(jù)量設(shè)置不同數(shù)目的Task
      * mr中:
      * --->map操作--->--->shuffle--->--->
      * groupByKey類(lèi)似于shuffle操作
      *
      * 和reduceByKey有點(diǎn)類(lèi)似,但是有區(qū)別,reduceByKey有本地的規(guī)約,而groupByKey沒(méi)有本地規(guī)約,所以一般情況下,
      * 盡量慎用groupByKey,如果一定要用的話,可以自定義一個(gè)groupByKey,在自定義的gbk中添加本地預(yù)聚合操作
      */
    def transformationOps6(sc:SparkContext): Unit = {
        val list = List("hello you", "hello he", "hello me")
        val listRDD = sc.parallelize(list)
        val wordsRDD = listRDD.flatMap(line => line.split(" "))
        val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
        pairsRDD.foreach(println)
        val gbkRDD:RDD[(String, Iterable[Int])] = pairsRDD.groupByKey()
        println("=============================================")
        gbkRDD.foreach(t => println(t._1 + "..." + t._2))
    }
}

輸出結(jié)果如下:

(hello,1)
(hello,1)
(you,1)
(he,1)
(hello,1)
(me,1)
=============================================
you...CompactBuffer(1)
hello...CompactBuffer(1, 1, 1)
he...CompactBuffer(1)
me...CompactBuffer(1)

reduceByKey

測(cè)試代碼如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps7(sc)

        sc.stop()
    }
    /**
      * 7、reduceByKey:統(tǒng)計(jì)每個(gè)班級(jí)的人數(shù)
      * reduceByKey(func, [numTasks]): 在一個(gè)(K,V)對(duì)的數(shù)據(jù)集上使用,返回一個(gè)(K,V)對(duì)的數(shù)據(jù)集,
      * key相同的值,都被使用指定的reduce函數(shù)聚合到一起。和groupbykey類(lèi)似,任務(wù)的個(gè)數(shù)是可以通過(guò)第二個(gè)可選參數(shù)來(lái)配置的。
      *
      * 需要注意的是還有一個(gè)reduce的操作,其為action算子,并且其返回的結(jié)果只有一個(gè),而不是一個(gè)數(shù)據(jù)集
      * 而reduceByKey是一個(gè)transformation算子,其返回的結(jié)果是一個(gè)數(shù)據(jù)集
      */
    def transformationOps7(sc:SparkContext): Unit = {
        val list = List("hello you", "hello he", "hello me")
        val listRDD = sc.parallelize(list)
        val wordsRDD = listRDD.flatMap(line => line.split(" "))
        val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
        val retRDD:RDD[(String, Int)] = pairsRDD.reduceByKey((v1, v2) => v1 + v2)

        retRDD.foreach(t => println(t._1 + "..." + t._2))
    }
}

輸出結(jié)果如下:

you...1
hello...3
he...1
me...1

join

測(cè)試代碼如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps8(sc)

        sc.stop()
    }
    /**
      * 8、join:打印關(guān)聯(lián)的組合信息
      * join(otherDataset, [numTasks]): 在類(lèi)型為(K,V)和(K,W)類(lèi)型的數(shù)據(jù)集上調(diào)用,返回一個(gè)(K,(V,W))對(duì),每個(gè)key中的所有元素都在一起的數(shù)據(jù)集
      * 學(xué)生基礎(chǔ)信息表和學(xué)生考試成績(jī)表
      * stu_info(sid ,name, birthday, class)
      * stu_score(sid, chinese, english, math)
      *
      * *  Serialization stack:
    - object not serializable
        這種分布式計(jì)算的過(guò)程,一個(gè)非常重要的點(diǎn),傳遞的數(shù)據(jù)必須要序列化

        通過(guò)代碼測(cè)試,該join是等值連接(inner join)
        A.leftOuterJoin(B)
            A表所有的數(shù)據(jù)都包涵,B表中在A表沒(méi)有關(guān)聯(lián)的數(shù)據(jù),顯示為null
        之后執(zhí)行一次filter就是join的結(jié)果
      */
    def transformationOps8(sc: SparkContext): Unit = {
        val infoList = List(
            "1,鐘  瀟,1988-02-04,bigdata",
            "2,劉向前,1989-03-24,linux",
            "3,包維寧,1984-06-16,oracle")
        val scoreList = List(
            "1,50,21,61",
            "2,60,60,61",
            "3,62,90,81",
            "4,72,80,81"
        )

        val infoRDD:RDD[String] = sc.parallelize(infoList)
        val scoreRDD:RDD[String] = sc.parallelize(scoreList)

        val infoPairRDD:RDD[(String, Student)] = infoRDD.map(line => {
            val fields = line.split(",")
            val student = new Student(fields(0), fields(1), fields(2), fields(3))
            (fields(0), student)
        })
        val scorePairRDD:RDD[(String, Score)] = scoreRDD.map(line => {
            val fields = line.split(",")
            val score = new Score(fields(0), fields(1).toFloat, fields(2).toFloat, fields(3).toFloat)
            (fields(0), score)
        })

        val joinedRDD:RDD[(String, (Student, Score))] = infoPairRDD.join(scorePairRDD)
        joinedRDD.foreach(t => {
            val sid = t._1
            val student = t._2._1
            val score = t._2._2
            println(sid + "\t" + student + "\t" + score)
        })

        println("=========================================")

        val leftOuterRDD:RDD[(String, (Score, Option[Student]))] = scorePairRDD.leftOuterJoin(infoPairRDD)
        leftOuterRDD.foreach(println)

    }
}

輸出結(jié)果如下:

3   3 包維寧 1984-06-16 oracle 3 62.0 90.0 81.0
2   2 劉向前 1989-03-24 linux  2 60.0 60.0 61.0
1   1 鐘  瀟 1988-02-04 bigdata   1 50.0 21.0 61.0
=========================================
(4,(4 72.0 80.0 81.0,None))
(3,(3 62.0 90.0 81.0,Some(3 包維寧 1984-06-16 oracle)))
(2,(2 60.0 60.0 61.0,Some(2 劉向前 1989-03-24 linux)))
(1,(1 50.0 21.0 61.0,Some(1 鐘  瀟 1988-02-04 bigdata)))

為了更好進(jìn)行操作和理解,下面提供一個(gè)Spark-shell的經(jīng)典例子:

scala> val infoList = List("1,zhongxiang","2,liuxiangqian","3,baweining")
infoList: List[String] = List(1,zhongxiang, 2,liuxiangqian, 3,baweining)

scala> val infoRDD = sc.parallelize(infoList)
infoRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[31] at parallelize at :29

scala> val infoPairRDD = infoRDD.map(line => (line.split(",")(0),line.split(",")(1)))
infoPairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[32] at map at :31

scala> val scoreList = List("1,50-21-61","2,60-60-61","3,62-90-81","4,72-80-81")
scoreList: List[String] = List(1,50-21-61, 2,60-60-61, 3,62-90-81, 4,72-80-81)

scala> val scoreRDD = sc.parallelize(scoreList)
scoreRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at :29

scala> val scorePairRDD = scoreRDD.map(line => (line.split(",")(0),line.split(",")(1)))
scorePairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[34] at map at :31

scala>

scala> val joinedRDD = infoPairRDD.join(scorePairRDD)
joinedRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[37] at join at :39

scala> joinedRDD.foreach(t => println(t._1 + "\t" + "name:" + t._2._1 + "\t" + "score:" + t._2._2))
1   name:zhongxiang score:50-21-61
3   name:baweining  score:62-90-81
2   name:liuxiangqian   score:60-60-61

有讀者反應(yīng)上面的案例還是過(guò)于復(fù)雜化,于是又寫(xiě)了下面這個(gè)demo,相信就很好理解了:

scala> val infoPairRDD = sc.parallelize(Seq((1,"leaf"),(2,"xpleaf"),(3,"yyh")))
infoPairRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[41] at parallelize at :27
scala> infoPairRDD.foreach(println)
(2,xpleaf)
(1,leaf)
(3,yyh)

scala> val scorePairRDD = sc.parallelize(Seq((1, 93), (2, 91), (3, 86), (4, 97)))
scorePairRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[44] at parallelize at :27
scala> scorePairRDD.foreach(println)
(1,93)
(3,86)
(2,91)
(4,97)

scala> val joinedRDD = infoPairRDD.join(scorePairRDD)
joinedRDD: org.apache.spark.rdd.RDD[(Int, (String, Int))] = MapPartitionsRDD[53] at join at :31
scala> joinedRDD.foreach(println)
(2,(xpleaf,91))
(1,(leaf,93))
(3,(yyh,86))

1.應(yīng)該很清楚地理解到,spark中的join其實(shí)跟sql中的join是類(lèi)似的,infoPairRDD和scorePairRDD就可以理解為兩張表,而RDD中的每一條數(shù)據(jù)就可以理解為表中的一條數(shù)據(jù),上面的盒子,相當(dāng)于兩個(gè)表中都有相同的id,需要將兩張表中的數(shù)據(jù)根據(jù)id來(lái)進(jìn)行連接,因此,在上面演示的等值連接中,左表的每一條數(shù)據(jù),只要左表有出現(xiàn)的id,在右表也有相同的id,那么就會(huì)進(jìn)行連接操作,當(dāng)然,這是等值連接的情況,對(duì)于左外連接,則是不管右表有沒(méi)有該id出現(xiàn),左邊的數(shù)據(jù)都會(huì)顯示出來(lái)。

2.spark在進(jìn)行開(kāi)發(fā)級(jí)別的調(diào)優(yōu)時(shí),要盡可能避免出現(xiàn)shuffle操作,對(duì)于join操作,尤其需要注意的是大小表join問(wèn)題,如果采用大表.join(小表)的join操作,實(shí)際上,在網(wǎng)絡(luò)上或者節(jié)點(diǎn)之間傳輸?shù)氖切”淼臄?shù)據(jù),這不會(huì)有太大的性能問(wèn)題,但是如果是采用小表.join(大表),那么在網(wǎng)絡(luò)上或者節(jié)點(diǎn)之間就會(huì)傳輸大量的數(shù)據(jù),這會(huì)造成很?chē)?yán)重的性能問(wèn)題。所以,當(dāng)需要執(zhí)行join操作時(shí),請(qǐng)一定要警惕大小表的問(wèn)題。
3.看下面的兩份RDD數(shù)據(jù),顯然是從infoRDD的分區(qū)傳輸?shù)絪coreRDD的分區(qū)成本更低:

infoRDD:
(1,"info")
(2,"info")
(3,"info")
(4,"info")

scoreRDD:
(1,"score1")
(1,"score2")
(1,"score3")
(1,"score4")
(1,"score5")

(2,"score1")
(2,"score2")
(2,"score3")
(2,"score4")
(2,"score5")

(3,"score1")
(3,"score2")
(3,"score3")
(3,"score4")
(3,"score5")

(4,"score1")
(4,"score2")
(4,"score3")
(4,"score4")
(4,"score5")

sortByKey

測(cè)試代碼如下:

object _02SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

        transformationOps7(sc)

        sc.stop()
    }
    /**
      * sortByKey:將學(xué)生身高進(jìn)行(降序)排序
      *     身高相等,按照年齡排(升序)
      */
    def transformationOps9(sc: SparkContext): Unit = {
        val list = List(
            "1,李  磊,22,175",
            "2,劉銀鵬,23,175",
            "3,齊彥鵬,22,180",
            "4,楊  柳,22,168",
            "5,敦  鵬,20,175"
        )
        val listRDD:RDD[String] = sc.parallelize(list)

        /*  // 使用sortBy操作完成排序
        val retRDD:RDD[String] = listRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] {
            override def compare(x: String, y: String): Int = {
                val xFields = x.split(",")
                val yFields = y.split(",")
                val xHgiht = xFields(3).toFloat
                val yHgiht = yFields(3).toFloat
                val xAge = xFields(2).toFloat
                val yAge = yFields(2).toFloat
                var ret = yHgiht.compareTo(xHgiht)
                if (ret == 0) {
                    ret = xAge.compareTo(yAge)
                }
                ret
            }
        } ,ClassTag.Object.asInstanceOf[ClassTag[String]])
        */
        // 使用sortByKey完成操作,只做身高降序排序
        val heightRDD:RDD[(String, String)] = listRDD.map(line => {
            val fields = line.split(",")
            (fields(3), line)
        })
        val retRDD:RDD[(String, String)] = heightRDD.sortByKey(ascending = false, numPartitions = 1)   // 需要設(shè)置1個(gè)分區(qū),否則只是各分區(qū)內(nèi)有序
        retRDD.foreach(println)

        // 使用sortByKey如何實(shí)現(xiàn)sortBy的二次排序?將上面的信息寫(xiě)成一個(gè)java對(duì)象,然后重寫(xiě)compareTo方法,在做map時(shí),key就為該對(duì)象本身,而value可以為null

    }
}

輸出結(jié)果如下:

(180,3,齊彥鵬,22,180)
(175,1,李  磊,22,175)
(175,2,劉銀鵬,23,175)
(175,5,敦  鵬,20,175)
(168,4,楊  柳,22,168)

下面是一個(gè)快速入門(mén)的demo:

scala> val rdd = sc.parallelize(Seq((1,"one"),(2,"two"),(3,"three")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[10] at parallelize at :21

scala> rdd.sortByKey(true, 1).foreach(println)
(1,one)
(2,two)
(3,three)

combineByKey與aggregateByKey

下面的代碼分別使用combineByKey和aggregateByKey來(lái)模擬groupByKey和reduceBykey,所以是有4個(gè)操作,只要把combineByKey模擬groupByKey的例子掌握了,其它三個(gè)相對(duì)就容易許多了。

整體來(lái)說(shuō)理解不太容易,但是非常重要,所以一定是要掌握的!


/**
  * spark的transformation操作:
  * aggregateByKey
  * combineByKey
  *
  * 使用combineByKey和aggregateByKey模擬groupByKey和reduceByKey
  *
  * 通過(guò)查看源碼,我們發(fā)現(xiàn)aggregateByKey底層,還是combineByKey
  *
  * 問(wèn)題:combineByKey和aggregateByKey的區(qū)別?
  * aggregateByKey是柯里化形式的,目前底層源碼還沒(méi)時(shí)間去分析,所知道的區(qū)別是這個(gè)
  */
object _03SparkTransformationOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkTransformationOps.getClass.getSimpleName)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(conf)

//        combineByKey2GroupByKey(sc)
//        combineByKey2ReduceByKey(sc)
//        aggregateByKey2ReduceByKey(sc)
        aggregateByKey2GroupByKey(sc)

        sc.stop()
    }

    /**
      * 使用aggregateByKey模擬groupByKey
      */
    def aggregateByKey2GroupByKey(sc: SparkContext): Unit = {
        val list = List("hello bo bo", "zhou xin xin", "hello song bo")
        val lineRDD = sc.parallelize(list)
        val wordsRDD = lineRDD.flatMap(line => line.split(" "))
        val pairsRDD = wordsRDD.map(word => (word, 1))

        val retRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.aggregateByKey(ArrayBuffer[Int]()) (  // 這里需要指定value的類(lèi)型為ArrayBuffer[Int]()
            (part, num) => {
                part.append(num)
                part
            },
            (part1, part2) => {
                part1.++=(part2)
                part1
            }
        )

        retRDD.foreach(println)
    }

    /**
      * 使用aggregateByKey模擬reduceByKey
      *   def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)]
      (zeroValue: U)就對(duì)應(yīng)的是combineByKey中的第一個(gè)函數(shù)的返回值
      seqOp 就對(duì)應(yīng)的是combineByKey中的第二個(gè)函數(shù),也就是mergeValue
      combOp 就對(duì)應(yīng)的是combineByKey中的第三個(gè)函數(shù),也就是mergeCombiners
      */
    def aggregateByKey2ReduceByKey(sc:SparkContext): Unit = {
        val list = List("hello bo bo", "zhou xin xin", "hello song bo")
        val lineRDD = sc.parallelize(list)
        val wordsRDD = lineRDD.flatMap(line => line.split(" "))
        val pairsRDD = wordsRDD.map(word => (word, 1))

        val retRDD:RDD[(String, Int)] = pairsRDD.aggregateByKey(0) (
            (partNum, num) => partNum + num,    // 也就是mergeValue
            (partNum1, partNum2) => partNum1 + partNum2 // 也就是mergeCombiners
        )

        retRDD.foreach(println)
    }

    /**
      * 使用reduceByKey模擬groupByKey
      */
    def combineByKey2ReduceByKey(sc:SparkContext): Unit = {
        val list = List("hello bo bo", "zhou xin xin", "hello song bo")
        val lineRDD = sc.parallelize(list)
        val wordsRDD = lineRDD.flatMap(line => line.split(" "))
        val pairsRDD = wordsRDD.map(word => (word, 1))

        /**
          * 對(duì)于createCombiner1   mergeValue1     mergeCombiners1
          * 代碼的參數(shù)已經(jīng)體現(xiàn)得很清楚了,其實(shí)只要理解了combineByKey模擬groupByKey的例子,這個(gè)就非常容易了
          */
        var retRDD:RDD[(String, Int)] = pairsRDD.combineByKey(createCombiner1, mergeValue1, mergeCombiners1)

        retRDD.foreach(println)
    }

    /**
      * reduceByKey操作,value就是該數(shù)值本身,則上面的數(shù)據(jù)會(huì)產(chǎn)生:
      * (hello, 1) (bo, 1)   (bo, 1)
      * (zhou, 1)  (xin, 1)  (xin, 1)
      * (hello, 1) (song, 1) (bo, 1)
      * 注意有別于groupByKey的操作,它是創(chuàng)建一個(gè)容器
      */
    def createCombiner1(num:Int):Int = {
        num
    }

    /**
      * 同一partition內(nèi),對(duì)于有相同key的,這里的mergeValue直接將其value相加
      * 注意有別于groupByKey的操作,它是添加到value到一個(gè)容器中
      */
    def mergeValue1(localNum1:Int, localNum2:Int): Int = {
        localNum1 + localNum2
    }

    /**
      * 將兩個(gè)不同partition中的key相同的value值相加起來(lái)
      * 注意有別于groupByKey的操作,它是合并兩個(gè)容器
      */
    def mergeCombiners1(thisPartitionNum1:Int, anotherPartitionNum2:Int):Int = {
        thisPartitionNum1 + anotherPartitionNum2
    }

    /**
      * 使用combineByKey模擬groupByKey
      */
    def combineByKey2GroupByKey(sc:SparkContext): Unit = {
        val list = List("hello bo bo", "zhou xin xin", "hello song bo")
        val lineRDD = sc.parallelize(list)
        val wordsRDD = lineRDD.flatMap(line => line.split(" "))
        val pairsRDD = wordsRDD.map(word => (word, 1))

        // 輸出每個(gè)partition中的map對(duì)
        pairsRDD.foreachPartition( partition => {
            println("<=========partition-start=========>")
            partition.foreach(println)
            println("<=========partition-end=========>")
        })

        val gbkRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)

        gbkRDD.foreach(println)

        // 如果要測(cè)試最后groupByKey的結(jié)果是在幾個(gè)分區(qū),可以使用下面的代碼進(jìn)行測(cè)試
        /*gbkRDD.foreachPartition(partition => {
            println("~~~~~~~~~~~~~~~~~~~~~~~~~~~")
            partition.foreach(println)
        })*/

    }

    /**
      * 初始化,將value轉(zhuǎn)變成為標(biāo)準(zhǔn)的格式數(shù)據(jù)
      * 是在每個(gè)分區(qū)中進(jìn)行的操作,去重后的key有幾個(gè),就調(diào)用次,
      * 因?yàn)閷?duì)于每個(gè)key,其容器創(chuàng)建一次就ok了,之后有key相同的,只需要執(zhí)行mergeValue到已經(jīng)創(chuàng)建的容器中即可
      */
    def createCombiner(num:Int):ArrayBuffer[Int] = {
        println("----------createCombiner----------")
        ArrayBuffer[Int](num)
    }

    /**
      * 將key相同的value,添加到createCombiner函數(shù)創(chuàng)建的ArrayBuffer容器中
      * 一個(gè)分區(qū)內(nèi)的聚合操作,將一個(gè)分區(qū)內(nèi)key相同的數(shù)據(jù),合并
      */
    def mergeValue(ab:ArrayBuffer[Int], num:Int):ArrayBuffer[Int] = {
        println("----------mergeValue----------")
        ab.append(num)
        ab
    }

    /**
      * 將key相同的多個(gè)value數(shù)組,進(jìn)行整合
      * 分區(qū)間的合并操作
      */
    def mergeCombiners(ab1:ArrayBuffer[Int], ab2:ArrayBuffer[Int]):ArrayBuffer[Int] = {
        println("----------mergeCombiners----------")
        ab1 ++= ab2
        ab1
    }

}

/*
combineByKey模擬groupByKey的一個(gè)輸出效果,可以很好地說(shuō)明createCombiner、mergeValue和mergeCombiners各個(gè)階段的執(zhí)行時(shí)機(jī):
<=========partition-start=========>
<=========partition-start=========>
(hello,1)
(zhou,1)
(bo,1)
(xin,1)
(bo,1)
(xin,1)
<=========partition-end=========>
(hello,1)
(song,1)
(bo,1)
<=========partition-end=========>
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeValue----------
----------mergeValue----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeCombiners----------
----------mergeCombiners----------
(song,ArrayBuffer(1))
(hello,ArrayBuffer(1, 1))
(bo,ArrayBuffer(1, 1, 1))
(zhou,ArrayBuffer(1))
(xin,ArrayBuffer(1, 1))
 */

Actions算子

概述

前面Transformationt算子的測(cè)試都是在本地開(kāi)發(fā)環(huán)境中直接跑代碼,這里Actions算子的測(cè)試主要在spark-shell中進(jìn)行操作,因?yàn)闀?huì)方便很多。

需要說(shuō)明的Actions算子如下:

  • reduce(func)

    通過(guò)函數(shù)func聚集數(shù)據(jù)集中的所有元素。Func函數(shù)接受2個(gè)參數(shù),返回一個(gè)值。這個(gè)函數(shù)必須是關(guān)聯(lián)性的,確保可以被正確的并發(fā)執(zhí)行

  • collect()

    在Driver的程序中,以數(shù)組的形式,返回?cái)?shù)據(jù)集的所有元素。這通常會(huì)在使用filter或者其它操作后,返回一個(gè)足夠小的數(shù)據(jù)子集再使用,直接將整個(gè)RDD集Collect返回,很可能會(huì)讓Driver程序OOM

  • count()

    返回?cái)?shù)據(jù)集的元素個(gè)數(shù)

  • take(n)

    返回一個(gè)數(shù)組,由數(shù)據(jù)集的前n個(gè)元素組成。注意,這個(gè)操作目前并非在多個(gè)節(jié)點(diǎn)上,并行執(zhí)行,而是Driver程序所在機(jī)器,單機(jī)計(jì)算所有的元素(Gateway的內(nèi)存壓力會(huì)增大,需要謹(jǐn)慎使用)

  • first()

    返回?cái)?shù)據(jù)集的第一個(gè)元素(類(lèi)似于take(1))

  • saveAsTextFile(path)

    將數(shù)據(jù)集的元素,以textfile的形式,保存到本地文件系統(tǒng),hdfs或者任何其它hadoop支持的文件系統(tǒng)。Spark將會(huì)調(diào)用每個(gè)元素的toString方法,并將它轉(zhuǎn)換為文件中的一行文本

  • saveAsSequenceFile(path)

    將數(shù)據(jù)集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統(tǒng),hdfs或者任何其它hadoop支持的文件系統(tǒng)。RDD的元素必須由key-value對(duì)組成,并都實(shí)現(xiàn)了Hadoop的Writable接口,或隱式可以轉(zhuǎn)換為Writable(Spark包括了基本類(lèi)型的轉(zhuǎn)換,例如Int,Double,String等等)

  • foreach(func)

    在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func。這通常用于更新一個(gè)累加器變量,或者和外部存儲(chǔ)系統(tǒng)做交互

reduce

通過(guò)函數(shù)func聚集數(shù)據(jù)集中的所有元素。Func函數(shù)接受2個(gè)參數(shù),返回一個(gè)值。這個(gè)函數(shù)必須是關(guān)聯(lián)性的,確保可以被正確的并發(fā)執(zhí)行。

關(guān)于reduce的執(zhí)行過(guò)程,可以對(duì)比scala中類(lèi)似的reduce函數(shù),相關(guān)說(shuō)明可以參考我的scala整理的知識(shí)點(diǎn)。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :29

scala> val ret = listRDD.reduce((v1, v2) => v1 + v2)
...
ret: Int = 21

需要注意的是,不同于Transformation算子,其結(jié)果仍然是RDD,但是執(zhí)行Actions算子之后,其結(jié)果不再是RDD,而是一個(gè)標(biāo)量。

collect

在Driver的程序中,以數(shù)組的形式,返回?cái)?shù)據(jù)集的所有元素。這通常會(huì)在使用filter或者其它操作后,返回一個(gè)足夠小的數(shù)據(jù)子集再使用,直接將整個(gè)RDD集Collect返回,很可能會(huì)讓Driver程序OOM,這點(diǎn)尤其需要注意。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :29

scala> val ret = listRDD.collect()
...
ret: Array[Int] = Array(1, 2, 3, 4, 5, 6)

count

返回?cái)?shù)據(jù)集的元素個(gè)數(shù)。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :29

scala> val ret = listRDD.count()
...
ret: Long = 6

take

返回一個(gè)數(shù)組,由數(shù)據(jù)集的前n個(gè)元素組成。注意,這個(gè)操作目前并非在多個(gè)節(jié)點(diǎn)上,并行執(zhí)行,而是Driver程序所在機(jī)器,單機(jī)計(jì)算所有的元素(Gateway的內(nèi)存壓力會(huì)增大,需要謹(jǐn)慎使用)。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at :29

scala> listRDD.take(3)
...
res7: Array[Int] = Array(1, 2, 3)

first

返回?cái)?shù)據(jù)集的第一個(gè)元素(類(lèi)似于take(1))。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :29

scala> listRDD.first()
...
res8: Int = 1

saveAsTextFile

將數(shù)據(jù)集的元素,以textfile的形式,保存到本地文件系統(tǒng),hdfs或者任何其它hadoop支持的文件系統(tǒng)。Spark將會(huì)調(diào)用每個(gè)元素的toString方法,并將它轉(zhuǎn)換為文件中的一行文本。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at :29

scala> listRDD.saveAsTextFile("file:///home/uplooking/data/spark/action")
...

可以在文件系統(tǒng)中查看到保存的文件:

[uplooking@uplooking01 action]$ pwd
/home/uplooking/data/spark/action
[uplooking@uplooking01 action]$ ls
part-00000  part-00001  part-00002  part-00003  _SUCCESS

其實(shí)可以看到,保存的跟Hadoop的格式是一樣的。

當(dāng)然因?yàn)槲业膕park集群中已經(jīng)做了跟hadoop相關(guān)的配置,所以也可以把文件保存到hdfs中:

scala> listRDD.saveAsTextFile("hdfs://ns1/output/spark/action")
...

然后就可以在hdfs中查看到保存的文件:

[uplooking@uplooking01 action]$ hdfs dfs -ls /output/spark/action
18/04/27 10:27:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r--   3 uplooking supergroup          0 2018-04-27 10:25 /output/spark/action/_SUCCESS
-rw-r--r--   3 uplooking supergroup          2 2018-04-27 10:25 /output/spark/action/part-00000
-rw-r--r--   3 uplooking supergroup          4 2018-04-27 10:25 /output/spark/action/part-00001
-rw-r--r--   3 uplooking supergroup          2 2018-04-27 10:25 /output/spark/action/part-00002
-rw-r--r--   3 uplooking supergroup          4 2018-04-27 10:25 /output/spark/action/part-00003

可以看到,保存的格式跟保存到本地文件系統(tǒng)是一樣的。

foreach

在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func。這通常用于更新一個(gè)累加器變量,或者和外部存儲(chǔ)系統(tǒng)做交互。

scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at :29

scala> listRDD.foreach(println)
...

saveAsNewAPIHadoopFile

也就是將數(shù)據(jù)保存到Hadoop HDFS中,但是需要注意的是,前面使用saveAsTextFile也可以進(jìn)行相關(guān)操作,其使用的就是saveAsNewAPIHadoopFile或者saveAsHadoopFile這兩個(gè)API,而其兩者的區(qū)別是:

  • saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的類(lèi)
  • saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的類(lèi)。但不管使用哪一個(gè),都是可以完成工作的。

測(cè)試代碼如下:

package cn.xpleaf.bigdata.spark.scala.core.p2

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark算子操作之Action
  *     saveAsNewAPIHAdoopFile
  *     * saveAsHadoopFile
  * 和saveAsNewAPIHadoopFile的唯一區(qū)別就在于OutputFormat的不同
  * saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的類(lèi)
  * saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的類(lèi)
  * 使用哪一個(gè)都可以完成工作
  *
  * 前面在使用saveAsTextFile時(shí)也可以保存到hadoop文件系統(tǒng)中,注意其源代碼也是使用上面的操作的
  *
  *   Caused by: java.net.UnknownHostException: ns1
    ... 35 more
  找不到ns1,因?yàn)槲覀冊(cè)诒镜貨](méi)有配置,無(wú)法正常解析,就需要將hadoop的配置文件信息給我們加載進(jìn)來(lái)
    hdfs-site.xml.heihei,core-site.xml.heihei
  */
object _05SparkActionOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkActionOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)

        val list = List("hello you", "hello he", "hello me")
        val listRDD = sc.parallelize(list)
        val pairsRDD = listRDD.map(word => (word, 1))
        val retRDD = pairsRDD.reduceByKey((v1, v2) => v1 + v2)

        retRDD.saveAsNewAPIHadoopFile(
            "hdfs://ns1/spark/action",      // 保存的路徑
            classOf[Text],                      // 相當(dāng)于mr中的k3
            classOf[IntWritable],               // 相當(dāng)于mr中的v3
            classOf[TextOutputFormat[Text, IntWritable]]    // 設(shè)置(k3, v3)的outputFormatClass
        )

    }
}

之后我們可以在hdfs中查看到相應(yīng)的文件輸出:

[uplooking@uplooking01 ~]$ hdfs dfs -ls /spark/action               
18/04/27 12:07:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   3 Administrator supergroup          0 2018-04-27 12:07 /spark/action/_SUCCESS
-rw-r--r--   3 Administrator supergroup         13 2018-04-27 12:07 /spark/action/part-r-00000
-rw-r--r--   3 Administrator supergroup         11 2018-04-27 12:07 /spark/action/part-r-00001
[uplooking@uplooking01 ~]$ hdfs dfs -text /spark/action/part-r-00000
18/04/27 12:08:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello   3
me      1
[uplooking@uplooking01 ~]$ hdfs dfs -text /spark/action/part-r-00001
18/04/27 12:08:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
you     1
he      1

寬依賴(lài)和窄依賴(lài)

窄依賴(lài)(narrow dependencies)

子RDD的每個(gè)分區(qū)依賴(lài)于常數(shù)個(gè)父分區(qū)(與數(shù)據(jù)規(guī)模無(wú)關(guān))
輸入輸出一對(duì)一的算子,且結(jié)果RDD的分區(qū)結(jié)構(gòu)不變。主要是map/flatmap
輸入輸出一對(duì)一的算子,但結(jié)果RDD的分區(qū)結(jié)構(gòu)發(fā)生了變化,如union/coalesce
從輸入中選擇部分元素的算子,如filter、distinct、substract、sample

寬依賴(lài)(wide dependencies)

子RDD的每個(gè)分區(qū)依賴(lài)于所有的父RDD分區(qū)
對(duì)單個(gè)RDD基于key進(jìn)行重組和reduce,如groupByKey,reduceByKey
對(duì)兩個(gè)RDD基于key進(jìn)行join和重組,如join
經(jīng)過(guò)大量shuffle生成的RDD,建議進(jìn)行緩存。這樣避免失敗后重新計(jì)算帶來(lái)的開(kāi)銷(xiāo)。

注意:reduce是一個(gè)action,和reduceByKey完全不同。

關(guān)于寬依賴(lài)和窄依賴(lài),《Hadoop與大數(shù)據(jù)挖掘》書(shū)本上的說(shuō)明非常精簡(jiǎn),但是理解起來(lái)也是不錯(cuò)的,可以參考一下,當(dāng)然,這本書(shū)的Spark內(nèi)容就寫(xiě)得非常少了。


標(biāo)題名稱(chēng):Spark筆記整理(四):SparkRDD算子實(shí)戰(zhàn)
本文地址:http://www.xueling.net.cn/article/pjgjjc.html

其他資訊

在線咨詢(xún)
服務(wù)熱線
服務(wù)熱線:028-86922220
TOP
主站蜘蛛池模板: 中文字幕超碰在线 | 岛国无码av不卡一区二区 | 亚洲国产精品久久人人爱蜜臀 | 91av国产精品 | 成人看片 | 日日噜噜噜噜人人爽亚洲精品 | 三级毛片视频 | 九九视频这里只有精品 | 日本草逼视频 | 粉嫩av一区二区在线播 | 亚洲国产精品久久久久久久 | 成人久久18免费网站麻豆 | 97超碰人人做人人爽3d | 欧美视频在线观看免费 | 天天干天天草天天 | 日产精品一区二区 | 亚洲AV永久无码国产精品久久 | 色乱码一区二区三区麻豆 | 又大又粗又硬又爽黄毛少妇 | 免费女人18毛片a毛片视频 | 爱情到此为止在线观看 | 97青娱乐 | 欧美成人一区二区三区在线观看 | 99热这里只有精品国产免费免费 | 国产无套精品久久久久久 | av不卡免费在线观看 | 国产成人jvid在线播放 | 潘金莲性xxxxhd | 中文字幕佐山爱一区二区免费 | 亚洲毛片免费在线观看 | aa片在线观看无码免费 | 久久99精品久久久秒播软件优势 | av无线看 | 亚洲精品无码久久久久yw | 男人午夜 | 三区中文字幕 | china国语对白刺激videos 美女视频很黄很a免费 | 亚洲系列第一页 | 欧美专区日韩视频人妻 | 欧美日影院 | 一级色毛片 |