轉帖|使用教程|編輯:龔雪|2017-04-01 09:39:14.000|閱讀 354 次
概述:Spark 作為一個基于內存的分布式計算引擎,其內存管理模塊在整個系統中扮演著非常重要的角色。
# 界面/圖表報表/文檔/IDE等千款熱門軟控件火熱銷售中 >>
彈性分布式數據集(RDD)作為 Spark 最根本的數據抽象,是只讀的分區記錄(Partition)的集合,只能基于在穩定物理存儲中的數據集上創建,或者在其他已有的 RDD 上執行轉換(Transformation)操作產生一個新的 RDD。轉換后的 RDD 與原始的 RDD 之間產生的依賴關系,構成了血統(Lineage)。憑借血統,Spark 保證了每一個 RDD 都可以被重新恢復。但 RDD 的所有轉換都是惰性的,即只有當一個返回結果給 Driver 的行動(Action)發生時,Spark 才會創建任務讀取 RDD,然后真正觸發轉換的執行。
Task 在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查 Checkpoint 或按照血統重新計算。所以如果一個 RDD 上要執行多次行動,可以在第一次行動中使用 persist 或 cache 方法,在內存或磁盤中持久化或緩存這個 RDD,從而在后面的行動時提升計算速度。事實上,cache 方法是使用默認的 MEMORY_ONLY 的存儲級別將 RDD 持久化到內存,故緩存是一種特殊的持久化。 堆內和堆外存儲內存的設計,便可以對緩存 RDD 時使用的內存做統一的規劃和管 理 (存儲內存的其他應用場景,如緩存 broadcast 數據,暫時不在本文的討論范圍之內)。
RDD 的持久化由 Spark 的 Storage 模塊 [7] 負責,實現了 RDD 與物理存儲的解耦合。Storage 模塊負責管理 Spark 在計算過程中產生的數據,將那些在內存或磁盤、在本地或遠程存取數據的功能封裝了起來。在具體實現時 Driver 端和 Executor 端的 Storage 模塊構成了主從式的架構,即 Driver 端的 BlockManager 為 Master,Executor 端的 BlockManager 為 Slave。Storage 模塊在邏輯上以 Block 為基本存儲單位,RDD 的每個 Partition 經過處理后唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )。Master 負責整個 Spark 應用程序的 Block 的元數據信息的管理和維護,而 Slave 需要將 Block 的更新等狀態上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。
圖 7 . Storage 模塊示意圖
在對 RDD 持久化時,Spark 規定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不同的 ,而存儲級別是以下 5 個變量的組合:
清單 3 . 存儲級別
class StorageLevel private( private var _useDisk: Boolean, //磁盤 private var _useMemory: Boolean, //這里其實是指堆內內存 private var _useOffHeap: Boolean, //堆外內存 private var _deserialized: Boolean, //是否為非序列化 private var _replication: Int = 1 //副本個數 )
通過對數據結構的分析,可以看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:
RDD 在緩存到存儲內存之前,Partition 中的數據一般以迭代器()的數據結構來訪問,這是 Scala 語言中一種遍歷數據集合的方法。通過 Iterator 可以獲取分區中每一條序列化或者非序列化的數據項(Record),這些 Record 的對象實例在邏輯上占用了 JVM 堆內內存的 other 部分的空間,同一 Partition 的不同 Record 的空間并不連續。
RDD 在緩存到存儲內存之后,Partition 被轉換成 Block,Record 在堆內或堆外存儲內存中占用一塊連續的空間。將Partition由不連續的存儲空間轉換為連續存儲空間的過程,Spark稱之為”展開”(Unroll)。Block 有序列化和非序列化兩種存儲格式,具體以哪種方式取決于該 RDD 的存儲級別。非反序列化的 Block 以一種 DeserializedMemoryEntry 的數據結構定義,用一個數組存儲所有的 Java 對象,非序列化的 Block 則以 SerializedMemoryEntry 的數據結構定義,用字節緩沖區(ByteBuffer)來存儲二進制數據。每個 Executor 的 Storage 模塊用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外存儲內存中所有的 Block 對象的實例[6],對這個 LinkedHashMap 新增和刪除間接記錄了內存的申請和釋放。
因為不能保證存儲空間可以一次容納 Iterator 中的所有數據,當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時占位,空間不足則 Unroll 失敗,空間足夠時可以繼續進行。對于序列化的 Partition,其所需的 Unroll 空間可以直接累加計算,一次申請。而非序列化的 Partition 則要在遍歷 Record 的過程中依次申請,即每讀取一條 Record,采樣估算其所需的 Unroll 空間并進行申請,空間不足時可以中斷,釋放已占用的 Unroll 空間。如果最終 Unroll 成功,當前 Partition 所占用的 Unroll 空間被轉換為正常的緩存 RDD 的存儲空間.
由于同一個 Executor 的所有的計算任務共享有限的存儲內存空間,當有新的 Block 需要緩存但是剩余空間不足且無法動態占用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 如果其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),否則直接刪除該 Block。
存儲內存的淘汰規則為:
落盤的流程則比較簡單,如果其存儲級別符合_useDisk 為 true 的條件,再根據其_deserialized 判斷是否是非序列化的形式,若是則對其進行序列化,最后將數據存儲到磁盤,在 Storage 模塊中更新其信息。
Executor 內運行的任務同樣共享執行內存,Spark 用一個 HashMap 結構保存了任務到內存耗費的映射。每個任務可占用的執行內存大小的范圍為 1/2N ~ 1/N,其中 N 為當前 Executor 內正在運行的任務的個數。每個任務在啟動之時,要向 MemoryManager 請求申請最少為 1/2N 的執行內存,如果不能被滿足要求則該任務被阻塞,直到有其他任務釋放了足夠的執行內存,該任務才可以被喚醒。
執行內存主要用來存儲任務在執行 Shuffle 時占用的內存,Shuffle 是按照一定規則對 RDD 數據重新分區的過程,我們來看 Shuffle 的 Write 和 Read 兩階段對執行內存的使用:
Shuffle Write
Shuffle Read
在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內執行內存中存儲數據,但在 Shuffle 過程中所有數據并不能都保存到該哈希表中,當這個哈希表占用的內存會進行周期性地采樣估算,當其大到一定程度,無法再從 MemoryManager 申請到新的執行內存時,Spark 就會將其全部內容存儲到磁盤文件中,這個過程被稱為溢存(Spill),溢存到磁盤的文件最后會被歸并(Merge)。
Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計劃[9],解決了一些 JVM 在性能上的限制和弊端。Spark 會根據 Shuffle 的情況來自動選擇是否采用 Tungsten 排序。Tungsten 采用的頁式內存管理機制建立在 MemoryManager 之上,即 Tungsten 對執行內存的使用進行了一步的抽象,這樣在 Shuffle 過程中無需關心數據具體存儲在堆內還是堆外。每個內存頁用一個 MemoryBlock 來定義,并用 Object obj 和 long offset 這兩個變量統一標識一個內存頁在系統內存中的地址。堆內的 MemoryBlock 是以 long 型數組的形式分配的內存,其 obj 的值為是這個數組的對象引用,offset 是 long 型數組的在 JVM 中的初始偏移地址,兩者配合使用可以定位這個數組在堆內的絕對地址;堆外的 MemoryBlock 是直接申請到的內存塊,其 obj 為 null,offset 是這個內存塊在系統內存中的 64 位絕對地址。Spark 用 MemoryBlock 巧妙地將堆內和堆外內存頁統一抽象封裝,并用頁表(pageTable)管理每個 Task 申請到的內存頁。
Tungsten 頁式管理下的所有內存用 64 位的邏輯地址表示,由頁號和頁內偏移量組成:
有了統一的尋址方式,Spark 可以用 64 位邏輯地址的指針定位到堆內或堆外的內存,整個 Shuffle Write 排序的過程只需要對指針進行排序,并且無需反序列化,整個過程非常高效,對于內存訪問效率和 CPU 使用效率帶來了明顯的提升[10]。
Spark 的存儲內存和執行內存有著截然不同的管理方式:對于存儲內存來說,Spark 用一個 LinkedHashMap 來集中管理所有的 Block,Block 由需要緩存的 RDD 的 Partition 轉化而成;而對于執行內存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程中的數據,在 Tungsten 排序中甚至抽象成為頁式內存管理,開辟了全新的 JVM 內存管理機制。
Spark 的內存管理是一套復雜的機制,且 Spark 的版本更新比較快,因此,在理解本文的操作方法的基礎之上,還需要結合新版本的功能,才能達到更好的效果。
更多行業資訊,更新鮮的技術動態,盡在。
本站文章除注明轉載外,均為本站原創或翻譯。歡迎任何形式的轉載,但請務必注明出處、不得修改原文相關鏈接,如果存在內容上的異議請郵件反饋至chenjj@fc6vip.cn