博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark基本操作 java 版
阅读量:7056 次
发布时间:2019-06-28

本文共 7371 字,大约阅读时间需要 24 分钟。

1.map算子

private static void map() {          //创建SparkConf          SparkConf conf = new SparkConf()                  .setAppName("map")                  .setMaster("local");          //创建JavasparkContext          JavaSparkContext sc = new JavaSparkContext(conf);          //构造集合          List
numbers = Arrays.asList(1,2,3,4,5); //并行化集合,创建初始RDD JavaRDD
numberRDD = sc.parallelize(numbers); //使用map算子,将集合中的每个元素都乘以2 JavaRDD
multipleNumberRDD = numberRDD.map(new Function
() { @Override public Integer call(Integer v1) throws Exception { return v1 * 2; } }); //打印新的RDD multipleNumberRDD.foreach(new VoidFunction
() { @Override public void call(Integer t) throws Exception { System.out.println(t); } }); //关闭JavasparkContext sc.close(); }

2.filter算子

private static void filter() {          //创建SparkConf          SparkConf conf = new SparkConf()                      .setAppName("filter")                      .setMaster("local");          //创建JavaSparkContext           JavaSparkContext sc = new JavaSparkContext(conf);          //模拟集合          List
numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10); //并行化集合,创建初始RDD JavaRDD
numberRDD = sc.parallelize(numbers); //对集合使用filter算子,过滤出集合中的偶数 JavaRDD
evenNumberRDD = numberRDD.filter(new Function
() { @Override public Boolean call(Integer v1) throws Exception { return v1%2==0; } }); evenNumberRDD.foreach(new VoidFunction
() { @Override public void call(Integer t) throws Exception { System.out.println(t); } }); sc.close(); }

3.flatMap算子

Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;

而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:

操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象

操作2:最后将所有对象合并为一个对象

private static void flatMap() {          SparkConf conf = new SparkConf()              .setAppName("flatMap")              .setMaster("local");          JavaSparkContext sc = new JavaSparkContext(conf);          List
lineList = Arrays.asList("hello you","hello me","hello world"); JavaRDD
lines = sc.parallelize(lineList); //对RDD执行flatMap算子,将每一行文本,拆分为多个单词 JavaRDD
words = lines.flatMap(new FlatMapFunction
() { //在这里,传入第一行,hello,you //返回的是一个Iterable
(hello,you) @Override public Iterable
call(String t) throws Exception { return Arrays.asList(t.split(" ")); } }); words.foreach(new VoidFunction
() { @Override public void call(String t) throws Exception { System.out.println(t); } }); sc.close(); }

4.groupByKey算子

private static void groupByKey() {          SparkConf conf = new SparkConf()                  .setAppName("groupByKey")                  .setMaster("local");          JavaSparkContext sc = new JavaSparkContext(conf);          List
> scoreList = Arrays.asList( new Tuple2
("class1", 80), new Tuple2
("class2", 90), new Tuple2
("class1", 97), new Tuple2
("class2", 89)); JavaPairRDD
scores = sc.parallelizePairs(scoreList); //针对scoresRDD,执行groupByKey算子,对每个班级的成绩进行分组 //相当于是,一个key join上的所有value,都放到一个Iterable里面去了 JavaPairRDD
> groupedScores = scores.groupByKey(); groupedScores.foreach(new VoidFunction
>>() { @Override public void call(Tuple2
> t) throws Exception { System.out.println("class:" + t._1); Iterator
ite = t._2.iterator(); while(ite.hasNext()) { System.out.println(ite.next()); } } }); }

5.reduceByKey算子

private static void reduceByKey() {          SparkConf conf = new SparkConf()                  .setAppName("reduceByKey")                  .setMaster("local");          JavaSparkContext sc = new JavaSparkContext(conf);          List
> scoreList = Arrays.asList( new Tuple2
("class1", 80), new Tuple2
("class2", 90), new Tuple2
("class1", 97), new Tuple2
("class2", 89)); JavaPairRDD
scores = sc.parallelizePairs(scoreList); //reduceByKey算法返回的RDD,还是JavaPairRDD
JavaPairRDD
totalScores = scores.reduceByKey(new Function2
() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); totalScores.foreach(new VoidFunction
>() { @Override public void call(Tuple2
t) throws Exception { System.out.println(t._1 + ":" + t._2); } }); sc.close(); }

