平行處理 JOIN 作業

本頁說明 Cloud Data Fusion 中 JOIN 作業的效能調整。

JOIN 作業可能是管道中最耗費資源的部分。就像管道中的其他所有內容一樣,作業會以平行方式執行。JOIN 的第一個步驟是重新排序資料,讓每個具有相同 JOIN 鍵的記錄都傳送至相同的執行緒。所有資料重組完成後,系統就會進行彙整,並繼續透過管道輸出結果。

JOIN 作業中並行處理的範例

舉例來說,假設您對名為 PurchasesItems 的資料集執行 JOIN 作業。每個購買記錄都包含商品名稱和購買數量。每個商品記錄都包含商品名稱和價格。系統會對商品名稱執行 JOIN,以便計算每筆購買交易的總價。資料彙整後,系統會在叢集中洗牌資料,讓 ID 相同的記錄最終會出現在同一個執行緒上。

如果 JOIN 鍵分布得相當均勻,JOIN 作業就能順利執行,因為可以並行執行。

和任何隨機播放一樣,資料偏移會對效能造成負面影響。在上述範例中,雞蛋的購買頻率遠高於雞或牛奶,這表示加入雞蛋購買的執行緒比其他執行緒的工作量更多。如果您發現 JOIN 偏離,有兩種方法可以改善效能。

自動分割偏移的分區

透過自適應查詢執行功能,系統會自動處理極端偏差的情況。一旦 JOIN 產生的某些分區比其他分區大得多,就會拆分為較小的分區。如要確認是否已啟用自適應查詢執行功能,請參閱「自動調整」。

使用記憶體內的 JOIN

如果 JOIN 的一側足夠小,可放入記憶體中,即可執行記憶體內 JOIN。在這種情況下,系統會將小型資料集載入記憶體,然後廣播給每個執行緒。大型資料集不會進行洗牌,因此不會產生在 JOIN 鍵上洗牌時產生的不均勻分區。

在前述範例中,系統會先將商品資料集載入 Spark 驅動程式的記憶體中。然後廣播給每個執行緒。執行者現在可以彙整資料,而無須重新洗牌任何購買資料集。

使用這種方法時,您必須為 Spark 驅動程式和執行緒提供足夠的記憶體,讓它們能夠在記憶體中儲存廣播資料集。根據預設,Spark 會保留略低於 30% 的記憶體,用於儲存這類資料。使用記憶體內 JOIN 時,請將資料集大小乘以四,並將該值設為執行緒和驅動程式記憶體。舉例來說,如果項目資料集的大小為 1 GB,我們就需要將執行緒和驅動程式記憶體設為至少 4 GB。無法將大於 8 GB 的資料集載入記憶體。

金鑰發佈

如果 JOIN 的兩側都太大,無法放入記憶體,您可以使用其他技巧將每個 JOIN 鍵分割成多個鍵,以提高平行處理的程度。這項技巧可套用至 INNER JOINLEFT OUTER JOIN 作業。無法用於 FULL OUTER JOIN 運算。

在這種方法中,偏差的一方會使用新的整數資料欄進行加鹽處理,其中包含 1 到 N 之間的隨機數字。未傾斜的側邊會展開,每個現有資料列都會產生 N 個新資料列。系統會在展開的側邊新增一列,並填入 1 到 N 之間的每個數字。接著執行一般 JOIN,但新欄會新增為 JOIN 鍵的一部分。如此一來,原本會進入單一分區的所有資料,現在會分散到最多 N 個不同的分區。

在上述範例中,分配因子 N 已設為 3。左側顯示原始資料集。中間顯示經過加鹽和展開的資料集版本。右側顯示經過重新排序的資料,其中有三個不同的執行緒加入購買雞蛋,而非一個。

增加分布作業可提高平行處理能力。不過,這會導致 JOIN 的一側爆炸,導致叢集中有更多資料進行排序。因此,隨著發布量增加,效益就會降低。在大多數情況下,請將其設為 20 以下。

後續步驟

  • 進一步瞭解 Cloud Data Fusion 中的並行處理功能。