老熟女激烈的高潮_日韩一级黄色录像_亚洲1区2区3区视频_精品少妇一区二区三区在线播放_国产欧美日产久久_午夜福利精品导航凹凸

重慶分公司,新征程啟航

為企業(yè)提供網(wǎng)站建設(shè)、域名注冊(cè)、服務(wù)器等服務(wù)

sparkRDD算子的創(chuàng)建和使用

spark是大數(shù)據(jù)領(lǐng)域近幾年比較火的編程開發(fā)語(yǔ)言。有眾多的好處,比如速度快,基于內(nèi)存式計(jì)算框架。

公司主營(yíng)業(yè)務(wù):成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)、移動(dòng)網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。創(chuàng)新互聯(lián)公司是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。創(chuàng)新互聯(lián)公司推出廣南免費(fèi)做網(wǎng)站回饋大家。

不多說(shuō)直接講 spark的RDD 算子的使用。

如果有spark環(huán)境搭建等問(wèn)題,請(qǐng)自行查找資料。本文不做講述。

spark rdd的創(chuàng)建有兩種方式:

1>從集合創(chuàng)建。也就是從父rdd繼承過(guò)來(lái)

2>從外部創(chuàng)建。

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.base.Optional;

import scala.Tuple2;

public class Demo01 {

	public static void main(String[] args) {
		
		SparkConf conf = new SparkConf().setAppName("Demo01").setMaster("local");
		JavaSparkContext jsc = new JavaSparkContext(conf);
		
		//map(jsc);
		//filter(jsc);
	    // flatMap(jsc);
		//groupByKey(jsc);
		//reduceByKey(jsc);
		//sortByKey(jsc);
		//join(jsc);
		leftOutJoin(jsc);
		jsc.stop();
	}

	//每一條元素 都乘以2,并且打印
	private static void map(JavaSparkContext jsc) {
		
		//數(shù)據(jù)源
		List lst = Arrays.asList(1,2,3,4,5,6,7,8);
		
		JavaRDD numRDD = jsc.parallelize(lst);
		
		JavaRDD resultRDD = numRDD.map(new Function() {
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer num) throws Exception {
				
				return num * 2;
			}
		});
		
