惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

S
Security Archives - TechRepublic
MongoDB | Blog
MongoDB | Blog
量子位
博客园 - 叶小钗
罗磊的独立博客
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
Hacker News: Ask HN
Hacker News: Ask HN
MyScale Blog
MyScale Blog
GbyAI
GbyAI
Help Net Security
Help Net Security
Y
Y Combinator Blog
Engineering at Meta
Engineering at Meta
Hacker News - Newest:
Hacker News - Newest: "LLM"
Latest news
Latest news
H
Hacker News: Front Page
Blog — PlanetScale
Blog — PlanetScale
雷峰网
雷峰网
Microsoft Azure Blog
Microsoft Azure Blog
P
Proofpoint News Feed
C
CXSECURITY Database RSS Feed - CXSecurity.com
Scott Helme
Scott Helme
S
Schneier on Security
博客园 - 司徒正美
Hugging Face - Blog
Hugging Face - Blog
S
Security @ Cisco Blogs
Recorded Future
Recorded Future
S
Securelist
博客园 - Franky
Application and Cybersecurity Blog
Application and Cybersecurity Blog
A
About on SuperTechFans
N
News and Events Feed by Topic
AI
AI
T
Tenable Blog
N
News | PayPal Newsroom
C
Cybersecurity and Infrastructure Security Agency CISA
V
V2EX - 技术
T
Threat Research - Cisco Blogs
Cisco Talos Blog
Cisco Talos Blog
L
LINUX DO - 热门话题
N
Netflix TechBlog - Medium
S
SegmentFault 最新的问题
T
The Blog of Author Tim Ferriss
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
Google Online Security Blog
Google Online Security Blog
S
Security Affairs
Webroot Blog
Webroot Blog
D
Darknet – Hacking Tools, Hacker News & Cyber Security
博客园 - 三生石上(FineUI控件)
C
Comments on: Blog
G
GRAHAM CLULEY

博客园 - 王晓成

(转)Mysql哪些字段适合建立索引 Specified key was too long; max key length is 767 bytes解决方案 (转)并发编程 – Concurrent 用户指南 - 王晓成 协同过滤 (转)K-近邻算法(KNN) 贝叶斯、朴素贝叶斯及调用spark官网 mllib NavieBayes示例 决策树之ID3,C4.5及CART kmeans Spark下的FP-Growth和Apriori scala spark-streaming整合kafka (spark 2.3 kafka 0.10) (转)Java 详解 JVM 工作原理和流程 Scala map与flatMap php 正则表达式 (转发)storm 入门原理介绍 shell :将标准输出及标准错误输出写到指定文件 shell循环(两个日期比较,改变某个特定日期来改变当前比较值) MongoDB基本操作 (转)cenntos 安装mongodb 通过spark sql 将 hdfs上文件导入到mongodb
Spark RDD 操作
王晓成 · 2018-10-28 · via 博客园 - 王晓成

1. Spark RDD 创建操作

 1.1 数据集合

  parallelize 可以创建一个能够并行操作的RDD。其函数定义如下:

def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

  由定义可见有两个参数,第一个参数指定数据集合,第二个参数指定数据分区。

实例:由普通数组创建RDD 

scala> val data=Array(1,2,3,4,5,6,7,8,9) 

data: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> val rdd=sc.parallelize(data,3)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

1.2 外部数据源

    textFiles 可以通过Hadoop支持的外部数据源(包括本地文件系统、HDFS、Cassandra、HBase等)建立RDD。其定义如下:

def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String]

 第一个参数指定数据路径,第二个参数指定数据分区。

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

 scala> sc.defaultParallelism 

res0: Int = 2

由以上可知,如果第二个参数如果不设置默认为2,默认的并行度最大不超过2.

 实例1:读取本地文件创建RDD

scala> val rdd1=sc.textFile("file:///usr/local/doc/name1.txt")

