Flex テンプレートをビルドして実行する


Dataflow Flex テンプレートを使用すると、Dataflow パイプラインをデプロイ用にパッケージ化できます。このチュートリアルでは、Dataflow Flex テンプレートを作成し、そのテンプレートを使用して Dataflow ジョブを実行する方法について説明します。

目標

  • Dataflow Flex テンプレートを作成する。
  • テンプレートを使用して Dataflow ジョブを実行する。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, Resource Manager, Artifact Registry, and Cloud Build API:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, Resource Manager, Artifact Registry, and Cloud Build API:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.writer
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    次のように置き換えます。

環境を準備する

SDK と開発環境のすべての要件をインストールします。

Java

  1. Java Development Kit(JDK)バージョン 11 をダウンロードしてインストールしますJAVA_HOME 環境変数が設定され、ご使用の JDK インストールを指していることを確認します。

  2. ご使用のオペレーティング システムに対応した Maven のインストール ガイドに沿って、Apache Maven のダウンロードとインストールを行います。

Python

Apache Beam SDK for Python をインストールします。

Go

Go のダウンロードとインストール ガイドに沿って、ご使用のオペレーティング システム用の Go をダウンロードしてインストールします。Apache Beam でサポートされている Go ランタイム環境については、Apache Beam ランタイム サポートをご覧ください。

コードサンプルをダウンロードします。

Java

  1. java-docs-samples リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    
  2. このチュートリアルのコードサンプルに移動します。

    cd java-docs-samples/dataflow/flex-templates/getting_started
    
  3. Java プロジェクトから Uber JAR ファイルをビルドします。

    mvn clean package

    この Uber JAR ファイルにはすべての依存関係が含まれます。このファイルは、他のライブラリで外部依存関係のないスタンドアロン アプリケーションとして実行できます。

Python

  1. python-docs-samples リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. このチュートリアルのコードサンプルに移動します。

    cd python-docs-samples/dataflow/flex-templates/getting_started
    

Go

  1. golang-samples リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/golang-samples.git
    
  2. このチュートリアルのコードサンプルに移動します。

    cd golang-samples/dataflow/flex-templates/wordcount
    
  3. Go バイナリをコンパイルします。

    GOOS=linux GOARCH=amd64 go build -o wordcount .

Cloud Storage バケットを作成する

Cloud Storage バケットの作成には、gcloud storage buckets create コマンドを使用します。

gcloud storage buckets create gs://BUCKET_NAME

BUCKET_NAME は、Cloud Storage バケットの名前に置き換えます。Cloud Storage バケット名はグローバルに一意であり、バケットの命名要件を満たしている必要があります。

Artifact Registry リポジトリを作成する

テンプレートの Docker コンテナ イメージを push する Artifact Registry リポジトリを作成します。

  1. gcloud artifacts repositories create コマンドを使用して、新しい Artifact Registry リポジトリを作成します。

    gcloud artifacts repositories create REPOSITORY \
     --repository-format=docker \
     --location=LOCATION
    

    次のように置き換えます。

    • REPOSITORY: リポジトリの名前。リポジトリ名は、プロジェクト内のリポジトリのロケーションごとに一意である必要があります。
    • LOCATION: リポジトリのリージョンまたはマルチリージョンのロケーション
  2. gcloud auth configure-docker コマンドを使用して、Artifact Registry のリクエストを認証するように Docker を構成します。このコマンドを実行すると、Docker の構成が更新され、Artifact Registry に接続してイメージを push できるようになります。

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

Flex テンプレートでは、非公開レジストリに保存されているイメージも使用できます。詳細については、非公開レジストリのイメージを使用するをご覧ください。

Flex テンプレートを作成する

このステップでは、gcloud dataflow flex-template build コマンドを使用して Flex テンプレートを作成します。

Flex テンプレートは、次の要素で構成されています。

  • パイプライン コードをパッケージ化する Docker コンテナ イメージ。Java と Python の Flex テンプレートの場合、gcloud dataflow flex-template build コマンドを実行すると、Docker イメージがビルドされ、Artifact Registry リポジトリに push されます。
  • テンプレート仕様ファイル。このファイルは、コンテナ イメージの場所とテンプレートに関するメタデータ(パイプライン パラメータなど)が記載されている JSON ドキュメントです。

GitHub のサンプル リポジトリには、metadata.json ファイルが含まれています。

メタデータを追加してテンプレートを拡張するには、独自の metadata.json ファイルを作成します。

Java

gcloud dataflow flex-template build gs://BUCKET_NAME/getting_started-java.json \
 --image-gcr-path "LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/getting-started-java:latest" \
 --sdk-language "JAVA" \
 --flex-template-base-image JAVA11 \
 --metadata-file "metadata.json" \
 --jar "target/flex-template-getting-started-1.0.jar" \
 --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="com.example.dataflow.FlexTemplateGettingStarted"