		resultRDD.foreach(new VoidFunction() {
			
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Integer num) throws Exception {		
				System.out.println(num);
			}
		});
		 
	}
	
	// 把集合中的偶數(shù)過(guò)濾出來(lái)
	private static void filter(JavaSparkContext jsc) {
	
		//數(shù)據(jù)源
		List lst = Arrays.asList(1,2,3,4,5,6,7,8);
		
		JavaRDD numRDD = jsc.parallelize(lst);
		
		System.out.println(numRDD.filter(new Function() {
			private static final long serialVersionUID = 1L;

			@Override
			public Boolean call(Integer num) throws Exception {
				
				return num % 2 ==0;
			}
		}).collect());
	}

	//將一行行數(shù)據(jù)的單詞拆分為一個(gè)個(gè)單詞
	private static void flatMap(JavaSparkContext jsc) {
		
		List lst = Arrays.asList("hi tim ","hello girl","hello spark");
		
		JavaRDD lines = jsc.parallelize(lst);
		
		JavaRDD resultRDD = lines.flatMap(new FlatMapFunction() {

			private static final long serialVersionUID = 1L;

			@Override
			public Iterable call(String line) throws Exception {
			
				return Arrays.asList(line.split(" "));
			}
		});
		
		System.out.println(resultRDD.collect());
	}

	// 根據(jù)班級(jí)進(jìn)行分組
	private static void groupByKey(JavaSparkContext jsc) {
		// int ,Integer 
		// scala 里面的類型,沒有像Java這樣分為基本類型和包裝類,因?yàn)閟cala是一種更加強(qiáng)的面向?qū)ο笳Z(yǔ)言,
		//一切皆對(duì)象,里面的類型,也有對(duì)應(yīng)的方法可以調(diào)用,隱式轉(zhuǎn)換
		// 模擬數(shù)據(jù)
		@SuppressWarnings("unchecked")
		List> lst = Arrays.asList(
				new Tuple2("class01", 100),
				new Tuple2("class02",101),
				new Tuple2("class01",199),
				new Tuple2("class02",121),
				new Tuple2("class02",120));
		
		JavaPairRDD cla***DD = jsc.parallelizePairs(lst);
		JavaPairRDD> groupedRDD = cla***DD.groupByKey();
		
		groupedRDD.foreach(new VoidFunction>>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Tuple2> tuple)
					throws Exception {
				
				String classKey = tuple._1;
				Iterator values = tuple._2.iterator();
				while (values.hasNext()) {
					
					Integer value = values.next();
					
					System.out.println("key:" + classKey + "\t" + "value:" + value);
				}
			}
		});
	}
	
	
	private static void reduceByKey(JavaSparkContext jsc) {
		
		@SuppressWarnings("unchecked")
		List> lst = Arrays.asList(
				new Tuple2("class01", 100),
				new Tuple2("class02",101),
				new Tuple2("class01",199),
				new Tuple2("class02",121),
				new Tuple2("class02",120));
		
		JavaPairRDD cla***DD = jsc.parallelizePairs(lst);
		
		JavaPairRDD resultRDD = cla***DD.reduceByKey(new Function2() {
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				
				return v1 + v2;
			}
		});
		
		resultRDD.foreach(new VoidFunction>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Tuple2 tuple) throws Exception {
				System.out.println("key:" + tuple._1 + "\t" + "value:" + tuple._2);
				
			}
		});
	}
	// 把學(xué)生的成績(jī)前3名取出來(lái),并打印
	// 1.先排序sortByKey,然后take(3),再foreach
	private static void sortByKey(JavaSparkContext jsc) {
		
		@SuppressWarnings("unchecked")
		List> lst = Arrays.asList(
				new Tuple2("tom", 60),
				new Tuple2("kate",80),
				new Tuple2("kobe",100),
				new Tuple2("馬蓉",4),
				new Tuple2("宋哲",2),
				new Tuple2("白百合",3),
				new Tuple2("隔壁老王",1));
		
		JavaPairRDD cla***DD = jsc.parallelizePairs(lst);
		
		JavaPairRDD pairRDD = cla***DD.mapToPair(new PairFunction,Integer , String>() {
			
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2 call(Tuple2 tuple)
					throws Exception {
				
				return new Tuple2(tuple._2, tuple._1);
			}
		});
		//do no 
		JavaPairRDD sortedRDD = pairRDD.sortByKey();
		JavaPairRDD sortedRDD01 = sortedRDD.mapToPair(new PairFunction, String, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2 call(Tuple2 tuple)
					throws Exception {
				
				return new Tuple2(tuple._2, tuple._1);
			}
		} );
		// take 也是一個(gè)action操作
		List> result = sortedRDD01.take(3);
		System.out.println(result);
	}
	
	
	private static void join(JavaSparkContext jsc) {
		
		// 模擬數(shù)據(jù)
		@SuppressWarnings("unchecked")
		List> names =Arrays.asList(
				new Tuple2(1,"jack"),
				new Tuple2(2,"rose"),
				new Tuple2(3,"tom"),
				new Tuple2(4,"趙麗穎"));
		
		JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names);
	
		List> scores = Arrays.asList(
				new Tuple2(1,60),
				new Tuple2(4,100),
				new Tuple2(2,30));	
		
		JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores);
		
		JavaPairRDD> joinedRDD = num2scoresRDD.join(num2NamesRDD);
		
		//姓名成績(jī)排序,取前2名
		JavaPairRDD score2NameRDD = joinedRDD.mapToPair(new PairFunction>,Integer, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2 call(
					Tuple2> tuple)
					throws Exception {
				Integer score = tuple._2._1;
				String name = tuple._2._2;
				return new Tuple2(score,name);
			}
		});
		// sortByKey之后,你可以執(zhí)行一個(gè)maptoPair的操作,轉(zhuǎn)換為
		System.out.println(score2NameRDD.sortByKey(false).take(2));
	}
	
	// 學(xué)生成績(jī)改良版
	private static void leftOutJoin(JavaSparkContext jsc) {
		// 模擬數(shù)據(jù)
				@SuppressWarnings("unchecked")
				List> names =Arrays.asList(
						new Tuple2(1,"jack"),
						new Tuple2(2,"rose"),
						new Tuple2(3,"tom"),
						new Tuple2(4,"趙麗穎"));
				
				JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names);
			
				List> scores = Arrays.asList(
						new Tuple2(1,60),
						new Tuple2(4,100),
						new Tuple2(2,30));	
				
				JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores);
		
				// num2scoresRDD num2NamesRDD
				//JavaPairRDD>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD);
				// 注意join,誰(shuí)join誰(shuí),沒區(qū)別,但是leftoutjoin 是有順序的
				JavaPairRDD>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD);
				
				JavaPairRDD pairRDD = joinedRDD.mapToPair(new PairFunction>>, Integer, String>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2 call(
							Tuple2>> tuple)
							throws Exception {
						
						String name = tuple._2._1;
						Optional scoreOptional = tuple._2._2;
						Integer score = null;
				         if(scoreOptional.isPresent()){
				        	score= scoreOptional.get();	 
				         }else {
				        	 score = 0;
				         }
						
						return new Tuple2(score, name);
					}
				});
				
				JavaPairRDD sortedRDD = pairRDD.sortByKey(false);
				
				sortedRDD.foreach(new VoidFunction>() {
					private static final long serialVersionUID = 1L;

					@Override
					public void call(Tuple2 tuple)
							throws Exception {
					
						if(tuple._1 == 0){
							System.out.println("name:" + tuple._2 + "\t" + "要努力了,你的成績(jī)0分" );
						}else{
							System.out.println("姓名:" + tuple._2 + "\t" + "分?jǐn)?shù):" + tuple._1);
						}
					}
				});
				
	}
}