rdd1: org.apache.spark.rdd.RDD[String] = file:///usr/local/doc/name1.txt MapPartitionsRDD[15] at textFile at <console>:24

scala> rdd1.collect

res7: Array[String] = Array(james, jack, jenny)

 实例2:读取hdfs上的文件创建RDD 

[root@master doc]# hdfs dfs -cat /1.txt

hello world

hello terry

hello james

hello curry

hello bill

hello kact

hello james 

scala> val rdd2=sc.textFile("hdfs://master:9000/1.txt")

rdd2: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/1.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd2.collect

res0: Array[String] = Array(hello world, hello terry, hello james, hello curry, hello bill, hello kact, hello james) 

2. Spark RDD 转换操作

  2.1 map(func)

 对集合的每一个元素运用某个函数操作,然后将结果作为一个新的列表返回。 

 实例:将列表中每个元素值乘以2 

scala> val rdd1=sc.parallelize(1 to 6,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2=rdd1.map(_*2)

rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> rdd2.collect

res0: Array[Int] = Array(2, 4, 6, 8, 10, 12) 

  2.2 filter(func)

 对RDD元素进行过滤,返回值为true的元素组成的一个新的数据集。

实例:返回数据集中的偶数

scala> val rdd1=sc.parallelize(1 to 9,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val rdd3=rdd1.filter(x=>x%2==0)

rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:25

scala> rdd3.collect

res3: Array[Int] = Array(2, 4, 6, 8)

2.3 flatMap(func)  

对集合中每个元素运用某个函数操作(每个元素会被映射为0到多个输出元素)后,将结果扁平化组成一个新的集合。 

实例2:每个元素映射为多个元素

 

scala> val rdd1=sc.parallelize(1 to 3,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd4=rdd1.flatMap(x=>x to 5)

rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at flatMap at <console>:25

scala> rdd4.collect

res0: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 3, 4, 5)

2.4 mapPartitions(func)

 与map类似,map函数是应用到每个元素,而mapPartitions的输入函数是每个分区的数据,把每个分区中的内容作为整体来处理的。 当map里面有比较耗时的初始化操作时,比如连接db,可以采用mapPartitions,它对每个partition操作一次,其函数的输入与输出都是iterator类型。其定义如下:

def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

 实例如下:

scala> val rdd1=sc.parallelize(1 to 9,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> def myfunc[T](iter:Iterator[T]):Iterator[(T,T)]={

     | var res=List[(T,T)]()

     | var pre=iter.next

     | while (iter.hasNext) {

     | val cur=iter.next

     | res.::=(pre,cur)

     | pre=cur

     | }

     | res.iterator

     | }

myfunc: [T](iter: Iterator[T])Iterator[(T, T)]

scala> rdd1.mapPartitions(myfunc)

res2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapPartitions at <console>:28

scala> res2.collect()

res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))  

2.5 mapPartitionsWithIndex(func)

与 mapPartitions 类似,其传入的函数除了数据集,还需一个分区的index.其定义如下:

private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

实例如下:

scala> val rdd1=sc.parallelize(1 to 9,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 

scala> val mapReslut=rdd1.mapPartitionsWithIndex{

     | (index,iterator)=>{

     | val list=iterator.toList

     | list.map(x=>x +"->"+index).iterator

     | }

     | }

mapReslut: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at mapPartitionsWithIndex at <console>:25

scala> mapReslut.collect

res6: Array[String] = Array(1->0, 2->0, 3->0, 4->1, 5->1, 6->1, 7->2, 8->2, 9->2) 

 2.6 sample(withReplacement, fraction, seed)

 根据给定的随机种子seed,随机抽样出数量为fraction的数据。其定义如下:

def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]

 withReplacement取回的数据是否放回抽样,fraction:比例,0.1表示10%,seed:随机种子,相同的seed得到的随机序列一样。

 实例: 

scala> val rdd1=sc.parallelize(1 to 1000,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd1.sample(false,0.1,1).count

res1: Long = 116

 2.7 union(otherDataset)

  两个数据集合并,不去重,返回一个新的数据集,即所有旧的rdd的partition,直接移到新的rdd,新rdd 的Partition数量为旧rdd的partition数量的和。

 实例:

scala> val rdd1=sc.parallelize(1 to 6,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val rdd2=rdd1.map(_*2)

rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:25

scala> val rdd3=rdd1.union(rdd2)

rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[6] at union at <console>:27

scala> rdd3.collect

res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 2, 4, 6, 8, 10, 12)

 查看新rdd的partition数量

scala> rdd3.partitions.length 

res3: Int = 6

 2.8 intersection(otherDataset)

 数据交集,相交的数据组成一个新的数据集返回。

实例:

scala> val rdd4=rdd1.intersection(rdd2)

rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at intersection at <console>:27

scala> rdd4.collect

res3: Array[Int] = Array(6, 4, 2)

2.9 distinct([numPartitions]))

去除两个数据集的重复数据,返回去重后的数据集。

 实例: 

scala> val rdd5=rdd1.union(rdd2).distinct 

rdd5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at distinct at <console>:27

scala> rdd5.collect

res4: Array[Int] = Array(6, 12, 1, 8, 2, 3, 4, 10, 5)

2.10 groupByKey([numPartitions]))

