Spark Basic Scripting Practice 3: Pair RDD Development
Published: 2019-06-14


Pair RDD Transformations

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))

//reduceByKey: merge the values for each key

val r1 = rdd.reduceByKey((x,y)=>x+y).collect()
val r1 = rdd.reduceByKey(_+_).collect()
r1: Array[(Int, Int)] = Array((1,2), (3,10))

val r1 = rdd.reduceByKey((x,y)=>(x max y)).collect()

r1: Array[(Int, Int)] = Array((1,2), (3,6))

 

//groupByKey: group the values by key

val r2=rdd.groupByKey().collect()

r2: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(2)), (3,CompactBuffer(4, 6)))
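
As a cross-check (a small sketch added here, not from the original post): groupByKey followed by mapValues can reproduce the reduceByKey sum shown earlier, although reduceByKey is usually preferred because it combines values on each partition before the shuffle.

//groupByKey + mapValues(sum) gives the same totals as reduceByKey(_+_)
val r2b = rdd.groupByKey().mapValues(vs => vs.sum).collect()
//expected: Array((1,2), (3,10))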

//mapValues: apply a function to each value without changing the key

val r3=rdd.mapValues(x => x+10 ).collect()

r3: Array[(Int, Int)] = Array((1,12), (3,14), (3,16))

//flatMapValues: apply a function that returns an iterator to each value; each returned element becomes a key-value pair with the original key

val r4=rdd.flatMapValues(x => x to 10 ).collect()

r4: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (1,6), (1,7), (1,8), (1,9), (1,10), (3,4), (3,5), (3,6), (3,7), (3,8), (3,9), (3,10), (3,6), (3,7), (3,8), (3,9), (3,10))

//keys: return only the keys

val r5=rdd.keys.collect()

r5: Array[Int] = Array(1, 3, 3)

//values: return only the values

val r5=rdd.values.collect()

r5: Array[Int] = Array(2, 4, 6)

//sortByKey: return the pairs in sorted order (the examples below use the generic sortBy on the whole pair; a sortByKey sketch follows them)

scala> val r6=rdd.sortBy(x => x, false).collect()
r6: Array[(Int, Int)] = Array((3,6), (3,4), (1,2))

scala> val r6=rdd.sortBy(x => x, true).collect()

r6: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))
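
For completeness, here is the dedicated pair-RDD version mentioned in the comment above (a short sketch added here): sortByKey orders by the key only, whereas sortBy(x => x) compares the whole (key, value) tuple.

val r6b = rdd.sortByKey().collect()       //ascending by key
val r6c = rdd.sortByKey(false).collect()  //descending by key
//expected: r6b is Array((1,2), (3,4), (3,6)); for equal keys the value order is not guaranteed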

Note that a pair RDD is still an RDD, so the generic RDD operations can be used on it as well.
For example:
scala> val r7=rdd.filter{case (key,value)=>value<20}.collect()
r7: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))

scala> val r7=rdd.filter{case (key,value)=>value<5}.collect()

r7: Array[(Int, Int)] = Array((1,2), (3,4))

scala> val r7=rdd.filter{case (k,v)=>v<20}.collect()

r7: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))

Counting how many times each word occurs

Method 1:

val input = sc.textFile("file:///usr/local/spark/README.md")
val words = input.flatMap(x => x.split(" "))
val count = words.countByValue()
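
Note that countByValue returns a plain Scala Map[String, Long] on the driver rather than an RDD, so its entries can be inspected directly. A small illustrative snippet (the actual words and counts depend on the contents of README.md):

count.take(5).foreach(println)                //print a few (word, count) entries
val sparkCount = count.getOrElse("Spark", 0L) //look up the count for one word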

Method 2: the classic map-reduce style

//x => (x,1) maps each word to a key-value pair whose value is 1, which sets the data up neatly for reduceByKey
val count1 = words.map(x =>(x,1)).collect()
count1: Array[(String, Int)] = Array((#,1), (Apache,1), (Spark,1), ("",1), (Spark,1), (is,1), (a,1), (fast,1), (and,1), (general,1), (cluster,1), (computing,1), (system,1), (for,1), (Big,1), (Data.,1), (It,1), (provides,1), (high-level,1), (APIs,1), (in,1), (Scala,,1), (Java,,1), (Python,,1), (and,1), (R,,1), (and,1), (an,1), (optimized,1), (engine,1), (that,1), (supports,1), (general,1), (computation,1), (graphs,1), (for,1), (data,1), (analysis.,1), (It,1), (also,1), (supports,1), (a,1), (rich,1), (set,1), (of,1), (higher-level,1), (tools,1), (including,1), (Spark,1), (SQL,1), (for,1), (SQL,1), (and,1), (DataFrames,,1), (MLlib,1), (for,1), (machine,1), (learning,,1), (GraphX,1), (for,1), (graph,1), (processing,,1), (and,1), (Spark,1), (Streaming,1), (for,1), (stream,1), (processing.,1)...
//reduceByKey then merges these pairs by key, accumulating the 1s into a count per word
val count2 = words.map(x =>(x,1)).reduceByKey((x,y)=>x+y).collect
count2: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (YARN,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (["Parallel,1), (are...
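
A natural follow-up (an illustrative sketch, not part of the original post) is to sort the word counts by value to get the most frequent words; the exact output depends on README.md:

val counts = words.map(x => (x,1)).reduceByKey(_+_)
val top10 = counts.sortBy(pair => pair._2, false).take(10)   //ten most frequent words, highest count first
top10.foreach(println)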

//If instead we just want the total number of words:

scala> val count1 = words.map(x =>(x,1))
count1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[55] at map at <console>:28

scala> count1.count

res3: Long = 568

 

Reposted from: https://www.cnblogs.com/starcrm/p/7028247.html
