変換のプッシュダウンの概要

データ パイプラインのパフォーマンスを改善するには、一部の変換オペレーションを Apache Spark ではなく BigQuery に push します。変換のプッシュダウンとは、Cloud Data Fusion データ パイプラインのオペレーションを実行エンジンとして BigQuery に push できるようにする設定を指します。その結果、オペレーションとそのデータが BigQuery に転送され、そこでオペレーションが実行されます。

変換のプッシュダウンを使用すると、複数の複雑な JOIN オペレーションやその他のサポートされている変換を含むパイプラインのパフォーマンスが向上します。BigQuery で一部の変換を実行すると、Spark で実行するよりも高速になる場合があります。

サポートされていない変換とすべてのプレビュー変換は Spark で実行されます。

サポートされている変換

変換のプッシュダウンは Cloud Data Fusion バージョン 6.5.0 以降で使用できますが、次の変換の一部は新しいバージョンでのみサポートされています。

JOIN オペレーション

  • 変換のプッシュダウンは、Cloud Data Fusion バージョン 6.5.0 以降の JOIN オペレーションで使用できます。

  • 基本(キーオン)オペレーションと高度な JOIN オペレーションがサポートされています。

  • BigQuery で実行するには、結合で 2 つの入力ステージが必要です。

  • 以下の場合を除いて、1 つ以上の入力をメモリに読み込むように構成された結合は、BigQuery ではなく Spark で実行されます。

    • 結合への入力のいずれかがすでにプッシュダウンされている場合。
    • 結合を SQL Engine で実行するように構成した場合(強制実行のステージ オプションを参照)。

BigQuery シンク:

変換のプッシュダウンは、Cloud Data Fusion バージョン 6.7.0 以降の BigQuery シンクで使用できます。

BigQuery シンクが BigQuery で実行されたステージに従うと、BigQuery にレコードを書き込むオペレーションが BigQuery で直接実行されます。

このシンクのパフォーマンスを向上させるには、次のものが必要です。

  • サービス アカウントには、BigQuery シンクで使用されるデータセット内のテーブルを作成および更新する権限が必要です。
  • 変換のプッシュダウン使用するデータセットと BigQuery シンクは、同じロケーションに保存する必要があります。
  • オペレーションは次のいずれかにする必要があります。
    • InsertTruncate Table オプションはサポートされていません)
    • Update
    • Upsert

GROUP BY 集計

変換のプッシュダウンは、Cloud Data Fusion バージョン 6.7.0 以降の GROUP BY 集計で使用できます。

BigQuery の GROUP BY 集計は、次のオペレーションに使用できます。

  • Avg
  • Collect List(null 値は出力配列から削除されます)
  • Collect Set(null 値は出力配列から削除されます)
  • Concat
  • Concat Distinct
  • Count
  • Count Distinct
  • Count Nulls
  • Logical And
  • Logical Or
  • Max
  • Min
  • Standard Deviation
  • Sum
  • Sum of Squares
  • Corrected Sum of Squares
  • Variance
  • Shortest String
  • Longest String

GROUP BY 集計は、次の場合に BigQuery で実行されます。

  • すでにプッシュダウンされたステージに従います。
  • SQL Engine で実行するように構成した(強制実行のステージ オプションを参照)。

重複除去の集計

変換のプッシュダウンは、以下のオペレーションのために、Cloud Data Fusion バージョン 6.7.0 以降での重複除去の集計に使用できます。

  • フィルタ オペレーションが指定されていない
  • ANY(目的のフィールドの null 以外の値)
  • MIN(指定したフィールドの最小値)
  • MAX(指定したフィールドの最大値)

次のオペレーションはサポートされていません。

  • FIRST
  • LAST

重複除去の集計は、次の場合に SQL エンジンで実行されます。

  • すでにプッシュダウンされたステージに従います。
  • SQL Engine で実行するように構成した(強制実行のステージ オプションを参照)。

BigQuery ソースのプッシュダウン

BigQuery Source Pushdown は Cloud Data Fusion バージョン 6.8.0 以降で使用できます。