分组操作,在一个由(K,V)对组成的数据集上调用,返回一个(K,Iterable[V])对的数据集。 

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] 

 实例:

scala> val rdd=sc.parallelize(Array((1,2),(1,3),(1,4),(2,3),(2,4),(2,5)),3)

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd10=rdd.groupByKey()

rdd10: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupByKey at <console>:25

scala> rdd10.collect

res0: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(2, 3, 4)), (2,CompactBuffer(3, 4, 5))) 

 2.11 reduceByKey(func, [numPartitions])

分组聚合操作, 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集。K相同的值,都被使用相同的reduce函数聚合在一起。

实例:相同的key的值加起来 

scala> val rdd11=rdd.reduceByKey(_+_)

rdd11: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25

scala> rdd11.collect

res3: Array[(Int, Int)] = Array((1,9), (2,12))

2.12 aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])

对PairRDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey'函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值.

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]  

 aggregateByKey函数的使用,需为它提供以下三个参数:

    1.zeroValue:U,初始值,即聚合的初始值

    2.seqOp: (U, V) => U,seq操作符, 描述如何将V合并到数据结构U

    3.combOp: (U, U) => U,comb操作符,描述如何合并两个数据结构U。

实例1:                            

scala> val rdd=sc.parallelize(List((1,3),(1,2),(1,4),(2,3)),2)

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val rdd12=rdd.aggregateByKey(0)(math.max(_,_),_+_)

rdd12: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at aggregateByKey at <console>:25

scala> rdd12.collect

res1: Array[(Int, Int)] = Array((2,3), (1,7))

 分析过程:

分为两个分区,(1,3),(1,2)会落入0分区,(1,4),(2,3)落入1分区,每个分区分开计算。

初始值为0,所以第一步不对列表值产生影响。

 seqOP:函数是相同key取最大值,0分区的结果为(1,3),1分区的结果为(1,4),(2,3)

 combOP:函数是相同的key的value进行相加,结果为(1,3+4)=>(1,7),(2,3) 

 实例2:

scala> val rdd=sc.parallelize(List((1,3),(1,2),(1,4),(2,3)),3)

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val rdd12=rdd.aggregateByKey(0)(math.max(_,_),_+_)

rdd12: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at aggregateByKey at <console>:25

scala> rdd12.collect

res2: Array[(Int, Int)] = Array((1,9), (2,3))

分析过程:

分为三个分区,(1,3)会落入0分区,(1,2)进入1分区,(1,4),(2,3)落入2分区,每个分区分开计算。

初始值为0,所以第一步不对列表值产生影响。

