Dataflow パイプラインをデプロイする

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

このドキュメントでは、パイプライン デプロイの概要と、デプロイされたパイプラインで実行できるオペレーションの一部について説明します。

パイプラインを実行する

Apache Beam パイプラインを作成してテストしたら、パイプラインを実行します。パイプラインをローカルで実行すると、Apache Beam パイプラインのテストやデバッグを行えます。また、Dataflow 上で Apache Beam パイプラインの実行に使用できるデータ処理システムを実行できます。

ローカルでの実行

次のようにパイプラインをローカルで実行します。

Java

このクイックスタートから取得したサンプルコードは、WordCount パイプラインをローカルで実行する方法を示しています。詳細については、Java パイプラインをローカルで実行する方法をご覧ください。

ターミナルで、次のコマンドを実行します。

  mvn compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--output=counts"
  

Python

このクイックスタートから取得したサンプルコードは、WordCount パイプラインをローカルで実行する方法を示しています。詳細については、Python のパイプラインをローカルで実行する方法をご覧ください。

ターミナルで、次のコマンドを実行します。

python -m apache_beam.examples.wordcount \ --output outputs

Go

このクイックスタートから取得したサンプルコードは、WordCount パイプラインをローカルで実行する方法を示しています。詳細については、Go パイプラインをローカルで実行する方法をご覧ください。

ターミナルで、次のコマンドを実行します。

    go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
  

ダイレクト ランナーを使用してマシン上でパイプラインをローカルで実行する方法を学習する。

Dataflow で実行

Dataflow でパイプラインを実行します。

Java

このクイックスタートから取得したサンプルコードは、Dataflow で WordCount パイプラインを実行する方法を示しています。詳細については、Dataflow で Java パイプラインを実行する方法をご覧ください。

ターミナルで(word-count-beam ディレクトリから)次のコマンドを実行します。

  mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--project=PROJECT_ID \
    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
    --output=gs://BUCKET_NAME/output \
    --runner=DataflowRunner \
    --region=REGION"
    

以下を置き換えます。

Python

このクイックスタートから取得したサンプルコードは、Dataflow で WordCount パイプラインを実行する方法を示しています。詳細については、Dataflow で Python パイプラインを実行する方法をご覧ください。

ターミナルで、次のコマンドを実行します。

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

以下を置き換えます。

  • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン エンドポイント(例: europe-west1

    --region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

  • STORAGE_BUCKET: 先ほどコピーした Cloud Storage の名前
  • PROJECT_ID: 先ほどコピーした Google Cloud プロジェクト ID

Go

このクイックスタートから取得したサンプルコードは、Dataflow で WordCount パイプラインを実行する方法を示しています。詳細については、Dataflow で Go パイプラインを実行する方法をご覧ください。

ターミナルで、次のコマンドを実行します。

  posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://STORAGE_BUCKET/binaries/
  

以下を置き換えます。

  • STORAGE_BUCKET: Cloud Storage バケット名。
  • PROJECT_ID: Google Cloud プロジェクト ID。
  • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: europe-west1。 使用可能なロケーションのリストについては、Dataflow のロケーションをご覧ください。--region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

Dataflow ランナーを使用して Dataflow サービスでパイプラインを実行する方法を学習します。

パイプラインを Dataflow で実行すると、Dataflow は Apache Beam パイプライン コードを Dataflow ジョブに変換します。Dataflow は、Dataflow ジョブを実行するための Compute EngineCloud Storage などの Google Cloud サービスを完全に管理し、必要なリソースを自動的にスピンアップして破棄します。Dataflow により Apache Beam コードが Dataflow ジョブに変換される仕組みの詳細については、パイプラインのライフサイクルをご覧ください。

パイプライン オプションを設定する

Apache Beam パイプライン コードでパイプライン オプションを設定することで、Dataflow がジョブを実行する方法の一部の側面を制御できます。たとえば、パイプライン オプションを使用して、パイプラインをワーカー仮想マシン、Dataflow サービス バックエンド、またはローカルで実行するかどうかを設定できます。

ジョブをモニタリングする

Dataflow では、Dataflow Monitoring InterfaceDataflow コマンドライン インターフェースなどのツールによってジョブを可視化できます。

ワーカー VM にアクセスする

Google Cloud Console を使用して、特定のパイプラインの VM インスタンスを表示できます。そこから、SSH を使用して各インスタンスにアクセスできます。ただし、ジョブが完了または失敗すると、Dataflow サービスが自動的にシャットダウンし、VM インスタンスがクリーンアップされます。

ジョブの最適化

Dataflow は、Google Cloud リソースの管理に加え、分散並列処理のさまざまな側面を自動的に実行し、最適化します。

並列化と分散

Dataflow は、データを自動的に分割し、並列処理のためにワーカーコードを Compute Engine インスタンスに分散します。詳細については、並列化と分散をご覧ください。

融合と結合の最適化

Dataflow は、パイプライン コードを使用して、パイプラインの PCollection と変換を表す実行グラフを作成し、最も効率的なパフォーマンスとリソース使用率を実現するためにグラフを最適化します。Dataflow は、データ集計など、コストがかかる可能性のある操作も自動的に最適化します。詳細については、融合の最適化結合の最適化をご覧ください。

自動チューニング機能。

Dataflow サービスには、リソース割り当てとデータ パーティショニングのオンザフライ調整を提供する機能がいくつか含まれます。これらの機能は、Dataflow がジョブをできるだけ素早く効率的に実行するのに役立ちます。主な機能は次のとおりです。

Streaming Engine

デフォルトでは、Dataflow パイプライン ランナーは、ストリーミング パイプラインのステップをすべてワーカー仮想マシンで実行し、ワーカーの CPU、メモリ、Persistent Disk ストレージを消費します。Dataflow の Streaming Engine は、パイプラインの実行をワーカー VM から Dataflow サービスのバックエンドに移動します。詳細については、Streaming Engine をご覧ください。

Dataflow Flexible Resource Scheduling

Dataflow FlexRS は、高度なスケジューリング技術Dataflow Shuffle サービス、プリエンプティブル仮想マシン(VM)インスタンスと通常の VM の組み合わせを使用することで、バッチ処理コストを削減します。プリエンプティブル VM と通常の VM を並列実行することで、システム イベント中に Compute Engine がプリエンプティブル VM インスタンスを停止した場合の Dataflow のユーザー エクスペリエンスが向上します。FlexRS は、プリエンプティブル VM が Compute Engine によってプリエンプトされたときに、パイプラインの処理を続行して以前の作業を確実に保持するのに効果的です。FlexRS の詳細については、Dataflow での Flexible Resource Scheduling の使用をご覧ください。

Dataflow Shielded VM

2022 年 6 月 1 日以降、Dataflow サービスはすべてのワーカーに Shielded VM を使用します。Shielded VM の機能の詳細については、Shielded VM をご覧ください。