BigQuery ソースが BigQuery のプッシュダウンと互換性のあるステージに従う場合、パイプラインは BigQuery 内の互換性のあるすべてのステージを実行できます。

Cloud Data Fusion は、BigQuery 内でパイプラインを実行するために必要なレコードをコピーします。

BigQuery Source Pushdown を使用する場合、テーブルのパーティショニングとクラスタリングのプロパティが保持されるので、これらのプロパティを使用して結合などの以降のオペレーションを最適化できます。

その他の要件

BigQuery Source Pushdown を使用するには、次の要件を満たす必要があります。

  • BigQuery 変換のプッシュ通知用に構成されたサービス アカウントには、BigQuery ソースのデータセット内のテーブルを読み取る権限が必要です。

  • BigQuery ソースで使用されるデータセットと、変換のプッシュダウン用に構成されたデータセットは、同じロケーションに保存する必要があります。

ウィンドウ集計

変換のプッシュダウンは、Cloud Data Fusion バージョン 6.9 以降のウィンドウ集計で使用できます。BigQuery のウィンドウ集計は、次のオペレーションでサポートされています。

  • Rank
  • Dense Rank
  • Percent Rank
  • N tile
  • Row Number
  • Median
  • Continuous Percentile
  • Lead
  • Lag
  • First
  • Last
  • Cumulative distribution
  • Accumulate

ウィンドウ集計は、次の場合に BigQuery で実行されます。

Wrangler フィルタのプッシュダウン

Wrangler フィルタのプッシュダウンは、Cloud Data Fusion バージョン 6.9 以降で使用できます。

Wrangler プラグインを使用すると、フィルタを push して(Precondition オペレーション)、Spark ではなく BigQuery で実行できます。

フィルタのプッシュダウンは、これもバージョン 6.9 でリリースされた前提条件の SQL モードでのみサポートされています。このモードでは、プラグインが ANSI 標準 SQL の前提条件式を受け入れます。

前提条件に SQL モードを使用すると、Wrangler プラグインでディレクティブユーザー定義のディレクティブが無効になります。これは、SQL モードの前提条件では、それらがサポートされていないためです。

変換のプッシュダウンが有効になっている場合、前提条件の SQL モードは、複数の入力がある Wrangler プラグインでサポートされません。複数の入力で使用する場合、この Wrangler ステージと SQL フィルタ条件は Spark で実行されます。

フィルタは、次の場合に BigQuery で実行されます。

指標

BigQuery で実行されるパイプラインの部分に Cloud Data Fusion から提供される指標の詳細については、BigQuery プッシュダウン パイプラインの指標をご覧ください。

変換のプッシュダウンを使用するタイミング

BigQuery で変換を実行するには、次の手順に従います。

  1. パイプラインでサポートされているステージのレコードを BigQuery に書き込む。
  2. BigQuery でサポートされるステージを実行する。
  3. サポートされている変換が実行された後、BigQuery からレコードを読み取る(BigQuery シンクが続く場合を除く)。

データセットのサイズによっては、ネットワークのオーバーヘッドがかなり大きくなる可能性があります。これは、変換のプッシュダウンが有効な場合、パイプラインの実行時間全体に悪影響を与える可能性があります。

ネットワークオーバーヘッドのため、次のような場合は変換のプッシュダウンをおすすめします。

  • サポートされている複数のオペレーションが順番に実行れる(ステージ間にステップがない)。
  • BigQuery に変換の実行に関して得られるパフォーマンスが、Spark と比較して、BigQuery との間のデータ レイテンシおよび場合によっては BigQuery から生じるレイテンシを上回る。

仕組み

変換のプッシュダウンを使用するパイプラインを実行すると、Cloud Data Fusion は BigQuery でサポートされている変換ステージを実行します。パイプラインのその他のステージはすべて Spark で実行されます。