seqOP:函数是相同key取最大值,0分区的结果为(1,3),1分区的结果为(1,2),2分区的结果为(1,4),(2,3)

combOP:函数是相同的key的value进行相加,结果为(1,3+2+4)=>(1,9),(2,3) 

 2.13 combineByKey

对RDD中的数据集按照key进行聚合操作。聚合操作通过自定义函数提供。  

def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

三个参数解释:

 createCombiner:在遍历(k,v)时,如果combineByKey第一次遇到值为k的key(类型K),那么将这个(k,v)调用combineCombiner函数,将v转换为C.

 mergeValue:在遍历(k,v)时,如果combineByKey不是第一次遇到值为k的key (类型K),那么将这个(k,v)调用mergeValue函数,它的作用是将v累加到聚合对象(类型为C)中。

mergeCombiners:combineByKey是在分布式环境下执行的,RDD的生个分区单独进行combineByKey操作,最后需要对各个分区的结果进行最后的聚合。

实例如下:

scala> val rdd=sc.parallelize(Array((1,1.0),(1,2.0),(1,3.0),(2,4.0),(2,5.0),(2,6.0)),2)

rdd: org.apache.spark.rdd.RDD[(Int, Double)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val combine=rdd.combineByKey(createCombiner = (v: Double) => (v: Double, 1),

     | mergeValue = (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),

     | mergeCombiners = (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2),

     |  numPartitions = 2)

combine: org.apache.spark.rdd.RDD[(Int, (Double, Int))] = ShuffledRDD[2] at combineByKey at <console>:25

scala> combine.collect

res0: Array[(Int, (Double, Int))] = Array((2,(15.0,3)), (1,(6.0,3)))       

 2.14 sortByKey([ascending], [numPartitions])

按key对RDD进行排序,其定义如下: 

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

由上可知,该函数有两个参数,第一个参数是排序方式,默认是true(升序),第二个参数可以指定分区,即并行任务数。另外,排序的key需可排序的Ordering

实例: 

scala> val rdd=sc.parallelize(Array((1,3),(2,6),(2,3),(1,2),(1,8),(2,9)))

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd.sortByKey()

res6: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[15] at sortByKey at <console>:26

scala> res6.collect

res7: Array[(Int, Int)] = Array((1,3), (1,2), (1,8), (2,6), (2,3), (2,9))

2.15 join(otherDataset, [numPartitions])

 连接操作,将输入数据集(K, V) 和另一数据集 (K, W)进行join, 返回两个集合匹配的(K, (V, W))集合对。即该操作过滤掉不匹配的key,然后返回相同K的V,W集合进行笛卡尔积操作。其定义如下:

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}

实例:

scala> val rdd=sc.parallelize(Array((1,2),(1,3),(2,4),(3,6)))

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd2=sc.parallelize(Array((1,2),(1,5),(2,6)))

rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val rdd14=rdd.join(rdd2)

rdd14: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[13] at join at <console>:27

scala> rdd14.collect

res2: Array[(Int, (Int, Int))] = Array((2,(4,6)), (1,(2,2)), (1,(2,5)), (1,(3,2)), (1,(3,5)))

由此可见,不匹配的key(3,6)未显示。与sql类似,还有左连接、右连接及全连接操作函数:leftOuterJoin、rightOuterJoin及fullOuterJoin。

 2.16 cogroup(otherDataset, [numPartitions])  

类似于join,像上面Join的定义,其底层使用了cogroup. 输入数据集(K, V) 和另一数据集 (K, W)进行cogroup, 将返回格式为(K, (Iterable<V>, Iterable<W>))的数据集,与join不同的是,两个集合中不匹配的Key,也会返回。 

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))]

 实例:

scala> val rdd=sc.parallelize(Array((1,2),(1,3),(2,4),(3,6)))

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2=sc.parallelize(Array((1,2),(1,5),(2,6)))

rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd15=rdd.cogroup(rdd2)

