国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

政府網(wǎng)站建設(shè)十強百度seo公司興田德潤

政府網(wǎng)站建設(shè)十強,百度seo公司興田德潤,微信網(wǎng)站如何制作軟件,做年報的網(wǎng)站怎么登不上去了RDD相關(guān)知識 RDD介紹 RDD 是Spark的核心抽象,即 彈性分布式數(shù)據(jù)集(residenta distributed dataset)。代表一個不可變,可分區(qū),里面元素可并行計算的集合。其具有數(shù)據(jù)流模型的特點:自動容錯,位置…

RDD相關(guān)知識

RDD介紹

RDDSpark核心抽象,即 彈性分布式數(shù)據(jù)集residenta distributed dataset)。代表一個不可變,可分區(qū),里面元素可并行計算的集合。其具有數(shù)據(jù)流模型的特點:自動容錯,位置感知性調(diào)度和可伸縮性。Spark中,對數(shù)據(jù)的所有操作不外乎創(chuàng)建RDD轉(zhuǎn)化已有RDD以及調(diào)用 RDD操作進行求值。
?

RDD結(jié)構(gòu)圖

RDD具有五大特性
  1. 一組分片Partition),即數(shù)據(jù)集的基本組成單位(RDD是由一系列的partition組成的)。將數(shù)據(jù)加載為RDD時,一般會遵循數(shù)據(jù)的本地性(一般一個HDFS里的block會加載為一個partition)。

  2. RDD之間的依賴關(guān)系。依賴還具體分為寬依賴和窄依賴,但并不是所有的RDD都有依賴。為了容錯(重算,cachecheckpoint),也就是說在內(nèi)存中的RDD操作時出錯或丟失會進行重算。

  3. 由一個函數(shù)計算每一個分片。Spark中的RDD的計算是以分片為單位的,每個RDD都會實現(xiàn)compute函數(shù)以達到這個目的。compute函數(shù)會對迭代器進行復(fù)合,不需要保存每次計算的結(jié)果。

  4. (可選)如果RDD里面存的數(shù)據(jù)是key-value形式,則可以傳遞一個自定義的Partitioner進行重新分區(qū)。

  5. (可選)RDD提供一系列最佳的計算位置,即數(shù)據(jù)的本地性。

RDD之間的依賴關(guān)系

RDD之間有一系列的依賴關(guān)系,依賴關(guān)系又分為窄依賴和寬依賴。

窄依賴父RDD子RDD partition之間的關(guān)系是一對一的?;蛘?code>父RDD一個partition只對應(yīng)一個子RDDpartition情況下的父RDD子RDD partition關(guān)系是多對一的,也可以理解為沒有觸發(fā)shuffle

寬依賴父RDD子RDD partition之間的關(guān)系是一對多。 父RDD的一個分區(qū)的數(shù)據(jù)去到子RDD的不同分區(qū)里面。也可以理解為觸發(fā)了shuffle

特別說明:對于join操作有兩種情況,如果join操作的使用每個partition僅僅和已知的Partition進行join,此時的join操作就是窄依賴;其他情況的join操作就是寬依賴。