変換を実行する場合:

  1. Cloud Data Fusion は、Cloud Storage にレコードを書き込み、BigQuery 読み込みジョブを実行して、入力データセットを BigQuery に読み込みます。

  2. JOIN オペレーションとサポートされている変換は、SQL ステートメントを使用して BigQuery ジョブとして実行されます。

  3. ジョブの実行後にさらに処理が必要な場合は、レコードを BigQuery から Spark にエクスポートできます。ただし、BigQuery シンクに直接コピーを試行オプションが有効で、BigQuery シンクが BigQuery で実行されたステージに従うと、レコードがエクスポート先の BigQuery シンク テーブルに直接書き込まれます。

次の図は、変換のプッシュダウンによって、Spark の代わりに BigQuery でサポートされる変換がどのように実行されるかを示しています。

Cloud Data Fusion パイプラインでの BigQuery への変換のプッシュダウン。

おすすめの方法

クラスタとエグゼキュータのサイズを調整する

パイプラインでリソース管理を最適化するには、次の手順に従います。

  • ワークロードに適切な数のクラスタ ワーカー(ノード)を使用します。つまり、プロビジョニングされた Dataproc クラスタを最大限に活用するには、インスタンスで使用可能な CPU とメモリを完全に使用し、大規模なジョブの場合は BigQuery の実行速度を活用できます。

  • 自動スケーリング クラスタを使用して、パイプラインの並列処理を向上させます。

  • パイプラインの実行中に BigQuery からレコードを push または pull するステージで、リソース構成を調整します。

推奨: エグゼキュータのリソースの CPU コア数を増やしてみてください(ワーカーノードで使用する CPU コア数まで)。 エグゼキュータは、データが BigQuery に送受信されるにつれて、シリアル化および非シリアル化ステップにおいて CPU 使用を最適化します。詳細については、クラスタのサイジングをご覧ください。

BigQuery で変換を実行するメリットは、パイプラインを小さな Dataproc クラスタで実行できることです。結合がパイプラインで最もリソース消費量の多いオペレーションである場合は、より小さいクラスタサイズで試すことができます。負荷の大きい JOIN オペレーションが BigQuery で実行されるため、全体的なコンピューティング費用を削減できます。

BigQuery Storage Read API を使用してデータを迅速に取得する

BigQuery が変換を実行した後、パイプラインに Spark で実行する追加のステージが存在することがあります。Cloud Data Fusion バージョン 6.7.0 以降では、変換のプッシュダウンは BigQuery Storage Read API をサポートしています。これにより、レイテンシが改善され、Spark への読み取りオペレーションが高速化されます。これにより、パイプライン全体の実行時間を短縮できます。

API はレコードを並行して読み取るため、それに応じてエグゼキュータのサイズを調整することをおすすめします。リソースを大量に消費するオペレーションを BigQuery で実行する場合は、エグゼキュータのメモリ割り当てを減らし、パイプラインの実行時に並列処理を改善します(クラスタとエグゼキュータのサイズを調整するをご覧ください)。

BigQuery Storage Read API はデフォルトで無効になっています。Scala 2.12 がインストールされている実行環境(Dataproc 2.0 と Dataproc 1.5 を含む)で有効にできます。

データセットのサイズを検討する

JOIN オペレーションのデータセットのサイズを検討します。クロス出力の JOIN オペレーションに似ているものなど、かなりの数の出力レコードを生成する JOIN オペレーションの場合、生成されるデータセットのサイズは入力データセットよりも大きくなります。また、全体的なパイプライン パフォーマンスのコンテキストで、これらのレコードに対する Spark の追加処理(変換やシンクなど)が発生したときに、これらのレコードを Spark に戻すオーバーヘッドも検討します。

偏りのあるデータを軽減する

偏りのあるデータに対する JOIN オペレーションでは、BigQuery ジョブがリソース使用率の上限を超過し、JOIN オペレーションが失敗する可能性があります。これを防ぐには、Joiner プラグインの設定に移動し、[偏りのある入力ステージ] フィールドで偏りのある入力を特定します。これにより、Cloud Data Fusion は、BigQuery ステートメントが上限を超えるリスクを軽減するように入力を配置できます。

Joiner プラグインの設定の [偏りのある入力ステージ] フィールドで、偏りのあるデータを特定します。

次のステップ