次のように置き換えます。

  • BUCKET_NAME: 前に作成した Cloud Storage バケットの名前
  • LOCATION: ロケーション
  • PROJECT_ID: Google Cloud プロジェクト ID
  • REPOSITORY: 前に作成した Artifact Registry リポジトリの名前

Python

gcloud dataflow flex-template build gs://BUCKET_NAME/getting_started-py.json \
 --image-gcr-path "LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/getting-started-python:latest" \
 --sdk-language "PYTHON" \
 --flex-template-base-image "PYTHON3" \
 --metadata-file "metadata.json" \
 --py-path "." \
 --env "FLEX_TEMPLATE_PYTHON_PY_FILE=getting_started.py" \
 --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt"

次のように置き換えます。

  • BUCKET_NAME: 前に作成した Cloud Storage バケットの名前
  • LOCATION: ロケーション
  • PROJECT_ID: Google Cloud プロジェクト ID
  • REPOSITORY: 前に作成した Artifact Registry リポジトリの名前

Go

  1. gcloud builds submit コマンドを使用して、Cloud Build で Dockerfile を使用して Docker イメージをビルドします。このコマンドは、ファイルをビルドして Artifact Registry リポジトリに push します。

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/wordcount-go:latest .
    

    次のように置き換えます。

    • LOCATION: ロケーション
    • PROJECT_ID: Google Cloud プロジェクト ID
    • REPOSITORY: 前に作成した Artifact Registry リポジトリの名前
  2. gcloud dataflow flex-template build コマンドを使用して、Cloud Storage バケット内に wordcount-go.json という名前の Flex テンプレートを作成します。

    gcloud dataflow flex-template build gs://BUCKET_NAME/samples/dataflow/templates/wordcount-go.json \
      --image "LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/wordcount-go:latest" \
      --sdk-language "GO" \
      --metadata-file "metadata.json"

    BUCKET_NAME は、前に作成した Cloud Storage バケットの名前に置き換えます。

Flex テンプレートを実行する

このステップでは、テンプレートを使用して Dataflow ジョブを実行します。

Java

  1. gcloud dataflow flex-template run コマンドを使用して、Flex テンプレートを使用する Dataflow ジョブを実行します。

    gcloud dataflow flex-template run "getting-started-`date +%Y%m%d-%H%M%S`" \
     --template-file-gcs-location "gs://BUCKET_NAME/getting_started-java.json" \
     --parameters output="gs://BUCKET_NAME/output-" \
     --region "REGION"

    次のように置き換えます。

    • BUCKET_NAME: 前に作成した Cloud Storage バケットの名前
    • REGION: リージョン
  2. Google Cloud コンソールで Dataflow ジョブのステータスを確認するには、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

ジョブが正常に実行されると、Cloud Storage バケット内の gs://BUCKET_NAME/output--00000-of-00001.txt という名前のファイルに出力が書き込まれます。

Python

  1. gcloud dataflow flex-template run コマンドを使用して、Flex テンプレートを使用する Dataflow ジョブを実行します。

    gcloud dataflow flex-template run "getting-started-`date +%Y%m%d-%H%M%S`" \
     --template-file-gcs-location "gs://BUCKET_NAME/getting_started-py.json" \
     --parameters output="gs://BUCKET_NAME/output-" \
     --region "REGION"
    

    次のように置き換えます。

    • BUCKET_NAME: 前に作成した Cloud Storage バケットの名前
    • REGION: リージョン
  2. Google Cloud コンソールで Dataflow ジョブのステータスを確認するには、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

ジョブが正常に実行されると、Cloud Storage バケット内の gs://BUCKET_NAME/output--00000-of-00001.txt という名前のファイルに出力が書き込まれます。

Go

  1. gcloud dataflow flex-template run コマンドを使用して、Flex テンプレートを使用する Dataflow ジョブを実行します。

    gcloud dataflow flex-template run "wordcount-go-`date +%Y%m%d-%H%M%S`" \
     --template-file-gcs-location "gs://BUCKET_NAME/samples/dataflow/templates/wordcount-go.json" \
     --parameters output="gs://BUCKET_NAME/samples/dataflow/templates/counts.txt" \
     --region "REGION"
    

    次のように置き換えます。

    • BUCKET_NAME: 前に作成した Cloud Storage バケットの名前
    • REGION: リージョン
  2. Google Cloud コンソールで Dataflow ジョブのステータスを確認するには、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

ジョブが正常に実行されると、Cloud Storage バケット内の gs://BUCKET_NAME/samples/dataflow/templates/count.txt という名前のファイルに出力が書き込まれます。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

リソースを個別に削除する

  1. Cloud Storage バケットとバケット内のすべてのオブジェクトを削除します。
    gcloud storage rm gs://BUCKET_NAME --recursive
  2. Artifact Registry リポジトリを削除します。
    gcloud artifacts repositories delete REPOSITORY \
        --location=LOCATION
  3. Compute Engine のデフォルト サービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.writer
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  4. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  5. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

次のステップ