並列処理

パイプラインはマシンのクラスタで実行されます。実行する必要がある作業を分割し、クラスタ全体に分散された複数のエグゼキュータで並列に実行することで、高スループットを実現します。一般に、分割(パーティションとも呼ばれます)の数が多いほど、パイプラインを高速に実行できます。パイプラインの並列処理レベルは、パイプラインのソースとシャッフル ステージによって決定されます。

ソース

各パイプラインの実行の開始時に、パイプラインのすべてのソースが、読み取る必要があるデータと、そのデータをスプリットに分割する方法を計算します。たとえば、Cloud Storage から読み取り、Wrangler 変換を実行してから Cloud Storage に書き戻す基本的なパイプラインについて考えてみましょう。

Cloud Storage ソース、Wrangler 変換、Cloud Storage シンクを示す基本的なパイプライン

パイプラインが開始されると、Cloud Storage ソースは入力ファイルを調べ、ファイルサイズに基づいて分割します。たとえば、1 ギガバイトのファイルを 100 個のスプリットに分割し、それぞれのサイズを 10 MB にすることができます。各エグゼキュータは、そのスプリットのデータを読み取り、Wrangler 変換を実行してから、出力を part ファイルに書き込みます。

Cloud Storage 内のデータをパーティション分割し、Wrangler の並列変換でパートファイルに変換する

パイプラインの実行速度が低速である場合は、最初に確認すべきことの一つは、ソースが並列処理を最大限に活用するのに十分な分割を作成しているかどうかです。たとえば、一部の圧縮形式では、プレーンテキスト ファイルを分割できません。gzip 圧縮されたファイルを読み取っている場合、未圧縮のファイルや BZIP で圧縮されたファイル(分割可能)を読み取る場合よりもパイプラインの実行速度が大幅に低下することがあります。同様に、データベース ソースを使用していて、1 つの分割のみを使用するように構成している場合、複数の分割を使用するように構成した場合よりも実行速度が大幅に低下します。

シャッフル

特定のタイプのプラグインを使用すると、クラスタ間でデータがシャッフルされます。これは、1 つのエグゼキュータによって処理されているレコードを別のエグゼキュータに送信して計算を実行する必要がある場合に発生します。シャッフルは大量の I/O を伴うため、費用のかかるオペレーションです。データをシャッフルするプラグインはすべて、Pipeline Studio の [分析] セクションに表示されます。これには、Group By、Deduplicate、Distinct、Joiner などのプラグインが含まれます。たとえば、上の例のパイプラインに [グループ条件] ステージが追加されているとします。

また、読み取られるデータが食料品店で行われた購入を表しているとします。各レコードには、item フィールドと num_purchased フィールドが含まれます。[グループ条件] ステージでは、item フィールドでレコードをグループ化し、num_purchased フィールドの合計を計算するようにパイプラインを構成します。

パイプラインが実行されると、前述のように入力ファイルが分割されます。その後、同じアイテムを持つすべてのレコードが同じエグゼキュータに属するように、各レコードがクラスタ間でシャッフルされます。

上の例に示すように、リンゴの購入の記録は、元々は複数のエグゼキュータに分散されていました。集計を実行するには、これらのレコードをすべてクラスタ全体で同じエグゼキュータに送信する必要がありました。

シャッフルを必要とするほとんどのプラグインでは、データをシャッフルする際に使用するパーティションの数を指定できます。これは、シャッフルされたデータを処理するために使用されるエグゼキュータの数を制御します。

上の例では、パーティション数が 2 に設定されている場合、各エグゼキュータは 1 つではなく 2 つのアイテムの集計を計算します。

このステージの後でパイプラインの並列処理を減らすことは可能です。たとえば、パイプラインの論理ビューについて考えてみましょう。

ソースでデータが 500 個のパーティションに分割されているが、Group By で 200 個のパーティションを使用してシャッフルされる場合、Group By 後の最大並列レベルは 500 から 200 に低下します。Cloud Storage に書き込まれる 500 個の異なる部分ファイルではなく、200 個のみになります。

パーティションの選択

パーティション数が過剰に少ないと、クラスタの容量を最大限に使用して、できるだけ多くの作業を並列化できません。パーティションを大きすぎる値に設定すると、不要なオーバーヘッドが増加します。一般的に、使用するパーティション数が過剰に少ないよりも、過剰に多い方が良好であると言えます。パイプラインの実行に数分かかる場合、数分を短縮しようとすると、追加のオーバーヘッドが問題になります。パイプラインの実行に数時間かかる場合、オーバーヘッドは通常気にする必要はありません。

