Dataflow パイプラインをデプロイする

このドキュメントでは、パイプラインのデプロイの概要と、デプロイしたパイプラインで実行できるオペレーションについて説明します。

パイプラインを実行する

Apache Beam パイプラインを作成してテストしたら、パイプラインを実行します。パイプラインをローカルで実行し、Apache Beam パイプラインをテストしてデバッグできます。この処理は Dataflow(Apache Beam パイプラインの実行に使用できるデータ処理システム)でも実行できます。

ローカルで実行する

パイプラインをローカルで実行します。

Java

このクイックスタートから抜粋した次のコードは、WordCount パイプラインをローカルで実行する方法を示しています。詳細については、Java パイプラインをローカルで実行するをご覧ください。

ターミナルで、次のコマンドを実行します。

  mvn compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--output=counts"
  

Python

このクイックスタートから抜粋した次のコードは、WordCount パイプラインをローカルで実行する方法を示しています。詳細については、Python パイプラインをローカルで実行するをご覧ください。

ターミナルで、次のコマンドを実行します。

python -m apache_beam.examples.wordcount \ --output outputs

Go

このクイックスタートから抜粋した次のコードは、WordCount パイプラインをローカルで実行する方法を示しています。詳細については、Go パイプラインをローカルで実行するをご覧ください。

ターミナルで、次のコマンドを実行します。

    go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
  

ダイレクト ランナーを使用してローカルマシンでパイプラインを実行する方法を学習します。

Dataflow で実行する

Dataflow でパイプラインを実行します。

Java

このクイックスタートから抜粋した次のコードは、WordCount パイプラインを Dataflow で実行する方法を示しています。詳細については、Dataflow で Java パイプラインを実行する方法をご覧ください。

ターミナルで、次のコマンドを実行します(word-count-beam ディレクトリから実行します)。

  mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--project=PROJECT_ID \
    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
    --output=gs://BUCKET_NAME/output \
    --runner=DataflowRunner \
    --region=REGION"
    

次のように置き換えます。

  • PROJECT_ID: 実際の Google Cloud プロジェクト ID
  • BUCKET_NAME: Cloud Storage バケットの名前
  • REGION: Dataflow リージョンus-central1 など)

Python

このクイックスタートから抜粋した次のコードは、WordCount パイプラインを Dataflow で実行する方法を示しています。詳細については、Dataflow で Python パイプラインを実行する方法をご覧ください。

ターミナルで、次のコマンドを実行します。

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