rdd15: org.apache.spark.rdd.RDD[(Int, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[3] at cogroup at <console>:27

scala> rdd15.collect

res0: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((2,(CompactBuffer(4),CompactBuffer(6))), (1,(CompactBuffer(2, 3),CompactBuffer(2, 5))), (3,(CompactBuffer(6),CompactBuffer()))) 

2.17 cartesian(otherDataset)

两个集合进行笛卡尔积

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] 

实例:

scala> val rdd1=sc.parallelize(Array(1,2,3))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> val rdd2=sc.parallelize(Array(4,5))

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24

scala> val rdd17=rdd1.cartesian(rdd2)

rdd17: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[20] at cartesian at <console>:27

scala> rdd17.collect

res7: Array[(Int, Int)] = Array((1,4), (1,5), (2,4), (3,4), (2,5), (3,5))

2.18 pipe(command, [envVars])

 可以通过pipe使用shell命令来处理RDD

实例:

scala>  val rdd1=sc.parallelize(1 to 9,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd2=rdd1.pipe("head -n 1").collect

rdd2: Array[String] = Array(1, 4, 7)  

 2.19 coalesce(numPartitions)

 coalesce(numPartitions: Int)将RDD进行重分区,默认只能减少分区,默认不进行shuffle,当开启shuffle时,可以扩大分区。其定义如下:

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]

实例:减少分区

scala> val rdd1=sc.parallelize(1 to 9,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd1.partitions.length

res1: Int = 3

scala> val rdd2=rdd1.coalesce(2)

rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[4] at coalesce at <console>:25

scala> rdd2.partitions.length

res4: Int = 2

实例:扩大分区 

scala> val rdd2=rdd1.coalesce(5)

rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:25

scala> rdd2.partitions.length

res0: Int = 3

由上面实例可见,不开启shuffle是不能扩大分区的。

实例:开启 shuffle后,可以扩大分区数

scala> val rdd3=rdd1.coalesce(5,true)

rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at coalesce at <console>:25

scala> rdd3.partitions.length

res1: Int = 5 

 2.20 repartition(numPartitions)

对RDD进行重分区,可以增加分区,也可减少分区,它创建新的分区,会进行shuffer操作 

实例: 

scala> val rdd1=sc.parallelize(1 to 9,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val rdd3=rdd1.repartition(5)

rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at repartition at <console>:25

scala> rdd3.partitions.length

res6: Int = 5

2.21 repartitionAndSortWithinPartitions(partitioner)

根据给定的分区程序对RDD进行重新分区,并在每个生成的分区内按键对记录进行排序。 这比调用重新分区,它要比使用repartition And sortByKey 效率高,这是由于它的排序是在shuffle过程中进行,一边shuffle,一边排序。定义如下:

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}

使用repartitionAndSortWithinPartitions时,需要自己传入一个分区器,这个分区器可以使用系统提供的,也可以是自定义的,以下实例我们使用系统的HashPartitioner 

实例: 

scala> import org.apache.spark.HashPartitioner 

import org.apache.spark.HashPartitioner

scala> val rdd=sc.parallelize(Array(2,4,8,6,23,12,123,98,18))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:25

scala> rdd.zipWithIndex().repartitionAndSortWithinPartitions(new HashPartitioner(1)).foreach(println)

(2,0)

(4,1)

(6,3)

(8,2)

(12,5)

(18,8)

(23,4)

(98,7)

(123,6) 

3. 转换操作

3.1 reduce(func) 

对数据集中每个元素执行指定的聚集函数(有两个输入参数,一个返回值) ,这个函数必须是可交换的和组合的。

实例: 

scala>  val arrays=Array(1,2,3,4,5,6,7,8,9,10);

arrays: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala>  val rdd=sc.parallelize(arrays,3)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> val sum=rdd.reduce(_+_)

sum: Int = 55

3.2 collect()

将数据集的内容以Array数据的形式返回

实例:

scala> val rdd=sc.parallelize(1 to 9,3)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.collect

res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

 3.3 count()

返回数据集元素的个数。

实例:

scala> rdd.count

res0: Long = 9

3.4 first()

返回集合中的第一个元素。

实例: 

scala> val rdd=sc.parallelize(1 to 9,3) 

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.first

res2: Int = 1

3.5 take(n)

返回集合中前n个元素的数组。

3.6 takeSample(withReplacement, num, [seed])

返回包含随机的num个元素的数组。第一个参数withReplacement是抽样时是否放回,第二个参数num会精确指定抽样数,而不是比例.第三个参数seed是随机种子。

实例:

scala> rdd.take(3)

res3: Array[Int] = Array(1, 2, 3) 

3.7  takeOrdered(n, [ordering]) 

按自然顺序或者自定义比较器返回第1到n元素的数组

实例:

scala> rdd.takeOrdered(5)

res6: Array[Int] = Array(1, 2, 3, 4, 5)  

 3.8 saveAsTextFile(path) 

把数据集中的元素转换为文本文件写到指定的目录(本地系统、HDFS或者其它hadoop支持的文件系统).Spark将每个元素调用toString方法转换为文本文件中的一行。

 实例:将数据转为文本存储到hdfs中。

scala> val rdd=sc.parallelize(1 to 9,3)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24  

scala> rdd.saveAsTextFile("hdfs://master:9000/number")

hdfs中查看是否有文件存在,并进行3个分区存储  

[root@master ~]# hdfs dfs -ls /number.txt

Found 4 items

-rw-r--r--   3 root supergroup          0 2018-10-29 14:45 /number.txt/_SUCCESS

-rw-r--r--   3 root supergroup          6 2018-10-29 14:45 /number.txt/part-00000

-rw-r--r--   3 root supergroup          6 2018-10-29 14:45 /number.txt/part-00001

-rw-r--r--   3 root supergroup          6 2018-10-29 14:45 /number.txt/part-00002

[root@master ~]# hdfs dfs -ls /number

Found 4 items

-rw-r--r--   3 root supergroup          0 2018-10-29 14:47 /number/_SUCCESS

-rw-r--r--   3 root supergroup          6 2018-10-29 14:47 /number/part-00000

-rw-r--r--   3 root supergroup          6 2018-10-29 14:47 /number/part-00001

-rw-r--r--   3 root supergroup          6 2018-10-29 14:47 /number/part-00002

[root@master ~]# hdfs dfs -cat /number/part-00000

1

2

3

3.9 saveAsSequenceFile(path)  

 类型于saveAsTextFile,用于将RDD中元素转换为 hadoop SequenceFile保存到指定的目录(本地系统、HDFS或者其它hadoop支持的文件系统)。  

在RDD的键值对实现了hadoop的Writable接口是可用的,在Scala中,即类型可以隐式转为Writable(Spark可转换的基本类型如Int,Double,String等)  

 3.10 saveAsObjectFile

 用于将RDD中的元素序列化成对象,存储到文件中。

 3.11 countByKey()

对于类型 (K, V)的RDD. 返回一个 (K, Int)的map,Int为K的个数。

实例: 

scala> val rdd=sc.parallelize(Array((1,2),(1,3),(1,5),(2,4),(2,6),(3,8)),3)

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd.countByKey()

res2: scala.collection.Map[Int,Long] = Map(3 -> 1, 1 -> 3, 2 -> 2)     

3.12 foreach(func)

对数据集中每个元素执行func函数。

实例:

scala> val rdd=sc.parallelize(1 to 6,3)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd.foreach(x=>println(x*2))

2

4

10

12

6

8

参考文献:

http://spark.apache.org/docs/latest/rdd-programming-guide.html

Spark MLlib机器学习 -黄美灵

 版权声明:本文为博主原创文章,转载请注明出处:https://www.cnblogs.com/abcdwxc/p/9867475.html