政府網(wǎng)站建設(shè)十強百度seo公司興田德潤
RDD相關(guān)知識
RDD介紹
RDD
是Spark
的核心抽象,即 彈性分布式數(shù)據(jù)集(residenta distributed dataset
)。代表一個不可變,可分區(qū),里面元素可并行計算的集合。其具有數(shù)據(jù)流模型的特點:自動容錯,位置感知性調(diào)度和可伸縮性。 在Spark
中,對數(shù)據(jù)的所有操作不外乎創(chuàng)建RDD
、轉(zhuǎn)化已有RDD
以及調(diào)用 RDD
操作進行求值。
?
RDD結(jié)構(gòu)圖
RDD具有五大特性
-
一組分片(
Partition
),即數(shù)據(jù)集的基本組成單位(RDD
是由一系列的partition
組成的)。將數(shù)據(jù)加載為RDD
時,一般會遵循數(shù)據(jù)的本地性(一般一個HDFS
里的block
會加載為一個partition
)。 -
RDD
之間的依賴關(guān)系。依賴還具體分為寬依賴和窄依賴,但并不是所有的RDD
都有依賴。為了容錯(重算,cache
,checkpoint
),也就是說在內(nèi)存中的RDD
操作時出錯或丟失會進行重算。 -
由一個函數(shù)計算每一個分片。
Spark
中的RDD
的計算是以分片為單位的,每個RDD
都會實現(xiàn)compute
函數(shù)以達到這個目的。compute
函數(shù)會對迭代器進行復(fù)合,不需要保存每次計算的結(jié)果。 -
(可選)如果
RDD
里面存的數(shù)據(jù)是key-value
形式,則可以傳遞一個自定義的Partitioner
進行重新分區(qū)。 -
(可選)
RDD
提供一系列最佳的計算位置,即數(shù)據(jù)的本地性。
RDD之間的依賴關(guān)系
RDD
之間有一系列的依賴關(guān)系,依賴關(guān)系又分為窄依賴和寬依賴。
窄依賴:父RDD
和子RDD
partition
之間的關(guān)系是一對一的?;蛘?code>父RDD一個partition
只對應(yīng)一個子RDD
的partition
情況下的父RDD
和子RDD
partition
關(guān)系是多對一的,也可以理解為沒有觸發(fā)shuffle
。
寬依賴:父RDD
與子RDD
partition
之間的關(guān)系是一對多。 父RDD
的一個分區(qū)的數(shù)據(jù)去到子RDD
的不同分區(qū)里面。也可以理解為觸發(fā)了shuffle
。
特別說明:對于join
操作有兩種情況,如果join
操作的使用每個partition
僅僅和已知的Partition
進行join
,此時的join
操作就是窄依賴;其他情況的join
操作就是寬依賴。
RDD創(chuàng)建
-
從
Hadoop
文件系統(tǒng)(或與Hadoop
兼容的其他持久化存儲系統(tǒng),如Hive
、Cassandra
、HBase
)輸入(例如HDFS
)創(chuàng)建。 -
通過集合進行創(chuàng)建。
算子
算子可以分為Transformation
轉(zhuǎn)換算子和Action
行動算子。 RDD
是懶執(zhí)行的,如果沒有行動操作出現(xiàn),所有的轉(zhuǎn)換操作都不會執(zhí)行。
RDD
直觀圖,如下:
RDD 的 五大特性
-
一組分片(
Partition
),即數(shù)據(jù)集的基本組成單位。對于RDD
來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度。用戶可以在創(chuàng)建RDD
時指定RDD
的分片個數(shù),如果沒有指定,那么就會采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core
的數(shù)目。 -
一個計算每個分區(qū)的函數(shù)。
Spark
中RDD
的計算是以分片為單位的,每個RDD
都會實現(xiàn)compute
函數(shù)以達到這個目的。compute
函數(shù)會對迭代器進行復(fù)合,不需要保存每次計算的結(jié)果。 -
RDD
之間的依賴關(guān)系。RDD
的每次轉(zhuǎn)換都會生成一個新的RDD
,所以RDD
之間就會形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時,Spark
可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對RDD
的所有分區(qū)進行重新計算。 -
一個
Partitioner
,即RDD
的分片函數(shù)。當(dāng)前Spark
中實現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的HashPartitioner
,另外一個是基于范圍的RangePartitioner
。只有對于于key-value
的RDD
,才會有Partitioner
,非key-value
的RDD
的Parititioner
的值是None
。Partitioner
函數(shù)不但決定了RDD
本身的分片數(shù)量,也決定了parent RDD Shuffle
輸出時的分片數(shù)量。 -
一個列表,存儲存取每個
Partition
的優(yōu)先位置(preferred location
)。對于一個HDFS
文件來說,這個列表保存的就是每個 Partition 所在的塊的位置。按照“移動數(shù)據(jù)不如移動計算”的理念,Spark 在進行任務(wù)調(diào)度的時候,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置。
相關(guān)API介紹
SparkContext
創(chuàng)建;
sc = SparkContext("local", "Simple App")
說明:"local"
是指讓Spark
程序本地運行,"Simple App"
是指Spark
程序的名稱,這個名稱可以任意(為了直觀明了的查看,最好設(shè)置有意義的名稱)。
- 集合并行化創(chuàng)建
RDD
;
data = [1,2,3,4]
rdd = sc.parallelize(data)
collect
算子:在驅(qū)動程序中將數(shù)據(jù)集的所有元素作為數(shù)組返回(注意數(shù)據(jù)集不能過大);
rdd.collect()
- 停止
SparkContext
。
sc.stop()
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用 rdd.collect() 收集 rdd 的內(nèi)容。 rdd.collect() 是 Spark Action 算子,在后續(xù)內(nèi)容中將會詳細(xì)說明,主要作用是:收集 rdd 的數(shù)據(jù)內(nèi)容result = rdd.collect()# 5.打印 rdd 的內(nèi)容print(result)# 6.停止 SparkContextsc.stop()#********** End **********#
讀取外部數(shù)據(jù)集創(chuàng)建RDD?
編寫讀取本地文件創(chuàng)建Spark RDD
的程序。
相關(guān)知識
為了完成本關(guān)任務(wù),你需要掌握:1.如何讀取本地文件系統(tǒng)中的文件來創(chuàng)建Spark RDD
。
textFile 介紹
PySpark
可以從Hadoop
支持的任何存儲源創(chuàng)建分布式數(shù)據(jù)集,包括本地文件系統(tǒng),HDFS
,Cassandra
,HBase
,Amazon S3
等。Spark
支持文本文件,SequenceFiles
和任何其他Hadoop InputFormat
。
文本文件RDD
可以使用創(chuàng)建SparkContex
的textFile
方法。此方法需要一個 URI
的文件(本地路徑的機器上,或一個hdfs://,s3a://
等 URI),并讀取其作為行的集合。這是一個示例調(diào)用:
distFile = sc.textFile("data.txt")
# -*- coding: UTF-8 -*- from pyspark import SparkContextif __name__ == '__main__':#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 文本文件 RDD 可以使用創(chuàng)建 SparkContext 的t extFile 方法。 #此方法需要一個 URI的 文件(本地路徑的機器上,或一個hdfs://,s3a://等URI), #并讀取其作為行的集合# 2.讀取本地文件,URI為:/root/wordcount.txtrdd = sc.textFile("/root/wordcount.txt")# 3.使用 rdd.collect() 收集 rdd 的內(nèi)容。 #rdd.collect() 是 Spark Action 算子,在后續(xù)內(nèi)容中將會詳細(xì)說明,主要作用是:收集 rdd 的數(shù)據(jù)內(nèi)容result = rdd.collect()# 4.打印 rdd 的內(nèi)容print(result)# 5.停止 SparkContextsc.stop()#********** End **********#
map
算子
本關(guān)任務(wù):使用Spark
的 map
算子按照相關(guān)需求完成轉(zhuǎn)換操作。
相關(guān)知識
為了完成本關(guān)任務(wù),你需要掌握:如何使用map
算子。
map
將原來RDD
的每個數(shù)據(jù)項通過map
中的用戶自定義函數(shù) f
映射轉(zhuǎn)變?yōu)橐粋€新的元素。
圖中每個方框表示一個RDD
分區(qū),左側(cè)的分區(qū)經(jīng)過自定義函數(shù) f:T->U
映射為右側(cè)的新 RDD
分區(qū)。但是,實際只有等到 Action
算子觸發(fā)后,這個 f
函數(shù)才會和其他函數(shù)在一個 Stage
中對數(shù)據(jù)進行運算。
map 案例
-
sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) rdd_map = rdd.map(lambda x: x * 2) print(rdd_map.collect())
輸出:
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]
說明:rdd1
的元素( 1 , 2 , 3 , 4 , 5 , 6
)經(jīng)過 map
算子( x -> x*2
)轉(zhuǎn)換成了 rdd2
( 2 , 4 , 6 , 8 , 10
)。
編程要求
請仔細(xì)閱讀右側(cè)代碼,根據(jù)方法內(nèi)的提示,在Begin - End
區(qū)域內(nèi)進行代碼補充,具體任務(wù)如下:
需求:使用 map
算子,將rdd
的數(shù)據(jù) (1, 2, 3, 4, 5)
按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:
- 偶數(shù)轉(zhuǎn)換成該數(shù)的平方;
- 奇數(shù)轉(zhuǎn)換成該數(shù)的立方。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個1到5的列表Listdata = [1, 2, 3, 4, 5]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 map 算子,將 rdd 的數(shù)據(jù) (1, 2, 3, 4, 5) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:偶數(shù)轉(zhuǎn)換成該數(shù)的平方奇數(shù)轉(zhuǎn)換成該數(shù)的立方"""# 5.使用 map 算子完成以上需求rdd_map = rdd.map(lambda x: x * x if x % 2 == 0 else x * x * x)# 6.使用rdd.collect() 收集完成 map 轉(zhuǎn)換的元素print(rdd_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
mapPartitions
算子
mapPartitions
mapPartitions
函數(shù)獲取到每個分區(qū)的迭代器,在函數(shù)中通過這個分區(qū)整體的迭 代器對整個分區(qū)的元素進行操作。
圖中每個方框表示一個RDD
分區(qū),左側(cè)的分區(qū)經(jīng)過自定義函數(shù) f:T->U
映射為右側(cè)的新RDD
分區(qū)。
mapPartitions 與 map
map
:遍歷算子,可以遍歷RDD
中每一個元素,遍歷的單位是每條記錄。
mapPartitions
:遍歷算子,可以改變RDD
格式,會提高RDD
并行度,遍歷單位是Partition
,也就是在遍歷之前它會將一個Partition
的數(shù)據(jù)加載到內(nèi)存中。
那么問題來了,用上面的兩個算子遍歷一個RDD
誰的效率高? mapPartitions
算子效率高。
mapPartitions 案例
-
def f(iterator): list = [] for x in iterator: list.append(x*2) return listif __name__ == "__main__": sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) partitions = rdd.mapPartitions(f) print(partitions.collect())
輸出:
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]
mapPartitions()
:傳入的參數(shù)是rdd
的 iterator
(元素迭代器),返回也是一個iterator
(迭代器)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext#********** Begin **********#
def f(iterator):list = []for x in iterator:list.append((x, len(x)))return list#********** End **********#
if __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2. 一個內(nèi)容為("dog", "salmon", "salmon", "rat", "elephant")的列表Listdata = ["dog", "salmon", "salmon", "rat", "elephant"]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 mapPartitions 算子,將 rdd 的數(shù)據(jù) ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:將字符串與該字符串的長度組合成一個元組,例如:dog --> (dog,3)salmon --> (salmon,6)"""# 5.使用 mapPartitions 算子完成以上需求partitions = rdd.mapPartitions(f)# 6.使用rdd.collect() 收集完成 mapPartitions 轉(zhuǎn)換的元素print(partitions.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
filter
算子。
filter
filter
函數(shù)功能是對元素進行過濾,對每個元素應(yīng)用f
函數(shù),返 回值為 true
的元素在RDD
中保留,返回值為false
的元素將被過濾掉。內(nèi)部實現(xiàn)相當(dāng)于生成。
FilteredRDD(this,sc.clean(f))
下面代碼為函數(shù)的本質(zhì)實現(xiàn):
-
def filter(self, f): """ Return a new RDD containing only the elements that satisfy a predicate.>>> rdd = sc.parallelize([1, 2, 3, 4, 5]) >>> rdd.filter(lambda x: x % 2 == 0).collect() [2, 4] """ def func(iterator): return filter(fail_on_stopiteration(f), iterator) return self.mapPartitions(func, True)
上圖中每個方框代表一個 RDD
分區(qū), T
可以是任意的類型。通過用戶自定義的過濾函數(shù) f
,對每個數(shù)據(jù)項操作,將滿足條件、返回結(jié)果為 true
的數(shù)據(jù)項保留。例如,過濾掉 V2
和 V3
保留了 V1
,為區(qū)分命名為 V’1
。
filter 案例
-
sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) rdd_filter = rdd.filter(lambda x: x>2) print(rdd_filter.collect())
輸出:
[1, 2, 3, 4, 5, 6]
[3, 4, 5, 6]
說明:rdd1( [ 1 , 2 , 3 , 4 , 5 , 6 ] )
經(jīng)過 filter
算子轉(zhuǎn)換成 rdd2( [ 3 ,4 , 5 , 6 ] )
。
使用 filter
算子,將 rdd
中的數(shù)據(jù) (1, 2, 3, 4, 5, 6, 7, 8)
按照以下規(guī)則進行過濾,規(guī)則如下:
- 過濾掉
rdd
中的所有奇數(shù)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 filter 算子,將 rdd 的數(shù)據(jù) (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:過濾掉rdd中的奇數(shù)"""# 5.使用 filter 算子完成以上需求rdd_filter = rdd.filter(lambda x: x % 2 == 0)# 6.使用rdd.collect() 收集完成 filter 轉(zhuǎn)換的元素print(rdd_filter.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
flatMap
算子
flatMap
將原來RDD
中的每個元素通過函數(shù)f
轉(zhuǎn)換為新的元素,并將生成的RDD
中每個集合的元素合并為一個集合,內(nèi)部創(chuàng)建:
FlatMappedRDD(this,sc.clean(f))
上圖表示RDD
的一個分區(qū),進行flatMap
函數(shù)操作,flatMap
中傳入的函數(shù)為f:T->U
,T
和U
可以是任意的數(shù)據(jù)類型。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù)f
轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認(rèn)為是一個RDD
分區(qū),小方框代表一個集合。V1
、V2
、V3
在一個集合作為RDD
的一個數(shù)據(jù)項,可能存儲為數(shù)組或其他容器,轉(zhuǎn)換為V’1
、V’2
、V’3
后,將原來的數(shù)組或容器結(jié)合拆散,拆散的數(shù)據(jù)形成RDD
中的數(shù)據(jù)項。
flatMap 案例
sc = SparkContext("local", "Simple App")
data = [["m"], ["a", "n"]]
rdd = sc.parallelize(data)
print(rdd.collect())
flat_map = rdd.flatMap(lambda x: x)
print(flat_map.collect())
輸出:
[['m'], ['a', 'n']]
['m', 'a', 'n']
flatMap
:將兩個集合轉(zhuǎn)換成一個集合
?
需求:使用 flatMap
算子,將rdd
的數(shù)據(jù) ([1, 2, 3], [4, 5, 6], [7, 8, 9])
按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:
- 合并
RDD
的元素,例如:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表Listlist = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(list)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect()) """使用 flatMap 算子,將 rdd 的數(shù)據(jù) ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:合并RDD的元素,例如:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)([2,3],[4,5],[6]) --> (1,2,3,4,5,6)"""# 5.使用 filter 算子完成以上需求flat_map = rdd.flatMap(lambda x: x)# 6.使用rdd.collect() 收集完成 filter 轉(zhuǎn)換的元素print(flat_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
distinct
算子distinct
distinct
將RDD
中的元素進行去重操作。上圖中的每個方框代表一個
RDD
分區(qū),通過distinct
函數(shù),將數(shù)據(jù)去重。 例如,重復(fù)數(shù)據(jù)V1
、V1
去重后只保留一份V1
。distinct 案例
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.collect()) distinct = rdd.distinct()
-
輸出
['python', 'python', 'python', 'java', 'java'] ['python', 'java']
-
print(distinct.collect())
sortByKey
算子sortByKey
-
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x): if numPartitions is None: numPartitions = self._defaultReducePartitions()memory = self._memory_limit() serializer = self._jrdd_deserializerdef sortPartition(iterator): sort = ExternalSorter(memory * 0.9, serializer).sorted return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))if numPartitions == 1: if self.getNumPartitions() > 1: self = self.coalesce(1) return self.mapPartitions(sortPartition, True)# first compute the boundary of each part via sampling: we want to partition # the key-space into bins such that the bins have roughly the same # number of (key, value) pairs falling into them rddSize = self.count() if not rddSize: return self # empty RDD maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner f\fraction = min(maxSampleSize / max(rddSize, 1), 1.0) samples = self.sample(False, f\fraction, 1).map(lambda kv: kv[0]).collect() samples = sorted(samples, key=keyfunc)# we have numPartitions many parts but one of the them has # an implicit boundary bounds = [samples[int(len(samples) * (i + 1) / numPartitions)] for i in range(0, numPartitions - 1)]def rangePartitioner(k): p = bisect.bisect_left(bounds, keyfunc(k)) if ascending: return p else: return numPartitions - 1 - preturn self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
-
說明:
ascending
參數(shù)是指排序(升序還是降序),默認(rèn)是升序。numPartitions
參數(shù)是重新分區(qū),默認(rèn)與上一個RDD
保持一致。keyfunc
參數(shù)是排序規(guī)則。sortByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("c",1),("b",1)]
rdd = sc.parallelize(data)
key = rdd.sortByKey()
print(key.collect())
-
輸出:
-
[('a', 1), ('a', 2), ('b', 1), ('c', 1)]
-
?需求:使用 sortBy
算子,將 rdd
中的數(shù)據(jù)進行排序(升序)。
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個內(nèi)容為[('B',1),('A',2),('C',3)]的列表ListList = [('B',1),('A',2),('C',3)]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 sortByKey 算子,將 rdd 的數(shù)據(jù) ('B', 1), ('A', 2), ('C', 3) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:元素排序,例如:[(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)]"""# 5.使用 sortByKey 算子完成以上需求key = rdd.sortByKey()# 6.使用rdd.collect() 收集完成 sortByKey 轉(zhuǎn)換的元素print(key.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
mapValues
算子
mapValues
mapValues
:針對(Key, Value)
型數(shù)據(jù)中的 Value
進行 Map
操作,而不對 Key
進行處理。
上圖中的方框代表 RDD
分區(qū)。 a=>a+2
代表對 (V1,1)
這樣的 Key Value
數(shù)據(jù)對,數(shù)據(jù)只對 Value
中的 1
進行加 2
操作,返回結(jié)果為 3
。
mapValues 案例
-
sc = SparkContext("local", "Simple App") data = [("a",1),("a",2),("b",1)] rdd = sc.parallelize(data) values = rdd.mapValues(lambda x: x + 2) print(values.collect())
輸出:
-
[('a', 3), ('a', 4), ('b', 3)]
需求:使用mapValues
算子,將rdd
的數(shù)據(jù) ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)
按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:
- 偶數(shù)轉(zhuǎn)換成該數(shù)的平方
- 奇數(shù)轉(zhuǎn)換成該數(shù)的立方
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App") # 2.創(chuàng)建一個內(nèi)容為[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表ListList = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 mapValues 算子,將 rdd 的數(shù)據(jù) ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:元素(key,value)的value進行以下操作:偶數(shù)轉(zhuǎn)換成該數(shù)的平方奇數(shù)轉(zhuǎn)換成該數(shù)的立方"""# 5.使用 mapValues 算子完成以上需求values = rdd.mapValues(lambda x: x + 2)# 6.使用rdd.collect() 收集完成 mapValues 轉(zhuǎn)換的元素print(values.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
reduceByKey
算子reduceByKey
reduceByKey
算子,只是兩個值合并成一個值,比如疊加。函數(shù)實現(xiàn)
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
上圖中的方框代表 RDD
分區(qū)。通過自定義函數(shù) (A,B) => (A + B)
,將相同 key
的數(shù)據(jù) (V1,2)
和 (V1,1)
的 value
做加法運算,結(jié)果為( V1,3)
。
reduceByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
print(rdd.reduceByKey(lambda x,y:x+y).collect())
輸出:
[('a', 3), ('b', 1)]
需求:使用 reduceByKey
算子,將 rdd(key-value類型)
中的數(shù)據(jù)進行值累加。
例如:
("soma",4), ("soma",1), ("soma",2) -> ("soma",7)
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App") # 2.創(chuàng)建一個內(nèi)容為[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表ListList = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List) # 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 reduceByKey 算子,將 rdd 的數(shù)據(jù)[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:元素(key-value)的value累加操作,例如:(1,1),(1,1),(1,2) --> (1,4)(1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4)"""# 5.使用 reduceByKey 算子完成以上需求reduce = rdd.reduceByKey(lambda x,y:x+y)# 6.使用rdd.collect() 收集完成 reduceByKey 轉(zhuǎn)換的元素print(reduce.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
Action
的常用算子
count
count()
:返回 RDD
的元素個數(shù)。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.count())
輸出:
5
first
first()
:返回 RDD
的第一個元素(類似于take(1)
)。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.first())
輸出:
python
take
take(n)
:返回一個由數(shù)據(jù)集的前 n
個元素組成的數(shù)組。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.take(2))
輸出:
['python', 'python']
reduce
reduce()
:通過func
函數(shù)聚集 RDD
中的所有元素,該函數(shù)應(yīng)該是可交換的和關(guān)聯(lián)的,以便可以并行正確計算。
示例:
-
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.reduce(lambda x,y:x+y))
輸出:
4
collect
collect()
:在驅(qū)動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素。
示例:
-
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.collect())
輸出:
[1,1,1,1]
具體任務(wù)如下:
需求1:使用 count
算子,統(tǒng)計下 rdd
中元素的個數(shù);
需求2:使用 first
算子,獲取 rdd
首個元素;
需求3:使用 take
算子,獲取 rdd
前三個元素;
需求4:使用 reduce
算子,進行累加操作;
需求5:使用 collect
算子,收集所有元素。
?
from pyspark import SparkContext
if __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個內(nèi)容為[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表ListList = [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.收集rdd的所有元素并print輸出print(rdd.collect())# 5.統(tǒng)計rdd的元素個數(shù)并print輸出print(rdd.count())# 6.獲取rdd的第一個元素并print輸出print(rdd.first())# 7.獲取rdd的前3個元素并print輸出print(rdd.take(3))# 8.聚合rdd的所有元素并print輸出print(rdd.reduce(lambda x,y:x+y))# 9.停止 SparkContextsc.stop()# ********** End **********#