JOIN オペレーションの並列処理

このページでは、Cloud Data Fusion の JOIN オペレーションのパフォーマンス チューニングについて説明します。

JOIN オペレーションは、パイプラインの中で最もコストがかかる部分です。パイプラインの他のすべての処理と同様に、オペレーションは並行して実行されます。JOIN の最初のステップでは、同じ JOIN キーを持つすべてのレコードが同じエグゼキュータに送信されるように、データをシャッフルします。すべてのデータがシャッフルされると、結合され、出力はパイプラインを通過します。

JOIN オペレーションでの並列処理の例

たとえば、PurchasesItems というデータセットに対して JOIN オペレーションを実行するとします。各購入レコードには、アイテム名と購入数が表示されます。各アイテム レコードには、アイテム名とそのアイテムの価格が含まれています。アイテム名に対して JOIN が実行され、各購入の合計金額が計算されます。データが結合されると、同じ ID のレコードが同じエグゼキュータに配置されるように、クラスタ全体でデータがシャッフルされます。

JOIN キーがほぼ均等に分散されていると、並行して実行できるため、JOIN オペレーションのパフォーマンスが良好になります。

他のシャッフルと同様、データスキューがあるとパフォーマンスに悪影響を及ぼします。上の例では、卵は鶏肉や牛乳よりもはるかに頻繁に購入されています。つまり、卵の購入に関するデータを結合するエグゼキュータは、他のエグゼキュータよりも多くの処理を行います。JOIN が偏っている場合は、パフォーマンスを向上させる方法が 2 つあります。

偏ったパーティションを自動的に分割する

適応型クエリ実行では、非常に大きなスキューは自動的に処理されます。JOIN が他のパーティションよりもはるかに大きいパーティションを生成するとすぐに、それらは小さいパーティションに分割されます。適応型クエリ実行が有効になっていることを確認するには、自動チューニングをご覧ください。

インメモリ JOIN を使用する

JOIN の片側がメモリに収まる程度の小さいサイズであれば、メモリ内 JOIN を実行できます。この場合、小さなデータセットがメモリに読み込まれ、すべてのエグゼキュータにブロードキャストされます。大規模なデータセットはいずれもシャッフルされず、JOIN キーでシャッフルするときに生成される不均一なパーティションが削除されます。

上の例では、まずアイテムのデータセットが Spark ドライバのメモリに読み込まれます。その後、各エグゼキュータにブロードキャストされます。エグゼキュータは、購入データセットをシャッフルしなくてもデータを結合できるようになりました。

このアプローチでは、ブロードキャスト データセットをメモリに保存できるように、Spark ドライバとエグゼキュータの両方に十分なメモリを割り当てる必要があります。デフォルトでは、Spark はこのようなデータの保存用に 30% をわずかに下回るメモリを予約します。メモリ内 JOIN を使用する場合は、データセットのサイズに 4 を乗じて、エグゼキュータとドライバのメモリとして設定します。たとえば、アイテム データセットのサイズが 1 GB の場合、エグゼキュータとドライバのメモリを少なくとも 4 GB に設定する必要があります。8 GB を超えるデータセットはメモリに読み込めません。

鍵の配布

JOIN の両側のサイズが大きくメモリに収まらない場合は、別の手法を使用して各 JOIN キーを複数のキーに分割し、並列処理のレベルを上げることができます。この手法は、INNER JOIN オペレーションと LEFT OUTER JOIN オペレーションに適用できます。FULL OUTER JOIN オペレーションには使用できません。

この手法では、偏った側に、1~N の乱数を含む新しい整数列がソルトとして追加されます。偏りのない側が分割され、既存の行ごとに N 個の新しい行が生成されます。分割された側に新しい列が追加され、1~N の各数値が入力されます。その後、通常の JOIN が実行されますが、新しい列が JOIN キーの一部として追加されます。これにより、単一のパーティションに送信されていたすべてのデータが、最大 N 個の異なるパーティションに分散されます。

上記の例では、分散係数 N3 に設定されています。元のデータセットが左側に表示されています。データセットのソルト処理と展開されたバージョンが中央に表示されます。右側にシャッフルされたデータが表示されます。1 つではなく、3 つの異なるエグゼキュータが卵の購入処理に関与しています。

分散を増やすと、並列処理が強化されます。ただし、この方法では JOIN の片側が分割されるため、クラスタ間でシャッフルされるデータが増えます。このため、分散が広がるにつれてメリットは減少します。ほとんどの場合、20 以下に設定します。

次のステップ

  • Cloud Data Fusion の並列処理の詳細を確認する。