Dataflow でのカスタム コンテナの使用

このページでは、カスタム コンテナ イメージを提供して、Dataflow パイプラインのユーザーコードのランタイム環境をカスタマイズする方法について説明します。カスタム コンテナは、Dataflow Runner v2 を使用するパイプラインでサポートされています。

Dataflow がワーカー VM を起動すると、Docker コンテナ イメージを使用してワーカー上でコンテナ化された SDK プロセスを起動します。デフォルトの Apache Beam イメージのいずれかを使用する代わりに、カスタム コンテナ イメージを指定できます。カスタム コンテナ イメージを指定すると、Dataflow は指定されたイメージを pull するワーカーを起動します。カスタム コンテナを使用する理由は次のとおりです。

  • ワーカーの起動時間を短縮するために、パイプラインの依存関係をプリインストールする。
  • パブリック リポジトリでは使用できないパイプラインの依存関係をプリインストールする。
  • ワーカーの起動時間を短縮するために、大きなファイルを事前ステージングする。
  • サードパーティ ソフトウェアをバックグラウンドで起動する。
  • 実行環境をカスタマイズする。

カスタム コンテナの詳細については、Apache Beam カスタム コンテナ ガイドをご覧ください。

始める前に

Runner v2 とお使いの言語バージョンをサポートする Apache Beam SDK のバージョンがインストールされていることを確認します。

次に、以下のパイプライン オプションを使用して、カスタム コンテナを有効にします。

Java

Runner v2 を有効にするには、--experiments=use_runner_v2 を使用します。

--sdkContainerImage を使用して、Java ランタイム用のカスタム コンテナ イメージを指定します。

Python

SDK バージョン 2.30.0 以降を使用している場合は、パイプライン オプション --sdk_container_image を使用します。

より古いバージョンの SDK の場合は、パイプライン オプション --worker_harness_container_image を使用します。

詳細については、Apache Beam SDK のインストールのガイドをご覧ください。

コンテナ イメージをローカルでテストするには、Docker がインストールされている必要があります。詳細については、Docker の取得をご覧ください。

デフォルトの SDK コンテナ イメージ

ベースのコンテナ イメージとして、デフォルトの Apache Beam SDK イメージで始めることをおすすめします。デフォルトのイメージは、Apache Beam リリースの一環として DockerHub にリリースされます。

コンテナ イメージの作成とビルド

このセクションでは、カスタム SDK コンテナ イメージを作成するさまざまな例を示します。

カスタム SDK コンテナ イメージは、次の要件を満たす必要があります。

  • Apache Beam SDK と必要な依存関係がインストールされている。
  • デフォルトの ENTRYPOINT スクリプト(デフォルト コンテナの /opt/apache/beam/boot)が、コンテナ起動時の最後のステップとして実行される。詳細については、コンテナのエントリポイントの変更をご覧ください。

Apache Beam のベースイメージの使用

カスタム コンテナ イメージを作成するには、Apache Beam イメージを親イメージとして指定し、独自のカスタマイズを追加します。Dockerfile の作成方法については、Dockerfiles 作成のベスト プラクティスをご覧ください。

  1. FROM 命令を使用してベースイメージを指定して、新しい Dockerfile を作成します。

    Java

    この例では、Apache Beam SDK バージョン 2.34.0 とともに Java 8 を使用します。

    FROM apache/beam_java8_sdk:2.34.0
    
    # Make your customizations here, for example:
    ENV FOO=/bar
    COPY path/to/myfile ./
    

    カスタム コンテナのランタイム バージョンは、パイプラインの開始に使用するランタイムと一致する必要があります。たとえば、パイプラインをローカルの Java 11 環境から開始する場合は、FROM 行に apache/beam_java11_sdk:... を記述し、Java 11 環境を指定する必要があります。

    Python

    この例では、Python 3.8 と Apache Beam SDK バージョン 2.34.0 を使用します。

    FROM apache/beam_python3.8_sdk:2.34.0
    
    # Make your customizations here, for example:
    ENV FOO=/bar
    COPY path/to/myfile ./
    

    カスタム コンテナのランタイム バージョンは、パイプラインの開始に使用するランタイムと一致する必要があります。たとえば、パイプラインをローカルの Python 3.8 環境から開始する場合は、FROM 行に apache/beam_python3.8_sdk:... を記述し、Python 3.8 環境を指定する必要があります。

  2. 子イメージをビルドし、このイメージをコンテナ レジストリに push します。

    Cloud Build

    export PROJECT=PROJECT
    export REPO=REPO
    export TAG=TAG
    export IMAGE_URI=gcr.io/$PROJECT/$REPO:$TAG
    gcloud builds submit . --tag $IMAGE_URI
    

    Docker

    export PROJECT=PROJECT
    export REPO=REPO
    export TAG=TAG
    export IMAGE_URI=gcr.io/$PROJECT/$REPO:$TAG
    docker build . --tag $IMAGE_URI
    docker push $IMAGE_URI
    

    次のように置き換えます。

    • PROJECT: プロジェクト名またはユーザー名。
    • REPO: イメージ リポジトリの名前。
    • TAG: イメージタグ。通常は latest です。