次のように置き換えます。

  • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン(例: europe-west1

    --region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

  • STORAGE_BUCKET: 先ほどコピーした Cloud Storage の名前
  • PROJECT_ID: 先ほどコピーした Google Cloud プロジェクト ID

Go

このクイックスタートから抜粋した次のコードは、WordCount パイプラインを Dataflow で実行する方法を示しています。詳細については、Dataflow で Go パイプラインを実行する方法をご覧ください。

ターミナルで、次のコマンドを実行します。

  posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://STORAGE_BUCKET/binaries/
  

次のように置き換えます。

  • STORAGE_BUCKET: Cloud Storage バケット名。
  • PROJECT_ID: Google Cloud プロジェクト ID。
  • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン。例: europe-west1。使用可能なロケーションのリストについては、Dataflow のロケーションをご覧ください。--region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

Dataflow Runner を使用して Dataflow サービスでパイプラインを実行する方法を学習します。

Dataflow でパイプラインを実行すると、Dataflow は Apache Beam パイプライン コードを Dataflow ジョブに変換します。Dataflow は、Dataflow ジョブを実行するための Compute EngineCloud Storage などの Google Cloud サービスを完全に管理し、必要なリソースを自動的にスピンアップして破棄します。Dataflow が Apache Beam コードを Dataflow ジョブに変換する方法については、パイプラインのライフサイクルをご覧ください。

パイプラインの検証

Dataflow でパイプラインを実行すると、Dataflow は、ジョブの起動前にパイプラインに対して検証テストを実行します。検証テストでパイプラインに問題が見つかると、Dataflow は早期にジョブの送信を失敗します。ジョブログに次のテキストを含むメッセージが記録されます。各メッセージには、検証結果の詳細と問題の解決方法も含まれています。

The preflight pipeline validation failed for job JOB_ID.

実行される検証テストは、Dataflow ジョブが使用するリソースとサービスによって異なります。

  • プロジェクトで Service Usage API が有効になっている場合、パイプライン検証テストは、Dataflow ジョブの実行に必要なサービスが有効になっているかどうかを確認します。
  • プロジェクトで Cloud Resource Manager API が有効になっている場合、パイプライン検証テストは、Dataflow ジョブの実行に必要なプロジェクト レベルの構成があるかどうかを確認します。

サービスの有効化の詳細については、サービスの有効化と無効化をご覧ください。

パイプラインの検証中に検出された権限の問題を解決する方法については、パイプラインを検証できないをご覧ください。

パイプライン検証をオーバーライドし、検証エラーのあるジョブを起動する場合は、次のパイプライン サービス オプションを使用します。

Java

--dataflowServiceOptions=enable_preflight_validation=false

Python

--dataflow_service_options=enable_preflight_validation=false

Go

--dataflow_service_options=enable_preflight_validation=false

パイプライン オプションを設定する

Apache Beam パイプライン コードでパイプライン オプションを設定することで、Dataflow によるジョブの実行方法の一部を制御できます。たとえば、パイプライン オプションを使用して、パイプラインをワーカー仮想マシン、Dataflow サービスのバックエンド、またはローカルで実行するかどうかを設定できます。

パイプラインの依存関係を管理する

Apache Beam パイプラインの多くは、デフォルトの Dataflow ランタイム環境を使用して実行できます。ただし、一部のデータ処理のユースケースでは、追加のライブラリまたはクラスを使用することでメリットが得られます。このような場合には、パイプラインの依存関係を管理することが必要になる可能性があります。依存関係の管理の詳細については、Dataflow でパイプラインの依存関係を管理するをご覧ください。

ジョブをモニタリングする

Dataflow では、Dataflow モニタリング インターフェースDataflow コマンドライン インターフェースなどのツールを使用して、ジョブを可視化できます。

ワーカー VM にアクセスする

Google Cloud コンソールを使用して、特定のパイプラインの VM インスタンスを表示できます。そこから、SSH を使用して各インスタンスにアクセスできます。ただし、ジョブが完了または失敗すると、Dataflow サービスが自動的にシャットダウンし、VM インスタンスがクリーンアップされます。

ジョブの最適化

Dataflow は、Google Cloud リソースの管理に加え、分散並列処理の多くの側面を自動的に実行し、最適化します。

並列化と分散

Dataflow は、データを自動的に分割し、並列処理のためにワーカーコードを Compute Engine インスタンスに分散します。詳細については、並列化と分散をご覧ください。

融合と結合の最適化

Dataflow は、パイプライン コードを使用して、パイプラインの PCollection と変換を表す実行グラフを作成し、最も効率的なパフォーマンスとリソース使用率を実現するためにグラフを最適化します。Dataflow は、データ集計など、コストがかかる可能性のある操作も自動的に最適化します。詳細については、融合の最適化結合の最適化をご覧ください。

自動チューニング機能

Dataflow サービスには、リソース割り当てとデータ パーティショニングを実行中に調整する機能がいくつか用意されています。これらの機能は、Dataflow がジョブを可能な限り迅速かつ効率的に実行するのに役立ちます。主な機能は次のとおりです。

Streaming Engine

デフォルトでは、Dataflow パイプライン ランナーは、ストリーミング パイプラインのステップをすべてワーカー仮想マシンで実行し、ワーカーの CPU、メモリ、Persistent Disk ストレージを消費します。Dataflow の Streaming Engine は、パイプラインの実行をワーカー VM から Dataflow サービスのバックエンドに移動します。詳細については、Streaming Engine をご覧ください。

Dataflow Flexible Resource Scheduling

Dataflow FlexRS は、高度なスケジューリング技術Dataflow Shuffle サービス、プリエンプティブル仮想マシン(VM)インスタンスと通常の VM の組み合わせを使用することで、バッチ処理コストを削減します。プリエンプティブル VM と通常の VM を並列実行することで、システム イベント中に Compute Engine がプリエンプティブル VM インスタンスを停止した場合の Dataflow のユーザー エクスペリエンスが向上します。FlexRS は、プリエンプティブル VM が Compute Engine によってプリエンプトされたときに、パイプラインの処理を続行して以前の作業を確実に保持するのに効果的です。FlexRS の詳細については、Dataflow での Flexible Resource Scheduling の使用をご覧ください。

Dataflow Shielded VM

2022 年 6 月 1 日以降、Dataflow サービスはすべてのワーカーに Shielded VM を使用します。Shielded VM の機能の詳細については、Shielded VM をご覧ください。