6.sortByKey算子

private static void sortByKey() {          SparkConf conf = new SparkConf()                  .setAppName("sortByKey")                  .setMaster("local");          JavaSparkContext sc = new JavaSparkContext(conf);          List
> scoreList = Arrays.asList( new Tuple2
(78, "marry"), new Tuple2
(89, "tom"), new Tuple2
(72, "jack"), new Tuple2
(86, "leo")); JavaPairRDD
scores = sc.parallelizePairs(scoreList); JavaPairRDD
sortedScores = scores.sortByKey(); sortedScores.foreach(new VoidFunction
>() { @Override public void call(Tuple2
t) throws Exception { System.out.println(t._1 + ":" + t._2); } }); sc.close(); }

7.join算子

join算子用于关联两个RDD,join以后,会根据key进行join,并返回JavaPairRDD。JavaPairRDD的第一个泛型类型是之前两个JavaPairRDD的key类型,因为通过key进行join的。第二个泛型类型,是Tuple2<v1, v2>的类型,Tuple2的两个泛型分别为原始RDD的value的类型

private static void join() {          SparkConf conf = new SparkConf()                  .setAppName("join")                  .setMaster("local");          JavaSparkContext sc = new JavaSparkContext(conf);          List
> studentList = Arrays.asList( new Tuple2
(1, "tom"), new Tuple2
(2, "jack"), new Tuple2
(3, "marry"), new Tuple2
(4, "leo")); List
> scoreList = Arrays.asList( new Tuple2
(1, 78), new Tuple2
(2, 87), new Tuple2
(3, 89), new Tuple2
(4, 98)); //并行化两个RDD JavaPairRDD
students = sc.parallelizePairs(studentList);; JavaPairRDD
scores = sc.parallelizePairs(scoreList); //使用join算子关联两个RDD //join以后,会根据key进行join,并返回JavaPairRDD //JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key类型,因为通过key进行join的 //第二个泛型类型,是Tuple2
的类型,Tuple2的两个泛型分别为原始RDD的value的类型 JavaPairRDD
> studentScores = students.join(scores); //打印 studentScores.foreach(new VoidFunction
>>() { @Override public void call(Tuple2
> t) throws Exception { System.out.println("student id:" + t._1); System.out.println("student name:" + t._2._1); System.out.println("student score:" + t._2._2); System.out.println("=========================="); } }); sc.close(); }

更深的方法参见:

转自: 

本文转自whk66668888 51CTO博客,原文链接:http://blog.51cto.com/12597095/2063767

转载地址:http://tfmol.baihongyu.com/

你可能感兴趣的文章
TCP/IP、Http、Socket的区别
查看>>
Linux用户不在sudoers文件中
查看>>
Django Nginx+uwsgi 安装配置
查看>>
Python 连接数据库
查看>>
MyBatis的配置方式
查看>>
5分钟,关于Python 解包,你需要知道的一切
查看>>
卫星定位领域相关基础知识汇总
查看>>
Windows10 VS2017 C++使用crypto++库加密解密(AES)
查看>>
喜鹊医药获近亿元B轮融资,助力创新药物研究开发
查看>>
AnalyticDB for PostgreSQL (原HybridDB for PostgreSQL) 实时数据仓库上手指南
查看>>
分组填报表的制作
查看>>
Elasticsearch数据
查看>>
共享雨伞漂流伞获近亿元战略投资,投资方为蚂蚁金服
查看>>
Cookie(1)
查看>>
Confluence 6 可以自定义的元素
查看>>
你让我怎么说
查看>>
服务器怎么租?阿里云实例规格怎么选?
查看>>
KodExplorer 4.40 发布,权限机制优化
查看>>
WPF 为资源字典 添加事件响应的后台类
查看>>
ASP.NET MVC-异常处理&自定义错误页
查看>>