重慶分公司,新征程啟航
為企業提供網站建設、域名註冊、服務器等服務
為企業提供網站建設、域名註冊、服務器等服務
Spark 是大數據領域近幾年比較火的分佈式計算框架(而不是一門編程語言)。它有眾多的好處,比如速度快、基於內存進行計算。
公司主營業務:成都做網站、成都網站設計、移動網站開發等業務。幫助企業客戶真正實現互聯網宣傳,提高企業的競爭能力。創新互聯公司是一支青春激揚、勤奮敬業、活力澎湃、和諧高效的團隊。公司秉承以“開放、自由、嚴謹、自律”為核心的企業文化,感謝他們對我們的高要求,感謝他們從不同領域給我們帶來的挑戰,讓我們激情的團隊有機會用頭腦與智慧不斷地給客戶帶來驚喜。創新互聯公司推出廣南免費做網站回饋大家。
不多說,直接講 Spark 的 RDD 算子的使用。
如果有 Spark 環境搭建等問題,請自行查找資料,本文不做講述。
Spark RDD 的創建有兩種方式:
1>從集合創建(例如 parallelize),也可以由父 RDD 轉換而來。
2>從外部數據源創建(例如讀取文件)。
import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import com.google.common.base.Optional; import scala.Tuple2; public class Demo01 { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Demo01").setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(conf); //map(jsc); //filter(jsc); // flatMap(jsc); //groupByKey(jsc); //reduceByKey(jsc); //sortByKey(jsc); //join(jsc); leftOutJoin(jsc); jsc.stop(); } //每一條元素 都乘以2,并且打印 private static void map(JavaSparkContext jsc) { //數(shù)據(jù)源 Listlst = Arrays.asList(1,2,3,4,5,6,7,8); JavaRDD numRDD = jsc.parallelize(lst); JavaRDD resultRDD = numRDD.map(new Function () { private static final long serialVersionUID = 1L; @Override public Integer call(Integer num) throws Exception { return num * 2; } }); resultRDD.foreach(new VoidFunction () { private static final long serialVersionUID = 1L; @Override public void call(Integer num) throws Exception { System.out.println(num); } }); } // 把集合中的偶數(shù)過(guò)濾出來(lái) private static void filter(JavaSparkContext jsc) { //數(shù)據(jù)源 List lst = Arrays.asList(1,2,3,4,5,6,7,8); JavaRDD numRDD = jsc.parallelize(lst); System.out.println(numRDD.filter(new Function () { private static final long serialVersionUID = 1L; @Override public Boolean call(Integer num) throws Exception { return num % 2 ==0; } }).collect()); } //將一行行數(shù)據(jù)的單詞拆分為一個(gè)個(gè)單詞 private static void flatMap(JavaSparkContext jsc) { List lst = Arrays.asList("hi tim ","hello girl","hello spark"); JavaRDD lines = 
jsc.parallelize(lst); JavaRDD resultRDD = lines.flatMap(new FlatMapFunction () { private static final long serialVersionUID = 1L; @Override public Iterable call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); System.out.println(resultRDD.collect()); } // 根據(jù)班級(jí)進(jìn)行分組 private static void groupByKey(JavaSparkContext jsc) { // int ,Integer // scala 里面的類型,沒有像Java這樣分為基本類型和包裝類,因?yàn)閟cala是一種更加強(qiáng)的面向?qū)ο笳Z(yǔ)言, //一切皆對(duì)象,里面的類型,也有對(duì)應(yīng)的方法可以調(diào)用,隱式轉(zhuǎn)換 // 模擬數(shù)據(jù) @SuppressWarnings("unchecked") List > lst = Arrays.asList( new Tuple2 ("class01", 100), new Tuple2 ("class02",101), new Tuple2 ("class01",199), new Tuple2 ("class02",121), new Tuple2 ("class02",120)); JavaPairRDD cla***DD = jsc.parallelizePairs(lst); JavaPairRDD > groupedRDD = cla***DD.groupByKey(); groupedRDD.foreach(new VoidFunction >>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2 > tuple) throws Exception { String classKey = tuple._1; Iterator values = tuple._2.iterator(); while (values.hasNext()) { Integer value = values.next(); System.out.println("key:" + classKey + "\t" + "value:" + value); } } }); } private static void reduceByKey(JavaSparkContext jsc) { @SuppressWarnings("unchecked") List > lst = Arrays.asList( new Tuple2 ("class01", 100), new Tuple2 ("class02",101), new Tuple2 ("class01",199), new Tuple2 ("class02",121), new Tuple2 ("class02",120)); JavaPairRDD cla***DD = jsc.parallelizePairs(lst); JavaPairRDD resultRDD = cla***DD.reduceByKey(new Function2 () { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); resultRDD.foreach(new VoidFunction >() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2 tuple) throws Exception { System.out.println("key:" + tuple._1 + "\t" + "value:" + tuple._2); } }); } // 把學(xué)生的成績(jī)前3名取出來(lái),并打印 // 1.先排序sortByKey,然后take(3),再foreach 
private static void sortByKey(JavaSparkContext jsc) { @SuppressWarnings("unchecked") List > lst = Arrays.asList( new Tuple2 ("tom", 60), new Tuple2 ("kate",80), new Tuple2 ("kobe",100), new Tuple2 ("馬蓉",4), new Tuple2 ("宋哲",2), new Tuple2 ("白百合",3), new Tuple2 ("隔壁老王",1)); JavaPairRDD cla***DD = jsc.parallelizePairs(lst); JavaPairRDD pairRDD = cla***DD.mapToPair(new PairFunction ,Integer , String>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call(Tuple2 tuple) throws Exception { return new Tuple2 (tuple._2, tuple._1); } }); //do no JavaPairRDD sortedRDD = pairRDD.sortByKey(); JavaPairRDD sortedRDD01 = sortedRDD.mapToPair(new PairFunction , String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call(Tuple2 tuple) throws Exception { return new Tuple2 (tuple._2, tuple._1); } } ); // take 也是一個(gè)action操作 List > result = sortedRDD01.take(3); System.out.println(result); } private static void join(JavaSparkContext jsc) { // 模擬數(shù)據(jù) @SuppressWarnings("unchecked") List > names =Arrays.asList( new Tuple2 (1,"jack"), new Tuple2 (2,"rose"), new Tuple2 (3,"tom"), new Tuple2 (4,"趙麗穎")); JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names); List > scores = Arrays.asList( new Tuple2 (1,60), new Tuple2 (4,100), new Tuple2 (2,30)); JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores); JavaPairRDD > joinedRDD = num2scoresRDD.join(num2NamesRDD); //姓名成績(jī)排序,取前2名 JavaPairRDD score2NameRDD = joinedRDD.mapToPair(new PairFunction >,Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call( Tuple2 > tuple) throws Exception { Integer score = tuple._2._1; String name = tuple._2._2; return new Tuple2 (score,name); } }); // sortByKey之后,你可以執(zhí)行一個(gè)maptoPair的操作,轉(zhuǎn)換為 System.out.println(score2NameRDD.sortByKey(false).take(2)); } // 學(xué)生成績(jī)改良版 private static void leftOutJoin(JavaSparkContext jsc) { // 模擬數(shù)據(jù) @SuppressWarnings("unchecked") List > names 
=Arrays.asList( new Tuple2 (1,"jack"), new Tuple2 (2,"rose"), new Tuple2 (3,"tom"), new Tuple2 (4,"趙麗穎")); JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names); List > scores = Arrays.asList( new Tuple2 (1,60), new Tuple2 (4,100), new Tuple2 (2,30)); JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores); // num2scoresRDD num2NamesRDD //JavaPairRDD >> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD); // 注意join,誰(shuí)join誰(shuí),沒區(qū)別,但是leftoutjoin 是有順序的 JavaPairRDD >> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD); JavaPairRDD pairRDD = joinedRDD.mapToPair(new PairFunction >>, Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call( Tuple2 >> tuple) throws Exception { String name = tuple._2._1; Optional scoreOptional = tuple._2._2; Integer score = null; if(scoreOptional.isPresent()){ score= scoreOptional.get(); }else { score = 0; } return new Tuple2 (score, name); } }); JavaPairRDD sortedRDD = pairRDD.sortByKey(false); sortedRDD.foreach(new VoidFunction >() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2 tuple) throws Exception { if(tuple._1 == 0){ System.out.println("name:" + tuple._2 + "\t" + "要努力了,你的成績(jī)0分" ); }else{ System.out.println("姓名:" + tuple._2 + "\t" + "分?jǐn)?shù):" + tuple._1); } } }); } }
如有疑問可跟帖討論,歡迎拍磚。