カスタム ベース イメージまたはマルチステージ ビルドの使用

既存のベースイメージがある場合や、デフォルトの Apache Beam イメージのベース要素(OS バージョン、パッチなど)を変更する必要がある場合は、マルチステージ ビルドプロセスを使用して、デフォルトの Apache Beam ベースイメージから必要なアーティファクトをコピーし、カスタム コンテナ イメージを提供します。

以下に、Apache Beam SDK からファイルをコピーする Dockerfile の例を示します。

Java

FROM openjdk:8

# Copy files from official SDK image, including script/dependencies.
COPY --from=apache/beam_java8_sdk:2.34.0 /opt/apache/beam /opt/apache/beam

# Set the entrypoint to Apache Beam SDK launcher.
ENTRYPOINT ["/opt/apache/beam/boot"]

Python

FROM python:3.8-slim

# Install SDK.
RUN pip install --no-cache-dir apache-beam[gcp]==2.34.0

# Verify that the image does not have conflicting dependencies.
RUN pip check

# Copy files from official SDK image, including script/dependencies.
COPY --from=apache/beam_python3.8_sdk:2.34.0 /opt/apache/beam /opt/apache/beam

# Set the entrypoint to Apache Beam SDK launcher.
ENTRYPOINT ["/opt/apache/beam/boot"]

この例では、必要な依存関係(この場合は Python 3.8 と pip)が既存のベースイメージにインストールされていることを前提としています。イメージに Apache Beam SDK をインストールすると、必要な SDK 依存関係がイメージにあり、ワーカーの起動時間が短縮されます。RUN 命令で指定されるバージョンは、パイプラインの起動に使用されるバージョンと一致する必要があります。

コンテナ エントリポイントの変更

カスタム コンテナはデフォルトの ENTRYPOINT スクリプト /opt/apache/beam/boot を実行する必要があります。これにより、ワーカー環境が初期化され、SDK ワーカー プロセスが開始されます。このエントリポイントを設定しない場合、ワーカーは正しく起動しません。

コンテナの起動時に独自のスクリプトを実行する必要がある場合は、イメージ ENTRYPOINT が、このスクリプトに Dataflow の引数を渡すことを含め、このワーカー SDK プロセスを引き続き適切に起動する必要があります。

つまり、カスタム ENTRYPOINT/opt/apache/beam/boot の実行で終了する必要があります。また、コンテナの起動時に Dataflow によって渡されたすべての必須引数が、デフォルトの起動スクリプトに正しく渡される必要があります。これを行うには、/opt/apache/beam/boot を実行するカスタム スクリプトを作成します。

#!/bin/bash

echo "This is my custom script"

# ...

# Pass command arguments to the default boot script.
/opt/apache/beam/boot "$@"

次に、デフォルトの ENTRYPOINT をオーバーライドします。Dockerfile の例:

Java

FROM apache/beam_java8_sdk:2.34.0

COPY script.sh path/to/my/script.sh
ENTRYPOINT [ "path/to/my/script.sh" ]

Python

FROM apache/beam_python3.8_sdk:2.34.0

COPY script.sh path/to/my/script.sh
ENTRYPOINT [ "path/to/my/script.sh" ]

カスタム コンテナを使用したジョブの実行

このセクションでは、テストと Dataflow の両方で、カスタム コンテナを使用してパイプラインを実行する方法について説明します。コンテナ イメージとパイプラインをすでに確認している場合は、Dataflow ジョブの起動に進んでください。

始める前に

パイプラインを実行するときは、カスタム コンテナ イメージ上の SDK と同じバージョンと言語バージョンの Apache Beam SDK を使用してパイプラインを起動してください。これにより、互換性のない依存関係や SDK からの予期しないエラーを回避できます。

