カスタム コンテナで Dataflow ジョブを実行する

このドキュメントでは、カスタム コンテナを使用して Dataflow パイプラインを実行する方法について説明します。

コンテナ イメージの作成方法については、Dataflow 用のカスタム コンテナ イメージをビルドするをご覧ください。

パイプラインを実行するときに、カスタム コンテナ イメージ上の SDK と同じバージョンと言語バージョンの Apache Beam SDK を使用してパイプラインを起動します。このステップにより、互換性のない依存関係や SDK からの予期しないエラーを回避できます。

ローカルでテストする

Dataflow でパイプラインを実行する前に、コンテナ イメージをローカルでテストすることをおすすめします。これにより、テストとデバッグをより迅速に行うことができます。

Apache Beam 固有の使用方法については、Apache Beam のガイドでカスタム コンテナ イメージを使用したパイプラインの実行に関する説明をご覧ください。

PortableRunner を使用した基本的なテスト

リモート コンテナ イメージを pull でき、簡単なパイプラインを実行できることを確認するには、Apache Beam PortableRunner を使用します。PortableRunner を使用すると、ジョブ送信がローカル環境で行われ、DoFn の実行が Docker 環境で行われます。

GPU を使用する場合、Docker コンテナが GPU にアクセスできない場合があります。GPU を使用してコンテナをテストするには、Direct Runner を使用し、「GPU を使用する」ページのスタンドアロン VM でデバッグするの手順に沿って、スタンドアロン VM でコンテナ イメージをテストします。

次のサンプル パイプラインを実行します。

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Go

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

次のように置き換えます。

  • REGION: 使用するジョブサービス リージョン(アドレスとポートの形式)。例: localhost:3000。プロセス内ジョブサービスを実行するには embed を使用します。
  • IMAGE_URI: カスタム コンテナ イメージの URI。
  • INPUT_FILE: テキスト ファイルとして読み取り可能な入力ファイル。このファイルは、コンテナ イメージまたはリモート ファイルにプリロードされた SDK ハーネス コンテナ イメージからアクセスできる必要があります。
  • OUTPUT_FILE: 出力を書き込むパス。このパスは、リモートパスまたはコンテナのローカルパスになります。

パイプラインが正常に完了したらコンソールのログを調べて、パイプラインが正常に完了し、IMAGE_URI で指定されたリモート イメージが使用されていることを確認します。

パイプラインの実行後、コンテナに保存されたファイルはローカル ファイル システムに存在せず、コンテナは停止します。停止したコンテナ ファイル システムからファイルをコピーするには、docker cp を使用します。

または

  • Cloud Storage などのリモート ファイル システムに出力を提供します。認証情報ファイルまたはアプリケーションのデフォルト認証情報など、テスト目的でアクセスを手動で構成しなければならない場合があります。
  • すばやくデバッグできるように、一時的なロギングを追加します。

Direct Runner を使用する

コンテナ イメージとパイプラインのより詳細なローカルテストには、Apache Beam Direct Runner を使用します。

パイプラインをコンテナとは別に検証するには、コンテナ イメージと一致するローカル環境でテストするか、実行中のコンテナでパイプラインを起動します。

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Go

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

IMAGE_URI は、カスタム コンテナ イメージの URI に置き換えます。

この例では、パイプライン ファイル(パイプライン自体を含む)がカスタム コンテナに配置されていることを前提としています。また、ローカル ファイル システムからマウントされているか、Apache Beam とコンテナからリモートでアクセスできることを前提としています。たとえば、Maven(mvn)を使用して前の Java の例を実行するには、Maven とその依存関係をコンテナでステージングする必要があります。詳細については、Docker ドキュメントのストレージdocker run をご覧ください。

Direct Runner でテストする目的は、デフォルトの ENTRYPOINT でコンテナの実行をテストすることではなく、カスタム コンテナ環境でパイプラインをテストすることです。ENTRYPOINTdocker run --entrypoint ... など)を変更してパイプラインを直接実行するか、コンテナでコマンドを手動で実行します。

Compute Engine でのコンテナの実行に基づく特定の構成に依存している場合は、Compute Engine VM でコンテナを直接実行できます。詳細については、Compute Engine のコンテナをご覧ください。

Dataflow ジョブを起動する

Dataflow で Apache Beam パイプラインを起動するときは、コンテナ イメージへのパスを指定します。カスタム イメージでは :latest タグを使用しないでください。ビルドには、日付または一意の識別子をタグとして設定します。このタイプのタグを使用すると、問題が発生したときにパイプラインの実行を既知の動作構成に戻し、変更の検査が可能になります。

Java

--sdkContainerImage を使用して、Java ランタイム用の SDK コンテナ イメージを指定します。

Runner v2 を有効にするには、--experiments=use_runner_v2 を使用します。

Python

SDK バージョン 2.30.0 以降を使用している場合は、パイプライン オプション --sdk_container_image を使用して SDK コンテナ イメージを指定します。

古いバージョンの SDK の場合は、パイプライン オプション --worker_harness_container_image を使用して、ワーカー ハーネスに使用するコンテナ イメージの場所を指定します。

カスタム コンテナは Dataflow Runner v2 でのみサポートされます。バッチ Python パイプラインを起動する場合は、--experiments=use_runner_v2 フラグを設定します。ストリーミング Python パイプラインを起動する場合、ストリーミング Python パイプラインはデフォルトで Runner v2 を使用するため、テストを指定する必要はありません。

Go

SDK バージョン 2.40.0 以降を使用している場合は、パイプライン オプション --sdk_container_image を使用して SDK コンテナ イメージを指定します。

古いバージョンの SDK の場合は、パイプライン オプション --worker_harness_container_image を使用して、ワーカー ハーネスに使用するコンテナ イメージの場所を指定します。

カスタム コンテナは、デフォルトで Dataflow Runner v2 を使用するため、Go SDK のすべてのバージョンでサポートされています。

次の例では、カスタム コンテナを使用してバッチ WordCount サンプルを起動する方法を示しています。

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Apache Beam SDK for Python バージョン 2.30.0 以降の使用:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Go

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

次のように置き換えます。

  • INPUT_FILE: サンプルの実行時に Dataflow によって読み取られる Cloud Storage 入力パス。
  • OUTPUT_FILE: サンプル パイプラインによって書き込まれる Cloud Storage 出力パス。このファイルには文字数が含まれています。
  • PROJECT_ID: Google Cloud プロジェクトの ID。
  • REGION: Dataflow ジョブをデプロイするリージョン。
  • TEMP_LOCATION: パイプラインの実行中に作成される一時ジョブファイルをステージングするための Dataflow 用の Cloud Storage パス。
  • DISK_SIZE_GB: 省略可。コンテナのサイズが大きい場合は、ディスク容量が不足しないように、デフォルトのブートディスク サイズを増やすことを検討してください。
  • IMAGE_URI: SDK カスタム コンテナ イメージの URI。バージョニングされたコンテナの SHA またはタグを常に使用します。:latest タグや可変タグは使用しないでください。