RDD創(chuàng)建
  1. Hadoop文件系統(tǒng)(或與Hadoop兼容的其他持久化存儲系統(tǒng),如Hive、Cassandra、HBase)輸入(例如HDFS創(chuàng)建。

  2. 通過集合進行創(chuàng)建。

算子

算子可以分為Transformation 轉(zhuǎn)換算子和Action 行動算子RDD懶執(zhí)行的,如果沒有行動操作出現(xiàn),所有的轉(zhuǎn)換操作都不會執(zhí)行。

RDD直觀圖,如下:

RDD 的 五大特性
  • 一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度。用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

  • 一個計算每個分區(qū)的函數(shù)。SparkRDD的計算是以分片為單位的,每個 RDD都會實現(xiàn) compute 函數(shù)以達到這個目的。compute函數(shù)會對迭代器進行復(fù)合,不需要保存每次計算的結(jié)果。

  • RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時,Spark 可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進行重新計算。

  • 一個Partitioner,即RDD的分片函數(shù)。當(dāng)前Spark中實現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的HashPartitioner,另外一個是基于范圍的 RangePartitioner。只有對于于key-valueRDD,才會有Partitioner,非key-valueRDDParititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時的分片數(shù)量。

  • 一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。對于一個HDFS文件來說,這個列表保存的就是每個 Partition 所在的塊的位置。按照“移動數(shù)據(jù)不如移動計算”的理念,Spark 在進行任務(wù)調(diào)度的時候,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置。

相關(guān)API介紹
  • SparkContext創(chuàng)建;
  1. sc = SparkContext("local", "Simple App")

說明:"local" 是指讓Spark程序本地運行,"Simple App" 是指Spark程序的名稱,這個名稱可以任意(為了直觀明了的查看,最好設(shè)置有意義的名稱)。

  • 集合并行化創(chuàng)建RDD
  1. data = [1,2,3,4]
  2. rdd = sc.parallelize(data)
  • collect算子:在驅(qū)動程序中將數(shù)據(jù)集的所有元素作為數(shù)組返回(注意數(shù)據(jù)集不能過大);
  1. rdd.collect()
  • 停止SparkContext。
  1. sc.stop()
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用 rdd.collect() 收集 rdd 的內(nèi)容。 rdd.collect() 是 Spark Action 算子,在后續(xù)內(nèi)容中將會詳細(xì)說明,主要作用是:收集 rdd 的數(shù)據(jù)內(nèi)容result = rdd.collect()# 5.打印 rdd 的內(nèi)容print(result)# 6.停止 SparkContextsc.stop()#********** End **********#

讀取外部數(shù)據(jù)集創(chuàng)建RDD?

編寫讀取本地文件創(chuàng)建Spark RDD的程序。

相關(guān)知識

為了完成本關(guān)任務(wù),你需要掌握:1.如何讀取本地文件系統(tǒng)中的文件來創(chuàng)建Spark RDD。

textFile 介紹

PySpark可以從Hadoop支持的任何存儲源創(chuàng)建分布式數(shù)據(jù)集,包括本地文件系統(tǒng),HDFSCassandraHBaseAmazon S3 等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat

文本文件RDD可以使用創(chuàng)建SparkContextextFile方法。此方法需要一個 URI的文件(本地路徑的機器上,或一個hdfs://,s3a:// 等 URI),并讀取其作為行的集合。這是一個示例調(diào)用:

  1. distFile = sc.textFile("data.txt")
    # -*- coding: UTF-8 -*-
    from pyspark import SparkContextif __name__ == '__main__':#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 文本文件 RDD 可以使用創(chuàng)建 SparkContext 的t extFile 方法。
    #此方法需要一個 URI的 文件(本地路徑的機器上,或一個hdfs://,s3a://等URI),
    #并讀取其作為行的集合# 2.讀取本地文件,URI為:/root/wordcount.txtrdd = sc.textFile("/root/wordcount.txt")# 3.使用 rdd.collect() 收集 rdd 的內(nèi)容。 
    #rdd.collect() 是 Spark Action 算子,在后續(xù)內(nèi)容中將會詳細(xì)說明,主要作用是:收集 rdd 的數(shù)據(jù)內(nèi)容result = rdd.collect()# 4.打印 rdd 的內(nèi)容print(result)# 5.停止 SparkContextsc.stop()#********** End **********#

map 算子

本關(guān)任務(wù):使用Sparkmap 算子按照相關(guān)需求完成轉(zhuǎn)換操作。

相關(guān)知識

為了完成本關(guān)任務(wù),你需要掌握:如何使用map算子。

map

將原來RDD的每個數(shù)據(jù)項通過map中的用戶自定義函數(shù) f 映射轉(zhuǎn)變?yōu)橐粋€新的元素。

圖中每個方框表示一個RDD 分區(qū),左側(cè)的分區(qū)經(jīng)過自定義函數(shù) f:T->U 映射為右側(cè)的新 RDD 分區(qū)。但是,實際只有等到 Action 算子觸發(fā)后,這個 f 函數(shù)才會和其他函數(shù)在一個 Stage 中對數(shù)據(jù)進行運算。

map 案例
  1. sc = SparkContext("local", "Simple App")
    data = [1,2,3,4,5,6]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    rdd_map = rdd.map(lambda x: x * 2)
    print(rdd_map.collect())

輸出:

[1, 2, 3, 4, 5, 6] [2, 4, 6, 8, 10, 12]

說明:rdd1 的元素( 1 , 2 , 3 , 4 , 5 , 6 )經(jīng)過 map 算子( x -> x*2 )轉(zhuǎn)換成了 rdd2 ( 2 , 4 , 6 , 8 , 10 )。

編程要求

請仔細(xì)閱讀右側(cè)代碼,根據(jù)方法內(nèi)的提示,在Begin - End區(qū)域內(nèi)進行代碼補充,具體任務(wù)如下:

需求:使用 map 算子,將rdd的數(shù)據(jù) (1, 2, 3, 4, 5) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:

  • 偶數(shù)轉(zhuǎn)換成該數(shù)的平方;
  • 奇數(shù)轉(zhuǎn)換成該數(shù)的立方。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個1到5的列表Listdata = [1, 2, 3, 4, 5]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 map 算子,將 rdd 的數(shù)據(jù) (1, 2, 3, 4, 5) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:偶數(shù)轉(zhuǎn)換成該數(shù)的平方奇數(shù)轉(zhuǎn)換成該數(shù)的立方"""# 5.使用 map 算子完成以上需求rdd_map = rdd.map(lambda x: x * x if x % 2 == 0 else x * x * x)# 6.使用rdd.collect() 收集完成 map 轉(zhuǎn)換的元素print(rdd_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

mapPartitions算子

mapPartitions

mapPartitions函數(shù)獲取到每個分區(qū)的迭代器,在函數(shù)中通過這個分區(qū)整體的迭 代器對整個分區(qū)的元素進行操作。

圖中每個方框表示一個RDD分區(qū),左側(cè)的分區(qū)經(jīng)過自定義函數(shù) f:T->U 映射為右側(cè)的新RDD分區(qū)。

mapPartitions 與 map

map:遍歷算子,可以遍歷RDD中每一個元素,遍歷的單位是每條記錄。

mapPartitions:遍歷算子,可以改變RDD格式,會提高RDD并行度,遍歷單位是Partition,也就是在遍歷之前它會將一個Partition的數(shù)據(jù)加載到內(nèi)存中。

那么問題來了,用上面的兩個算子遍歷一個RDD誰的效率高? mapPartitions算子效率高。

mapPartitions 案例
  1. def f(iterator):
    list = []
    for x in iterator:
    list.append(x*2)
    return listif __name__ == "__main__":
    sc = SparkContext("local", "Simple App")
    data = [1,2,3,4,5,6]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    partitions = rdd.mapPartitions(f)
    print(partitions.collect())

輸出:


[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]

mapPartitions():傳入的參數(shù)是rdditerator(元素迭代器),返回也是一個iterator(迭代器)。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext#********** Begin **********#
def f(iterator):list = []for x in iterator:list.append((x, len(x)))return list#********** End **********#
if __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2. 一個內(nèi)容為("dog", "salmon", "salmon", "rat", "elephant")的列表Listdata = ["dog", "salmon", "salmon", "rat", "elephant"]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 mapPartitions 算子,將 rdd 的數(shù)據(jù) ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:將字符串與該字符串的長度組合成一個元組,例如:dog  -->  (dog,3)salmon   -->  (salmon,6)"""# 5.使用 mapPartitions 算子完成以上需求partitions = rdd.mapPartitions(f)# 6.使用rdd.collect() 收集完成 mapPartitions 轉(zhuǎn)換的元素print(partitions.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

filter算子。

filter

filter 函數(shù)功能是對元素進行過濾,對每個元素應(yīng)用f函數(shù),返 回值為 true的元素在RDD中保留,返回值為false的元素將被過濾掉。內(nèi)部實現(xiàn)相當(dāng)于生成。

  1. FilteredRDD(this,sc.clean(f))

下面代碼為函數(shù)的本質(zhì)實現(xiàn):

  1. def filter(self, f):
    """
    Return a new RDD containing only the elements that satisfy a predicate.>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]
    """
    def func(iterator):
    return filter(fail_on_stopiteration(f), iterator)
    return self.mapPartitions(func, True)

上圖中每個方框代表一個 RDD 分區(qū), T 可以是任意的類型。通過用戶自定義的過濾函數(shù) f,對每個數(shù)據(jù)項操作,將滿足條件、返回結(jié)果為 true 的數(shù)據(jù)項保留。例如,過濾掉 V2V3 保留了 V1,為區(qū)分命名為 V’1。

filter 案例
  1. sc = SparkContext("local", "Simple App")
    data = [1,2,3,4,5,6]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    rdd_filter = rdd.filter(lambda x: x>2)
    print(rdd_filter.collect())

輸出:

  1. [1, 2, 3, 4, 5, 6]
  2. [3, 4, 5, 6]

說明:rdd1( [ 1 , 2 , 3 , 4 , 5 , 6 ] ) 經(jīng)過 filter 算子轉(zhuǎn)換成 rdd2( [ 3 ,4 , 5 , 6 ] )。

使用 filter 算子,將 rdd 中的數(shù)據(jù) (1, 2, 3, 4, 5, 6, 7, 8) 按照以下規(guī)則進行過濾,規(guī)則如下:

  • 過濾掉rdd中的所有奇數(shù)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 filter 算子,將 rdd 的數(shù)據(jù) (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:過濾掉rdd中的奇數(shù)"""# 5.使用 filter 算子完成以上需求rdd_filter = rdd.filter(lambda x: x % 2 == 0)# 6.使用rdd.collect() 收集完成 filter 轉(zhuǎn)換的元素print(rdd_filter.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

flatMap算子

flatMap

將原來RDD中的每個元素通過函數(shù)f轉(zhuǎn)換為新的元素,并將生成的RDD中每個集合的元素合并為一個集合,內(nèi)部創(chuàng)建:

  1. FlatMappedRDD(this,sc.clean(f))

上圖表示RDD的一個分區(qū),進行flatMap函數(shù)操作,flatMap中傳入的函數(shù)為f:T->UTU可以是任意的數(shù)據(jù)類型。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù)f轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認(rèn)為是一個RDD分區(qū),小方框代表一個集合。V1、V2V3在一個集合作為RDD的一個數(shù)據(jù)項,可能存儲為數(shù)組或其他容器,轉(zhuǎn)換為V’1、V’2V’3后,將原來的數(shù)組或容器結(jié)合拆散,拆散的數(shù)據(jù)形成RDD中的數(shù)據(jù)項。

flatMap 案例
sc = SparkContext("local", "Simple App")
data = [["m"], ["a", "n"]]
rdd = sc.parallelize(data)
print(rdd.collect())
flat_map = rdd.flatMap(lambda x: x)
print(flat_map.collect())

輸出:

  1. [['m'], ['a', 'n']]
  2. ['m', 'a', 'n']

flatMap:將兩個集合轉(zhuǎn)換成一個集合
?

需求:使用 flatMap 算子,將rdd的數(shù)據(jù) ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:

  • 合并RDD的元素,例如:
    1. ([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
    2. ([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
      from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表Listlist = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(list)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())        """使用 flatMap 算子,將 rdd 的數(shù)據(jù) ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:合并RDD的元素,例如:([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)([2,3],[4,5],[6])  -->  (1,2,3,4,5,6)"""# 5.使用 filter 算子完成以上需求flat_map = rdd.flatMap(lambda x: x)# 6.使用rdd.collect() 收集完成 filter 轉(zhuǎn)換的元素print(flat_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
      

      distinct 算子

      distinct

      distinctRDD 中的元素進行去重操作。

      上圖中的每個方框代表一個 RDD 分區(qū),通過 distinct 函數(shù),將數(shù)據(jù)去重。 例如,重復(fù)數(shù)據(jù) V1V1 去重后只保留一份 V1 。

      distinct 案例
      sc = SparkContext("local", "Simple App")
      data = ["python", "python", "python", "java", "java"]
      rdd = sc.parallelize(data)
      print(rdd.collect())
      distinct = rdd.distinct()
    3. 輸出

      ['python', 'python', 'python', 'java', 'java']
      ['python', 'java']
    4. print(distinct.collect())

      sortByKey 算子

      sortByKey
    5. def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
      if numPartitions is None:
      numPartitions = self._defaultReducePartitions()memory = self._memory_limit()
      serializer = self._jrdd_deserializerdef sortPartition(iterator):
      sort = ExternalSorter(memory * 0.9, serializer).sorted
      return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))if numPartitions == 1:
      if self.getNumPartitions() > 1:
      self = self.coalesce(1)
      return self.mapPartitions(sortPartition, True)# first compute the boundary of each part via sampling: we want to partition
      # the key-space into bins such that the bins have roughly the same
      # number of (key, value) pairs falling into them
      rddSize = self.count()
      if not rddSize:
      return self # empty RDD
      maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
      f\fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
      samples = self.sample(False, f\fraction, 1).map(lambda kv: kv[0]).collect()
      samples = sorted(samples, key=keyfunc)# we have numPartitions many parts but one of the them has
      # an implicit boundary
      bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
      for i in range(0, numPartitions - 1)]def rangePartitioner(k):
      p = bisect.bisect_left(bounds, keyfunc(k))
      if ascending:
      return p
      else:
      return numPartitions - 1 - preturn self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
    6. 說明:ascending參數(shù)是指排序(升序還是降序),默認(rèn)是升序。numPartitions參數(shù)是重新分區(qū),默認(rèn)與上一個RDD保持一致。keyfunc參數(shù)是排序規(guī)則。

      sortByKey 案例
    7. sc = SparkContext("local", "Simple App")
    8. data = [("a",1),("a",2),("c",1),("b",1)]
    9. rdd = sc.parallelize(data)
    10. key = rdd.sortByKey()
    11. print(key.collect())
    12. 輸出:

    13. [('a', 1), ('a', 2), ('b', 1), ('c', 1)]

?需求:使用 sortBy 算子,將 rdd 中的數(shù)據(jù)進行排序(升序)。

from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個內(nèi)容為[('B',1),('A',2),('C',3)]的列表ListList = [('B',1),('A',2),('C',3)]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 sortByKey 算子,將 rdd 的數(shù)據(jù) ('B', 1), ('A', 2), ('C', 3) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:元素排序,例如:[(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]"""# 5.使用 sortByKey 算子完成以上需求key = rdd.sortByKey()# 6.使用rdd.collect() 收集完成 sortByKey 轉(zhuǎn)換的元素print(key.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#

mapValues 算子

mapValues

mapValues :針對(Key, Value)型數(shù)據(jù)中的 Value 進行 Map 操作,而不對 Key 進行處理。

上圖中的方框代表 RDD 分區(qū)。 a=>a+2 代表對 (V1,1) 這樣的 Key Value 數(shù)據(jù)對,數(shù)據(jù)只對 Value 中的 1 進行加 2 操作,返回結(jié)果為 3

mapValues 案例
  1. sc = SparkContext("local", "Simple App")
    data = [("a",1),("a",2),("b",1)]
    rdd = sc.parallelize(data)
    values = rdd.mapValues(lambda x: x + 2)
    print(values.collect())

輸出:

  1. [('a', 3), ('a', 4), ('b', 3)]

需求:使用mapValues算子,將rdd的數(shù)據(jù) ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:

  • 偶數(shù)轉(zhuǎn)換成該數(shù)的平方
  • 奇數(shù)轉(zhuǎn)換成該數(shù)的立方
    from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")    # 2.創(chuàng)建一個內(nèi)容為[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表ListList = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 mapValues 算子,將 rdd 的數(shù)據(jù) ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:元素(key,value)的value進行以下操作:偶數(shù)轉(zhuǎn)換成該數(shù)的平方奇數(shù)轉(zhuǎn)換成該數(shù)的立方"""# 5.使用 mapValues 算子完成以上需求values = rdd.mapValues(lambda x: x + 2)# 6.使用rdd.collect() 收集完成 mapValues 轉(zhuǎn)換的元素print(values.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
    

    reduceByKey 算子

    reduceByKey

    reduceByKey 算子,只是兩個值合并成一個值,比如疊加。

    函數(shù)實現(xiàn)

    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
    return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)

上圖中的方框代表 RDD 分區(qū)。通過自定義函數(shù) (A,B) => (A + B) ,將相同 key 的數(shù)據(jù) (V1,2)(V1,1)value 做加法運算,結(jié)果為( V1,3)。

reduceByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
print(rdd.reduceByKey(lambda x,y:x+y).collect())

輸出:

[('a', 3), ('b', 1)]

需求:使用 reduceByKey 算子,將 rdd(key-value類型) 中的數(shù)據(jù)進行值累加。

例如:

  1. ("soma",4), ("soma",1), ("soma",2) -> ("soma",7)
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")    # 2.創(chuàng)建一個內(nèi)容為[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表ListList = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]# 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)  # 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 reduceByKey 算子,將 rdd 的數(shù)據(jù)[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的規(guī)則進行轉(zhuǎn)換操作,規(guī)則如下:需求:元素(key-value)的value累加操作,例如:(1,1),(1,1),(1,2)  --> (1,4)(1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)"""# 5.使用 reduceByKey 算子完成以上需求reduce = rdd.reduceByKey(lambda x,y:x+y)# 6.使用rdd.collect() 收集完成 reduceByKey 轉(zhuǎn)換的元素print(reduce.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#

Action 的常用算子

count

count():返回 RDD 的元素個數(shù)。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = ["python", "python", "python", "java", "java"]
    rdd = sc.parallelize(data)
    print(rdd.count())

輸出:

  1. 5
first

first():返回 RDD 的第一個元素(類似于take(1))。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = ["python", "python", "python", "java", "java"]
    rdd = sc.parallelize(data)
    print(rdd.first())

輸出:

  1. python
take

take(n):返回一個由數(shù)據(jù)集的前 n 個元素組成的數(shù)組。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = ["python", "python", "python", "java", "java"]
    rdd = sc.parallelize(data)
    print(rdd.take(2))

輸出:

  1. ['python', 'python']
reduce

reduce():通過func函數(shù)聚集 RDD 中的所有元素,該函數(shù)應(yīng)該是可交換的和關(guān)聯(lián)的,以便可以并行正確計算。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = [1,1,1,1]
    rdd = sc.parallelize(data)
    print(rdd.reduce(lambda x,y:x+y))

輸出:

  1. 4
collect

collect():在驅(qū)動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = [1,1,1,1]
    rdd = sc.parallelize(data)
    print(rdd.collect())

輸出:

  1. [1,1,1,1]
具體任務(wù)如下:

需求1:使用 count 算子,統(tǒng)計下 rdd 中元素的個數(shù);

需求2:使用 first 算子,獲取 rdd 首個元素;

需求3:使用 take 算子,獲取 rdd 前三個元素;

需求4:使用 reduce 算子,進行累加操作;

需求5:使用 collect 算子,收集所有元素。
?

from pyspark import SparkContext
if __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個內(nèi)容為[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表ListList = [1, 3, 5, 7, 9, 8, 6, 4, 2]  # 3.通過 SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.收集rdd的所有元素并print輸出print(rdd.collect())# 5.統(tǒng)計rdd的元素個數(shù)并print輸出print(rdd.count())# 6.獲取rdd的第一個元素并print輸出print(rdd.first())# 7.獲取rdd的前3個元素并print輸出print(rdd.take(3))# 8.聚合rdd的所有元素并print輸出print(rdd.reduce(lambda x,y:x+y))# 9.停止 SparkContextsc.stop()# ********** End **********#

http://m.aloenet.com.cn/news/32122.html

相關(guān)文章:

  • 做網(wǎng)站如何將一張圖片直接變體馮耀宗seo視頻教程
  • 邢臺網(wǎng)站建設(shè)的公司湖南網(wǎng)絡(luò)推廣排名
  • apt-get install wordpress深圳外包seo
  • 吉安網(wǎng)站建設(shè)0796abc百度小說搜索風(fēng)云榜總榜
  • 室內(nèi)設(shè)計網(wǎng)站知乎南京響應(yīng)式網(wǎng)站建設(shè)
  • 網(wǎng)站專業(yè)術(shù)語中seo意思是網(wǎng)站制作基本流程
  • 做mv主題網(wǎng)站媒體發(fā)稿費用
  • 嘉興做網(wǎng)站的銷售培訓(xùn)課程一般有哪些
  • 網(wǎng)站流量的重要性seo推廣軟件代理
  • web網(wǎng)站開發(fā)報告深圳seo優(yōu)化
  • 豐臺網(wǎng)站建設(shè)小程序開發(fā)哪家更靠譜
  • 天津網(wǎng)站制作計劃電商項目策劃書
  • 湖南省人民政府駐深圳辦事處江門網(wǎng)站優(yōu)化公司
  • 寶安做棋牌網(wǎng)站建設(shè)找哪家公司好上海網(wǎng)站優(yōu)化
  • 電商網(wǎng)站開發(fā)價格優(yōu)化網(wǎng)站seo策略
  • 在哪個網(wǎng)站上做外貿(mào)好深圳網(wǎng)站優(yōu)化網(wǎng)站
  • 哪些網(wǎng)站是php企業(yè)qq和個人qq有什么區(qū)別
  • 做視頻網(wǎng)站收費侵權(quán)嗎全媒體廣告代理加盟
  • 農(nóng)村網(wǎng)站做移動濟南做網(wǎng)站比較好的公司
  • 網(wǎng)站備案信息怎么做百度熱搜高考大數(shù)據(jù)
  • pc網(wǎng)站怎么適配移動端網(wǎng)頁設(shè)計效果圖及代碼
  • 保險做的好的網(wǎng)站第三方推廣平臺
  • 鞋圖相冊網(wǎng)站怎么做app拉新平臺哪個好傭金高
  • 專業(yè)做網(wǎng)站建設(shè)建站公司網(wǎng)站怎么做
  • 建設(shè)項目立項網(wǎng)站搜索引擎優(yōu)化網(wǎng)站
  • 如何做二維碼跳轉(zhuǎn)到網(wǎng)站軟件開發(fā)
  • 杭州余杭做網(wǎng)站公司免費推廣網(wǎng)站地址大全
  • 傳統(tǒng)網(wǎng)站有沒有建設(shè)必要建網(wǎng)站賺錢
  • 承德網(wǎng)站建設(shè)方案在線排名優(yōu)化工具
  • 個人網(wǎng)站 數(shù)據(jù)庫如何上傳到空間視頻號推廣