このページでは、Cloud Data Fusion の JOIN
オペレーションのパフォーマンス チューニングについて説明します。
JOIN
オペレーションは、パイプラインの中で最もコストがかかる部分です。パイプラインの他のすべての処理と同様に、オペレーションは並行して実行されます。JOIN
の最初のステップでは、同じ JOIN
キーを持つすべてのレコードが同じエグゼキュータに送信されるように、データをシャッフルします。すべてのデータがシャッフルされると、結合され、出力はパイプラインを通過します。
JOIN
オペレーションでの並列処理の例
たとえば、Purchases
と Items
というデータセットに対して JOIN
オペレーションを実行するとします。各購入レコードには、アイテム名と購入数が表示されます。各アイテム レコードには、アイテム名とそのアイテムの価格が含まれています。アイテム名に対して JOIN
が実行され、各購入の合計金額が計算されます。データが結合されると、同じ ID のレコードが同じエグゼキュータに配置されるように、クラスタ全体でデータがシャッフルされます。
JOIN
キーがほぼ均等に分散されていると、並行して実行できるため、JOIN
オペレーションのパフォーマンスが良好になります。
他のシャッフルと同様、データスキューがあるとパフォーマンスに悪影響を及ぼします。上の例では、卵は鶏肉や牛乳よりもはるかに頻繁に購入されています。つまり、卵の購入に関するデータを結合するエグゼキュータは、他のエグゼキュータよりも多くの処理を行います。JOIN
が偏っている場合は、パフォーマンスを向上させる方法が 2 つあります。
偏ったパーティションを自動的に分割する
適応型クエリ実行では、非常に大きなスキューは自動的に処理されます。JOIN
が他のパーティションよりもはるかに大きいパーティションを生成するとすぐに、それらは小さいパーティションに分割されます。適応型クエリ実行が有効になっていることを確認するには、自動チューニングをご覧ください。
インメモリ JOIN
を使用する
JOIN
の片側がメモリに収まる程度の小さいサイズであれば、メモリ内 JOIN
を実行できます。この場合、小さなデータセットがメモリに読み込まれ、すべてのエグゼキュータにブロードキャストされます。大規模なデータセットはいずれもシャッフルされず、JOIN
キーでシャッフルするときに生成される不均一なパーティションが削除されます。
上の例では、まずアイテムのデータセットが Spark ドライバのメモリに読み込まれます。その後、各エグゼキュータにブロードキャストされます。エグゼキュータは、購入データセットをシャッフルしなくてもデータを結合できるようになりました。
このアプローチでは、ブロードキャスト データセットをメモリに保存できるように、Spark ドライバとエグゼキュータの両方に十分なメモリを割り当てる必要があります。デフォルトでは、Spark はこのようなデータの保存用に 30% をわずかに下回るメモリを予約します。メモリ内 JOIN
を使用する場合は、データセットのサイズに 4 を乗じて、エグゼキュータとドライバのメモリとして設定します。たとえば、アイテム データセットのサイズが 1 GB の場合、エグゼキュータとドライバのメモリを少なくとも 4 GB に設定する必要があります。8 GB を超えるデータセットはメモリに読み込めません。
鍵の配布
JOIN
の両側のサイズが大きくメモリに収まらない場合は、別の手法を使用して各 JOIN
キーを複数のキーに分割し、並列処理のレベルを上げることができます。この手法は、INNER JOIN
オペレーションと LEFT OUTER JOIN
オペレーションに適用できます。FULL OUTER JOIN
オペレーションには使用できません。
この手法では、偏った側に、1~N の乱数を含む新しい整数列がソルトとして追加されます。偏りのない側が分割され、既存の行ごとに N
個の新しい行が生成されます。分割された側に新しい列が追加され、1~N の各数値が入力されます。その後、通常の JOIN
が実行されますが、新しい列が JOIN
キーの一部として追加されます。これにより、単一のパーティションに送信されていたすべてのデータが、最大 N
個の異なるパーティションに分散されます。
上記の例では、分散係数 N
が 3
に設定されています。元のデータセットが左側に表示されています。データセットのソルト処理と展開されたバージョンが中央に表示されます。右側にシャッフルされたデータが表示されます。1 つではなく、3 つの異なるエグゼキュータが卵の購入処理に関与しています。
分散を増やすと、並列処理が強化されます。ただし、この方法では JOIN
の片側が分割されるため、クラスタ間でシャッフルされるデータが増えます。このため、分散が広がるにつれてメリットは減少します。ほとんどの場合、20 以下に設定します。
次のステップ
- Cloud Data Fusion の並列処理の詳細を確認する。