使用するパーティションの数を決定する便利な方法は、パーティションを max(cluster CPUs, input records / 500,000) に設定することですが、これは単純すぎます。つまり、入力レコード数を 500,000 で割ります。この数がクラスタ CPU の数より大きい場合は、パーティションの数に使用します。それ以外の場合は、クラスタ CPU の数を使用します。たとえば、クラスタに 100 個の CPU があり、シャッフル ステージに 1 億個の入力レコードが予想される場合は、200 個のパーティションを使用します。

より完全な回答としては、各パーティションの中間シャッフル データがエグゼキュータのメモリに完全に収まり、ディスクにスパイルする必要がない場合、シャッフルのパフォーマンスが最適になります。Spark は、シャッフル データを保持するためにエグゼキュータのメモリの 30% 未満を予約します。正確な数値は、(合計メモリ - 300 MB)* 30% です。各エグゼキュータが 2 GB のメモリを使用するように設定されていると仮定すると、各パーティションは(2 GB - 300 MB)* 30% = 約 500 MB のレコードしか保持できません。各レコードが 1 KB に圧縮されると仮定すると、(500 MB / パーティション)÷(1 KB / レコード)= パーティションあたり 500,000 レコードとなります。エグゼキュータがより多くのメモリを使用している場合や、レコードが小さい場合は、この数値を適宜調整できます。

データスキュー

上記の例では、さまざまなアイテムの購入が均等に分散されています。つまり、リンゴ、バナナ、ニンジン、卵がそれぞれ 3 回購入されたということです。均等に分散されたキーでのシャッフルは、最もパフォーマンスの高いシャッフル タイプですが、多くのデータセットにはこのプロパティがありません。前述の例のスーパーでの購入について考えてみましょう。結婚式の招待状よりも卵の購入の方がはるかに多いことが予想されます。他のキーよりもはるかに一般的なシャッフル キーがいくつかある場合は、データが偏っています。偏ったデータでは、少数のエグゼキュータによって不均衡な量の処理が行われるため、偏っていないデータよりもパフォーマンスが大幅に低下する可能性があります。その結果、パーティションの小さなサブセットが他のパーティションよりもはるかに大きくなります。

この例では、卵の購入がカードの購入の 5 倍であるため、卵の集計の計算に約 5 倍の時間がかかることになります。2 件ではなく 10 件のレコードを処理する場合は問題ありませんが、10 億件ではなく 50 億件のレコードを処理する場合は大きな違いが生じます。データの偏差がある場合、シャッフルで使用されるパーティションの数は、パイプラインのパフォーマンスに大きな影響を与えません。

時間の経過に伴う出力レコードのグラフを調べることで、データの偏りを認識できます。パイプラインの実行の開始時にステージがレコードを出力するペースが非常に速く、その後突然速度が低下する場合は、データの偏りがある可能性があります。

クラスタのメモリ使用量の経時的な変化を調べることで、データの偏りを認識することもできます。クラスタが一定期間容量に達しているが、一定期間メモリ使用量が突然低下する場合も、データの偏差に対処している兆候です。

非対称データは、結合の実行時にパフォーマンスに最も大きな影響を与えます。スキュー結合のパフォーマンスを改善するために使用できる手法はいくつかあります。詳細については、JOIN オペレーションの並列処理をご覧ください。

実行のための適応型チューニング

実行を適応的にチューニングするには、厳密なパーティション番号ではなく、使用するパーティションの範囲を指定します。適応型実行が有効になっている場合、正確なパーティション番号は、パイプライン構成で設定されている場合でも無視されます。

エフェメラル Dataproc クラスタを使用している場合、Cloud Data Fusion は適切な構成を自動的に設定しますが、静的 Dataproc クラスタまたは Hadoop クラスタの場合は、次の 2 つの構成パラメータを設定できます。

  • spark.default.parallelism: クラスタで使用可能な vCore の合計数に設定します。これにより、クラスタの負荷不足を防ぎ、パーティション数の下限を定義できます。
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: クラスタで使用可能な vCore 数の 32 倍に設定します。これにより、パーティション数の上限が定義されます。
  • Spark.sql.adaptive.enabled: 最適化を有効にするには、この値を true に設定します。Dataproc では自動的に設定されますが、汎用 Hadoop クラスタを使用している場合は、有効にする必要があります。

これらのパラメータは、特定のパイプラインエンジン構成または静的 Dataproc クラスタのクラスタ プロパティで設定できます。

次のステップ