ローカルでのテスト

Apache Beam 固有の使用方法については、Apache Beam のガイドでカスタム コンテナ イメージを使用したパイプラインの実行に関する説明をご覧ください。

PortableRunner を使用した基本的なテスト

リモート コンテナ イメージを pull でき、Apache Beam PortableRunner を使用して少なくとも単純なパイプラインを実行できることをテストします。

次のサンプル パイプラインを実行します。

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --job_endpoint=embed \
    --environment_type=DOCKER \
    --environment_config=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=embed \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

次のように置き換えます。

  • IMAGE_URI: カスタム コンテナ イメージの URI。変数がスコープ内にある場合は、前の手順で作成したシェル変数 $IMAGE_URI を使用できます。
  • INPUT_FILE: テキスト ファイルとして読み取り可能な入力ファイル。このファイルは、コンテナ イメージまたはリモート ファイルにプリロードされた SDK ハーネス コンテナ イメージからアクセスできる必要があります。
  • OUTPUT_FILE: 出力を書き込むファイルパス。このパスは、リモートパスまたはコンテナのローカルパスになります。

パイプラインが正常に完了したら、コンソールのログを調べて、パイプラインが正常に完了したことを確認します。また、$IMAGE_URI で指定されたリモート イメージが使用されていたことを確認します。

実行後、コンテナに保存したファイルはローカル ファイルシステムにはなく、コンテナは停止することに注意してください。ファイルは停止したコンテナ ファイルシステム docker cp からコピーできます。

または

  • Cloud Storage などのリモート ファイル システムに出力を提供します。認証情報ファイルまたはアプリケーションのデフォルト認証情報など、テスト目的でアクセスを手動で構成しなければならない場合もあります。
  • 迅速なデバッグのために一時的なロギングを追加します。

Direct Runner の使用

コンテナ イメージとパイプラインのより詳細なローカルテストには、Apache Beam DirectRunner を使用します。

パイプラインをコンテナとは別に検証するには、コンテナ イメージと一致するローカル環境でテストするか、実行中のコンテナでパイプラインを起動します。

Java

docker run -it --entrypoint "/bin/bash" $IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" $IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

この例では、パイプライン ファイル(パイプライン自体を含む)がカスタム コンテナ自体に配置されていること、ローカル ファイル システムからマウントされていること、または Apache Beam とコンテナからリモートでアクセスできることを前提としています。たとえば、Maven(mvn)を使用して上記の Java の例を実行するには、Maven とその依存関係をコンテナでステージングする必要があります。詳細については、ストレージdocker run に関する Docker のドキュメントをご覧ください。

DirectRunner テストの目標は、カスタム コンテナ環境でパイプラインをテストすることです。デフォルトの ENTRYPOINT でコンテナを実際に実行してテストすることではありません。ENTRYPOINTdocker run --entrypoint ... など)を変更してパイプラインを直接実行するか、コンテナでコマンドを手動で実行します。

Compute Engine 上で実行することを特定の構成に依存する場合は、このコンテナを Compute Engine VM 上で直接実行できます。詳細については、Compute Engine 上のコンテナをご覧ください。

Dataflow ジョブの起動

Dataflow で Apache Beam パイプラインを起動するときは、次のように、コンテナ イメージへのパスを指定します。

Java

--sdkContainerImage を使用して、Java ランタイム用のカスタム コンテナ イメージを指定します。

Runner v2 を有効にするには、--experiments=use_runner_v2 を使用します。

Python

SDK バージョン 2.30.0 以降を使用している場合は、パイプライン オプション --sdk_container_image を使用します。

より古いバージョンの SDK の場合は、パイプライン オプション --worker_harness_container_image を使用します。

カスタム コンテナは Dataflow Runner v2 でのみサポートされます。バッチ Python パイプラインを起動する場合は、--experiments=use_runner_v2 フラグを設定します。ストリーミング Python パイプラインを起動する場合、ストリーミング Python パイプラインはデフォルトで Runner v2 を使用するため、テストを指定する必要はありません。

次の例では、カスタム コンテナを使用してバッチ wordcount サンプルを起動する方法を示しています。

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=$IMAGE_URI"

Python

Apache Beam SDK for Python バージョン 2.30.0 以降の使用:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=$IMAGE_URI