如有疑問(wèn)可跟帖討論。歡迎拍磚


分享題目:sparkRDD算子的創(chuàng)建和使用
本文地址:http://www.xueling.net.cn/article/jisgpj.html

其他資訊

在線咨詢
服務(wù)熱線
服務(wù)熱線:028-86922220
TOP
主站蜘蛛池模板: 一本之道大象高清特色 | 性色欲情侣网站WWW 欧美精品首页 | 激情中国色综合 | 国产精品激情综合五月天中文字幕 | 男女一区二区三区视频 | 麻豆网站视频 | 国产成人无码精品一区在线观看 | 粉嫩在线一区二区三区视频 | 国产69久久精品成人看动漫 | 美女的大奶 | 无码午夜福利片在线观看 | 免费在线播放黄色 | 18禁美女裸体免费网站 | 国产精品素人一区二区 | 91久草视频 | 国产成人亚洲中文字幕视频 | 伊人久久大香线蕉AV一区 | 国产伦精品一区二区三区四区免费 | 佐山爱中文字幕aⅴ在线 | 国产成人无码aa片免费看 | 亚洲avav天堂av在线网爱情 | 国产一级自拍视频 | 日本欧美在线观看视频 | 日本牲交大片无遮挡 | 男人午夜视频 | 精品久久久久久中文字幕大豆网 | 乱人伦中文字幕成人网站在线 | 亚洲欧美一区二区三区久久 | 国产成人超碰人人澡人人澡 | 国产免费高潮白浆二区三区 | 久久精品成人免费视频 | 4438ⅹ亚洲全国最大色丁香 | 久久久妇女 | 超碰97人人人人人蜜桃 | 国产视频第一页 | 中文av在线播放 | 日日躁夜夜躁狠狠躁超碰97 | 精品欧美一区二区三区久久久 | 九九在线视频免费观看 | 爱爱久久 | 国产精品毛片av一区二区三 |