做企業(yè)網(wǎng)站的架構(gòu)圖廣州王牌seo
相比MapReduce僵化的Map與Reduce分階段計(jì)算,Spark計(jì)算框架更有彈性和靈活性,運(yùn)行性能更佳。
1 Spark的計(jì)算階段
- MapReduce一個(gè)應(yīng)用一次只運(yùn)行一個(gè)map和一個(gè)reduce
- Spark可根據(jù)應(yīng)用復(fù)雜度,分割成更多的計(jì)算階段(stage),組成一個(gè)DAG,Spark任務(wù)調(diào)度器可根據(jù)DAG依賴關(guān)系執(zhí)行計(jì)算階段
邏輯回歸機(jī)器學(xué)習(xí)性能Spark比MapReduce快100多倍。因某些機(jī)器學(xué)習(xí)算法可能需大量迭代計(jì)算,產(chǎn)生數(shù)萬個(gè)計(jì)算階段,這些計(jì)算階段在一個(gè)應(yīng)用中處理完成,而不像MapReduce需要啟動數(shù)萬個(gè)應(yīng)用,因此運(yùn)行效率極高。
DAG,不同階段的依賴關(guān)系有向,計(jì)算過程只能沿依賴關(guān)系方向執(zhí)行,被依賴的階段執(zhí)行完成前,依賴的階段不能開始執(zhí)行。該依賴關(guān)系不能有環(huán)形依賴,否則就死循環(huán)。
典型的Spark運(yùn)行DAG的不同階段:
整個(gè)應(yīng)用被切分成3個(gè)階段,階段3依賴階段1、2,階段1、2互不依賴。Spark執(zhí)行調(diào)度時(shí),先執(zhí)行階段1、2,完成后,再執(zhí)行階段3。對應(yīng)Spark偽代碼:
rddB = rddA.groupBy(key)
rddD = rddC.map(func)
rddF = rddD.union(rddE)
rddG = rddB.join(rddF)
所以Spark作業(yè)調(diào)度執(zhí)行核心是DAG,整個(gè)應(yīng)用被切分成數(shù)個(gè)階段,每個(gè)階段的依賴關(guān)系也很清楚。根據(jù)每個(gè)階段要處理的數(shù)據(jù)量生成任務(wù)集合(TaskSet),每個(gè)任務(wù)都分配一個(gè)任務(wù)進(jìn)程去處理,Spark就實(shí)現(xiàn)大數(shù)據(jù)分布式計(jì)算。
負(fù)責(zé)Spark應(yīng)用DAG生成和管理的組件是DAGScheduler:
- DAGScheduler根據(jù)程序代碼生成DAG
- 然后將程序分發(fā)到分布式計(jì)算集群
- 按計(jì)算階段的先后關(guān)系調(diào)度執(zhí)行
Spark劃分計(jì)算階段的依據(jù)
顯然并非RDD上的每個(gè)轉(zhuǎn)換函數(shù)都會生成一個(gè)計(jì)算階段,如上4個(gè)轉(zhuǎn)換函數(shù),但只有3個(gè)階段。
觀察上面DAG圖,計(jì)算階段的劃分就看出,當(dāng)RDD之間的轉(zhuǎn)換連接線呈現(xiàn)多對多交叉連接,就產(chǎn)生新階段。一個(gè)RDD代表一個(gè)數(shù)據(jù)集,圖中每個(gè)RDD里面都包含多個(gè)小塊,每個(gè)小塊代表RDD的一個(gè)分片。
一個(gè)數(shù)據(jù)集中的多個(gè)數(shù)據(jù)分片需進(jìn)行分區(qū)傳輸,寫到另一個(gè)數(shù)據(jù)集的不同分片,這種數(shù)據(jù)分區(qū)交叉?zhèn)鬏敳僮?#xff0c;在MapReduce運(yùn)行過程也看過。
這就是shuffle過程,Spark也要通過shuffle將數(shù)據(jù)重組,相同Key的數(shù)據(jù)放在一起,進(jìn)行聚合、關(guān)聯(lián)等操作,因而每次shuffle都產(chǎn)生新的計(jì)算階段。這也是為什么計(jì)算階段會有依賴關(guān)系,它需要的數(shù)據(jù)來源于前面一個(gè)或多個(gè)計(jì)算階段產(chǎn)生的數(shù)據(jù),必須等待前面的階段執(zhí)行完畢才能進(jìn)行shuffle,并得到數(shù)據(jù)。
計(jì)算階段劃分依據(jù)是shuffle,而非轉(zhuǎn)換函數(shù)的類型,有的函數(shù)有時(shí)有shuffle,有時(shí)無。如上圖例子中RDD B和RDD F進(jìn)行join,得到RDD G,這里的RDD F需要進(jìn)行shuffle,RDD B不需要。
因?yàn)镽DD B在前面一個(gè)階段,階段1的shuffle過程中,已進(jìn)行數(shù)據(jù)分區(qū)。分區(qū)數(shù)目和分區(qū)K不變,無需再shuffle:
- 這種無需進(jìn)行shuffle的依賴,在Spark里稱窄依賴
- 需進(jìn)行shuffle的依賴,稱寬依賴
類似MapReduce,shuffle對Spark也重要,只有通過shuffle,相關(guān)數(shù)據(jù)才能互相計(jì)算。
既然都要shuffle,為何Spark更高效?
本質(zhì)Spark算一種MapReduce計(jì)算模型的不同實(shí)現(xiàn)。Hadoop MapReduce簡單粗暴根據(jù)shuffle將大數(shù)據(jù)計(jì)算分成Map、Reduce兩階段就完事。但Spark更細(xì),將前一個(gè)的Reduce和后一個(gè)的Map連接,當(dāng)作一個(gè)階段持續(xù)計(jì)算,形成一個(gè)更優(yōu)雅、高效地計(jì)算模型,其本質(zhì)依然是Map、Reduce。但這種多個(gè)計(jì)算階段依賴執(zhí)行的方案可有效減少對HDFS的訪問,減少作業(yè)的調(diào)度執(zhí)行次數(shù),因此執(zhí)行速度更快。
不同于Hadoop MapReduce主要使用磁盤存儲shuffle過程中的數(shù)據(jù),Spark優(yōu)先使用內(nèi)存進(jìn)行數(shù)據(jù)存儲,包括RDD數(shù)據(jù)。除非內(nèi)存不夠用,否則盡可能使用內(nèi)存, 這即Spark比Hadoop性能高。
2 Spark作業(yè)管理
Spark里面的RDD函數(shù)有兩種:
- 轉(zhuǎn)換函數(shù),調(diào)用后得到的還是RDD,RDD計(jì)算邏輯主要通過轉(zhuǎn)換函數(shù)
- action函數(shù),調(diào)用后不再返回RDD。如count()函數(shù),返回RDD中數(shù)據(jù)的元素個(gè)數(shù)
- saveAsTextFile(path),將RDD數(shù)據(jù)存儲到path路徑
Spark的DAGScheduler遇到shuffle時(shí),會生成一個(gè)計(jì)算階段,在遇到action函數(shù)時(shí),會生成一個(gè)作業(yè)(job)。
RDD里面的每個(gè)數(shù)據(jù)分片,Spark都會創(chuàng)建一個(gè)計(jì)算任務(wù)去處理,所以一個(gè)計(jì)算階段含多個(gè)計(jì)算任務(wù)(task)。
作業(yè)、計(jì)算階段、任務(wù)的依賴和時(shí)間先后關(guān)系:
橫軸時(shí)間,縱軸任務(wù)。兩條粗黑線之間是一個(gè)作業(yè),兩條細(xì)線之間是一個(gè)計(jì)算階段。一個(gè)作業(yè)至少包含一個(gè)計(jì)算階段。水平方向紅色的線是任務(wù),每個(gè)階段由很多個(gè)任務(wù)組成,這些任務(wù)組成一個(gè)任務(wù)集合。
DAGScheduler根據(jù)代碼生成DAG圖后,Spark任務(wù)調(diào)度就以任務(wù)為單位進(jìn)行分配,將任務(wù)分配到分布式集群的不同機(jī)器上執(zhí)行。
3 Spark執(zhí)行流程
Spark支持Standalone、Yarn、Mesos、K8s等多種部署方案,原理類似,僅不同組件的角色命名不同。
3.1 Spark cluster components
Spark應(yīng)用程序啟動在自己的JVM進(jìn)程里(Driver進(jìn)程),啟動后調(diào)用SparkContext初始化執(zhí)行配置和輸入數(shù)據(jù)。SparkContext啟動DAGScheduler構(gòu)造執(zhí)行的DAG圖,切分成最小的執(zhí)行單位-計(jì)算任務(wù)。
然后,Driver向Cluster Manager請求計(jì)算資源,用于DAG的分布式計(jì)算。Cluster Manager收到請求后,將Driver的主機(jī)地址等信息通知給集群的所有計(jì)算節(jié)點(diǎn)Worker。
Worker收到信息后,根據(jù)Driver的主機(jī)地址,跟Driver通信并注冊,然后根據(jù)自己的空閑資源向Driver通報(bào)自己可以領(lǐng)用的任務(wù)數(shù)。Driver根據(jù)DAG圖開始向注冊的Worker分配任務(wù)。
Worker收到任務(wù)后,啟動Executor進(jìn)程執(zhí)行任務(wù)。Executor先檢查自己是否有Driver的執(zhí)行代碼,若無,從Driver下載執(zhí)行代碼,通過Java反射加載后開始執(zhí)行。
4 Spark V.S Hadoop
4.1 個(gè)體對比
4.2 生態(tài)圈對比
4.3 MapReduce V.S Spark
4.4 優(yōu)勢
4.5 Spark 和 Hadoop 協(xié)作
5 總結(jié)
相比Mapreduce,Spark的主要特性:
- RDD編程模型更簡單
- DAG切分的多階段計(jì)算過程更快
- 使用內(nèi)存存儲中間計(jì)算結(jié)果更高效
Spark在2012開始流行,那時(shí)內(nèi)存容量提升和成本降低已經(jīng)比MapReduce出現(xiàn)的十年前強(qiáng)了一個(gè)數(shù)量級,Spark優(yōu)先使用內(nèi)存的條件已成熟。
本文描述的內(nèi)存模型自 Apache Spark 1.6+ 開始棄用,新的內(nèi)存模型基于 UnifiedMemoryManager,并在這篇文章中描述。
在最近的時(shí)間里,我在 StackOverflow 上回答了一系列與 ApacheSpark 架構(gòu)有關(guān)的問題。所有這些問題似乎都是因?yàn)榛ヂ?lián)網(wǎng)上缺少一份關(guān)于 Spark 架構(gòu)的好的通用描述造成的。即使是官方指南也沒有太多細(xì)節(jié),當(dāng)然也缺乏好的圖表。《學(xué)習(xí) Spark》這本書和官方研討會的資料也是如此。
在這篇文章中,我將嘗試解決這個(gè)問題,提供一個(gè)關(guān)于 Spark 架構(gòu)的一站式指南,以及對其一些最受歡迎的概念問題的解答。這篇文章并不適合完全的初學(xué)者——它不會為你提供關(guān)于 Spark 主要編程抽象(RDD 和 DAG)的洞見,但是它要求你有這些知識作為先決條件。
從 http://spark.apache.org/docs/1.3.0/cluster-overview.html 上可用的官方圖片開始:
Spark 架構(gòu)官方:
如你所見,它同時(shí)引入了許多術(shù)語——“executor”,“task”,“cache”,“Worker Node”等等。當(dāng)我開始學(xué)習(xí) Spark 概念的時(shí)候,這幾乎是互聯(lián)網(wǎng)上唯一關(guān)于 Spark 架構(gòu)的圖片,現(xiàn)在情況也沒有太大改變。我個(gè)人不是很喜歡這個(gè),因?yàn)樗鼪]有顯示一些重要的概念,或者顯示得不是最佳方式。
讓我們從頭說起。任何,任何在你的集群或本地機(jī)器上運(yùn)行的 Spark 過程都是一個(gè) JVM 過程。與任何 JVM 過程一樣,你可以用 -Xmx 和 -Xms JVM 標(biāo)志來配置它的堆大小。這個(gè)過程如何使用它的堆內(nèi)存,以及它為什么需要它?以下是 JVM 堆內(nèi)的 Spark 內(nèi)存分配圖表:
默認(rèn)情況下,Spark 以 512MB JVM 堆啟動。為了安全起見,避免 OOM 錯(cuò)誤,Spark 只允許使用堆的 90%,這由參數(shù) spark.storage.safetyFraction 控制。好的,正如你可能已經(jīng)聽說 Spark 是一個(gè)內(nèi)存中的工具,Spark 允許你將一些數(shù)據(jù)存儲在內(nèi)存中。如果你讀過我這里的文章 https://0x0fff.com/spark-misconceptions/,你應(yīng)該理解 Spark 并不是真的內(nèi)存工具,它只是利用內(nèi)存來緩存 LRU(http://en.wikipedia.org/wiki/Cache_algorithms)。所以一些內(nèi)存是為你處理的數(shù)據(jù)緩存而保留的部分,這部分通常是安全堆的 60%,由 spark.storage.memoryFraction 參數(shù)控制。所以如果你想知道你可以在 Spark 中緩存多少數(shù)據(jù),你應(yīng)該取所有執(zhí)行器的堆大小之和,乘以 safetyFraction 和 storage.memoryFraction,默認(rèn)情況下,它是 0.9 * 0.6 = 0.54 或者讓 Spark 使用的總的堆大小的 54%。
現(xiàn)在更詳細(xì)地了解 shuffle 內(nèi)存。它的計(jì)算方法為 “堆大小” * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction。spark.shuffle.safetyFraction 的默認(rèn)值是 0.8 或 80%,spark.shuffle.memoryFraction 的默認(rèn)值是 0.2 或 20%。所以最終你可以使用最多 0.8*0.2 = 0.16 或 JVM 堆的 16% 用于 shuffle。但是 Spark 如何使用這些內(nèi)存呢?你可以在這里獲取更多細(xì)節(jié)(https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala),但總的來說,Spark 用這些內(nèi)存進(jìn)行它的 Shuffle。當(dāng) Shuffle 進(jìn)行時(shí),有時(shí)你也需要對數(shù)據(jù)進(jìn)行排序。當(dāng)你排序數(shù)據(jù)時(shí),你通常需要一個(gè)緩沖區(qū)來存儲排序后的數(shù)據(jù)(記住,你不能就地修改 LRU 緩存中的數(shù)據(jù),因?yàn)樗怯脕砩院笾赜玫?#xff09;。所以它需要一些 RAM 來存儲排序的數(shù)據(jù)塊。如果你沒有足夠的內(nèi)存來排序數(shù)據(jù)會怎樣?有一系列通常被稱為“外部排序”的算法(http://en.wikipedia.org/wiki/External_sorting)允許你進(jìn)行分塊數(shù)據(jù)的排序,然后再將最終結(jié)果合并起來。
我還沒涵蓋的 RAM 的最后部分是“unroll”內(nèi)存。被 unroll 過程使用的 RAM 部分是 spark.storage.unrollFraction * spark.storage.memoryFraction * spark.storage.safetyFraction,默認(rèn)值等于 0.2 * 0.6 * 0.9 = 0.108 或者堆的 10.8%。這是當(dāng)你將數(shù)據(jù)塊 unroll 到內(nèi)存時(shí)可以使用的內(nèi)存。為什么你需要 unroll 它呢?Spark 允許你以序列化和非序列化形式存儲數(shù)據(jù)。序列化形式的數(shù)據(jù)不能直接使用,因此你需要在使用之前 unroll 它,所以這是用于 unroll 的 RAM。它與存儲 RAM 共享,這意味著如果你需要一些內(nèi)存來 unroll 數(shù)據(jù),這可能會導(dǎo)致 Spark LRU 緩存中存儲的一些分區(qū)被刪除。
這很好,因?yàn)榇丝棠阒懒耸裁词?Spark 過程以及它如何利用它的 JVM 過程的內(nèi)存?,F(xiàn)在讓我們轉(zhuǎn)到集群模式——當(dāng)你啟動一個(gè) Spark 集群時(shí),它實(shí)際上是什么樣的呢?我喜歡 YARN,所以我將講述它在 YARN 上是如何工作的,但是總的來說,對于任何你使用的集群管理器來說都是一樣的:
在 YARN 上的 Spark 架構(gòu):
當(dāng)你有一個(gè) YARN 集群時(shí),它有一個(gè) YARN Resource Manager 守護(hù)進(jìn)程,控制集群資源(實(shí)際上是內(nèi)存)以及在集群節(jié)點(diǎn)上運(yùn)行的一系列 YARN Node Managers,控制節(jié)點(diǎn)資源利用率。從 YARN 的角度來看,每個(gè)節(jié)點(diǎn)代表你有控制權(quán)的 RAM 池。當(dāng)你向 YARN Resource Manager 請求一些資源時(shí),它會給你提供你可以聯(lián)系哪些 Node Managers 為你啟動執(zhí)行容器的信息。每個(gè)執(zhí)行容器是一個(gè)具有請求堆大小的 JVM。JVM 位置由 YARN Resource Manager 選擇,你無法控制它——如果節(jié)點(diǎn)有 64GB 的 RAM 被 YARN 控制(yarn-site.xml 中的 yarn.nodemanager.resource.memory-mb 設(shè)置)并且你請求 10 個(gè)執(zhí)行器,每個(gè)執(zhí)行器 4GB,它們所有的都可以容易地在一個(gè) YARN 節(jié)點(diǎn)上啟動,即使你有一個(gè)大集群。
當(dāng)你在 YARN 之上啟動 Spark 集群時(shí),你指定了你需要的執(zhí)行器數(shù)量(–num-executors 標(biāo)志或 spark.executor.instances 參數(shù))、每個(gè)執(zhí)行器使用的內(nèi)存量(–executor-memory 標(biāo)志或 spark.executor.memory 參數(shù))、每個(gè)執(zhí)行器允許使用的核心數(shù)量(–executor-cores 標(biāo)志或 spark.executor.cores 參數(shù)),以及為每個(gè)任務(wù)的執(zhí)行專用的核心數(shù)量(spark.task.cpus 參數(shù))。同時(shí)你還指定了驅(qū)動程序應(yīng)用程序使用的內(nèi)存量(–driver-memory 標(biāo)志或 spark.driver.memory 參數(shù))。
當(dāng)你在集群上執(zhí)行某事時(shí),你的工作處理被分割成階段,每個(gè)階段又被分割成任務(wù)。每個(gè)任務(wù)分別被調(diào)度。你可以將每個(gè)作為執(zhí)行者工作的 JVM 視為一個(gè)任務(wù)執(zhí)行槽池,每個(gè)執(zhí)行者會給你 spark.executor.cores / spark.task.cpus 執(zhí)行槽供你的任務(wù)使用,總共有 spark.executor.instances 執(zhí)行器。這是一個(gè)例子。有 12 個(gè)節(jié)點(diǎn)運(yùn)行 YARN Node Managers 的集群,每個(gè)節(jié)點(diǎn) 64GB 的 RAM 和 32 個(gè) CPU 核心(16 個(gè)物理核心與超線程)。這樣,在每個(gè)節(jié)點(diǎn)上你可以啟動 2 個(gè)執(zhí)行器,每個(gè)執(zhí)行器 26GB 的 RAM(為系統(tǒng)進(jìn)程、YARN NM 和 DataNode 留下一些 RAM),每個(gè)執(zhí)行器有 12 個(gè)核心用于任務(wù)(為系統(tǒng)進(jìn)程、YARN NM 和 DataNode 留下一些核心)。所以總的來說你的集群可以處理 12 臺機(jī)器 * 每臺機(jī)器 2 個(gè)執(zhí)行器 * 每個(gè)執(zhí)行器 12 個(gè)核心 / 每個(gè)任務(wù) 1 個(gè)核心 = 288 個(gè)任務(wù)槽。這意味著你的 Spark 集群將能夠并行運(yùn)行多達(dá) 288 個(gè)任務(wù),從而利用你在這個(gè)集群上擁有的幾乎所有資源。你可以在這個(gè)集群上緩存數(shù)據(jù)的內(nèi)存量是 0.9 * spark.storage.safetyFraction * 0.6 * spark.storage.memoryFraction * 12 臺機(jī)器 * 每臺機(jī)器 2 個(gè)執(zhí)行器 * 每個(gè)執(zhí)行器 26 GB = 336.96 GB。不算太多,但在大多數(shù)情況下它是足夠的。
到目前為止效果很好,現(xiàn)在你知道了 Spark 如何使用它的 JVM 的內(nèi)存以及你在集群上有哪些執(zhí)行槽。正如你可能已經(jīng)注意到的,我沒有詳細(xì)介紹“任務(wù)”究竟是什么。這將是下一篇文章的主題,但基本上它是 Spark 執(zhí)行的一個(gè)單一工作單元,并作為 線程* 在執(zhí)行器 JVM 中執(zhí)行。這是 Spark 低作業(yè)啟動時(shí)間的秘訣——在 JVM 中啟動額外的線程比啟動整個(gè) JVM 快得多,而后者是在 Hadoop 中開始 MapReduce 作業(yè)時(shí)執(zhí)行的。
現(xiàn)在讓我們關(guān)注另一個(gè)叫做“partition”的 Spark 抽象。你在 Spark 中工作的所有數(shù)據(jù)都被分割成分區(qū)。一個(gè)單一的分區(qū)是什么,它是如何確定的?分區(qū)大小完全取決于你使用的數(shù)據(jù)源。對于大多數(shù)在 Spark 中讀取數(shù)據(jù)的方法,你可以指定你想要在你的 RDD 中有多少分區(qū)。當(dāng)你從 HDFS 讀取一個(gè)文件時(shí),你使用的是 Hadoop 的 InputFormat 來做到這一點(diǎn)。默認(rèn)情況下,InputFormat 返回的每個(gè)輸入分割都映射到 RDD 中的單個(gè)分區(qū)。對于 HDFS 上的大多數(shù)文件,每個(gè)輸入分割生成一個(gè)對應(yīng)于 HDFS 上存儲的一個(gè)數(shù)據(jù)塊的數(shù)據(jù),大約是 64MB 或 128MB 的數(shù)據(jù)。大約,因?yàn)樵?HDFS 中,數(shù)據(jù)是按照字節(jié)的確切塊邊界分割的,但是在處理時(shí)它是按照記錄分割分割的。對于文本文件,分割字符是換行符,對于序列文件,是塊末等等。這個(gè)規(guī)則的唯一例外是壓縮文件——如果你有整個(gè)文本文件被壓縮,那么它不能被分割成記錄,整個(gè)文件將成為一個(gè)單一的輸入分割,從而在 Spark 中成為一個(gè)單一的分區(qū),你必須手動重新分區(qū)它。
現(xiàn)在我們所擁有的真的很簡單——為了處理一個(gè)單獨(dú)的數(shù)據(jù)分區(qū),Spark 生成一個(gè)單一任務(wù),這個(gè)任務(wù)在靠近你擁有的數(shù)據(jù)的位置(Hadoop 塊位置,Spark 緩存的分區(qū)位置)的任務(wù)槽中執(zhí)行。
參考
- https://spark.apache.org/docs/3.2.1/cluster-overview.html
- shuffle可以在這里找到
- 新內(nèi)存管理模型可以在這里找到