次のように置き換えます。

  • INPUT_FILE: サンプルの実行時に Dataflow によって読み取られる Cloud Storage 入力ファイルのパス。
  • OUTPUT_FILE: サンプル パイプラインによって書き込まれる Cloud Storage 出力ファイルのパス。ここには文字数が書き込まれます。
  • PROJECT_ID: Google Cloud プロジェクトの ID。
  • REGION: Dataflow ジョブをデプロイするリージョン エンドポイント。
  • TEMP_LOCATION: パイプラインの実行中に作成される一時ジョブファイルをステージングするための Dataflow 用の Cloud Storage パス。
  • DISK_SIZE_GB: (省略可)コンテナのサイズが大きい場合は、ディスク容量が不足しないように、デフォルトのブートディスク サイズを増やすことを検討してください。
  • $IMAGE_URI: カスタム コンテナ イメージの URI。変数がスコープ内にある場合は、前の手順で作成したシェル変数 $IMAGE_URI を使用できます。

トラブルシューティング

このセクションでは、Dataflow でカスタム コンテナの使用時に発生する問題のトラブルシューティングについて説明します。主に、コンテナまたはワーカーが起動しない場合について説明します。ワーカーが起動可能で、処理が進行している場合は、パイプラインのトラブルシューティングの一般的なガイダンスに従ってください。

サポートに問い合わせる前に、コンテナ イメージに関連する問題でないことを確認してください。

  • 手順に沿ってコンテナ イメージをローカルでテストします
  • ジョブのログまたはワーカーのログでエラーを検索します。見つかったエラーを一般的なエラーのガイダンスと比較します。
  • パイプラインの起動に使用している Apache Beam SDK のバージョンと言語バージョンが、カスタム コンテナ イメージの SDK と一致していることを確認します。
  • Java を使用している場合は、パイプラインの起動に使用する Java メジャー バージョンがコンテナ イメージにインストールされているバージョンと一致していることを確認します。
  • Python を使用している場合は、パイプラインの起動に使用する Python のメジャー バージョンとマイナー バージョンがコンテナ イメージにインストールされているバージョンと一致していることと、イメージに競合する依存関係がないことを確認します。pip check を実行して確認できます。

カスタム コンテナに関連するワーカーログの確認

コンテナ関連のエラー メッセージを含む Dataflow ワーカーログは、ログ エクスプローラで確認できます。

  1. ログ名を選択します。カスタム コンテナの起動エラーは、ほとんどが次のいずれかで確認できます。

    • dataflow.googleapis.com/docker
    • dataflow.googleapis.com/kubelet
    • dataflow.googleapis.com/worker-startup
  2. Dataflow Step リソースを選択して、job_id を指定します。

特に、「Error Syncing pod...」というログメッセージがある場合は、一般的なエラー ガイダンスに従ってください。ログ エクスプローラで次のクエリを使用すると、Dataflow ワーカーログでこれらのログメッセージを探すことができます。

resource.type="dataflow_step" AND jsonPayload.message:("$IMAGE_URI") AND severity="ERROR"

一般的な問題

コンテナ イメージを pull できないため、ジョブでエラーが発生したか、失敗した

Dataflow ワーカーがカスタム コンテナ イメージにアクセスできる必要があります。無効な URL、認証情報の構成の誤り、ネットワーク アクセスがないことが原因でワーカーがイメージを pull できない場合、ワーカーは起動できません。

処理がまったく開始されておらず、複数のワーカーが順次開始できないバッチジョブの場合、Dataflow でジョブが失敗します。それ以外の場合、Dataflow はエラーをログに記録しますが、長時間実行ジョブの状態破棄を回避するためのアクションは実行しません。

一般的なエラー ガイダンスに従って、この問題の原因と解決方法を確認してください。

ワーカーが起動していないか、処理が進行していない

なんらかのエラーで SDK コンテナが起動できない場合、Dataflow は永続的なエラーか致命的なエラーかを判断できず、失敗するたびにワーカーの再起動を試行することがあります。

Worker logs で特定のエラーを探し、一般的なエラーのガイダンスを確認してください。

明らかなエラーがないにもかかわらず、dataflow.googleapis.com/kubelet[topologymanager] RemoveContainer INFO レベルのログがある場合は、カスタム コンテナ イメージが早期に終了し、長時間実行ワーカー SDK プロセスを開始していないことを示しています。これは、カスタム ENTRYPOINT がデフォルトの起動スクリプト /opt/apache/beam/boot を開始しなかった場合、またはこのスクリプトに引数を適切に渡していない場合に発生することがあります。カスタム ENTRYPOINT の変更をご覧ください。