テンプレートの実行

テンプレートの作成とステージングが完了したら、Google Cloud Platform Console、REST API または gcloud コマンドライン ツールを使用してテンプレートを実行します。Cloud Dataflow テンプレート ジョブは、App Engine スタンダード環境、Cloud Functions、その他の制限された環境など、さまざまな環境からデプロイできます。

注: テンプレート化されたパイプラインの実行は、テンプレート ファイルに加えて、テンプレート作成時にステージングおよび参照されたファイルにも依存します。ステージングされたファイルの移動や削除を行うと、パイプライン ジョブは失敗します。

GCP Console を使用する

GCP Console を使用して、Google 提供およびカスタムの Cloud Dataflow テンプレートを実行できます。

Google 提供のテンプレート

Google 提供のテンプレートを実行するには:

  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから、実行する Google 提供のテンプレートを選択します。
  6. WordCount テンプレートの実行フォーム
  7. [ジョブ名] フィールドにジョブ名を入力します。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  8. 表示されるパラメータ フィールドにパラメータ値を入力します。Google 提供のテンプレートを使用する場合は、[追加のパラメータ] セクションへの入力は不要です。
  9. [ジョブを実行] をクリックします。

カスタム テンプレート

カスタム テンプレートを実行するには:

  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから [カスタム テンプレート] を選択します。
  6. カスタム テンプレートの実行フォーム
  7. [ジョブ名] フィールドにジョブ名を入力します。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  8. テンプレートの Cloud Storage パスのフィールドに、テンプレート ファイルへの Cloud Storage のパスを入力します。
  9. テンプレートにパラメータが必要な場合は、[追加のパラメータ] セクションにある [項目を追加] をクリックします。パラメータの [名前] と [] に入力します。必要なパラメータについて、この手順を繰り返します。
  10. [ジョブを実行] をクリックします。

REST API の使用

REST API リクエストでテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

使用できるパラメータの詳細については、projects.templates.launch の REST API リファレンスをご覧ください。

例 1: カスタム テンプレートとバッチジョブ

この projects.templates.launch リクエストの例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートから、バッチジョブを作成します。リクエストが成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。

次の値を変更する必要があります。

  • [YOUR_PROJECT_ID] を実際のプロジェクト ID に置き換えます。
  • [JOB_NAME] を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • [YOUR_BUCKET_NAME] を Cloud Storage バケットの名前に置き換えます。
  • gcsPath をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters を Key-Value ペアのリストに設定します。
  • tempLocation を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
    POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/templates:launch?gcsPath=gs://[YOUR_BUCKET_NAME]/templates/TemplateName
    {
        "jobName": "[JOB_NAME]",
        "parameters": {
            "inputFile" : "gs://[YOUR_BUCKET_NAME]/input/my_input.txt",
            "outputFile": "gs://[YOUR_BUCKET_NAME]/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://[YOUR_BUCKET_NAME]/temp",
            "zone": "us-central1-f"
        }
    }

例 2: カスタム テンプレートとストリーミング ジョブ

この projects.templates.launch リクエストの例では、Cloud Pub/Sub トピックから読み取って BigQuery テーブルに書き込むテンプレートから、ストリーミング ジョブを作成します。BigQuery テーブルは、正しいスキーマですでに存在している必要があります。成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。

次の値を変更する必要があります。

  • [YOUR_PROJECT_ID] を実際のプロジェクト ID に置き換えます。
  • [JOB_NAME] を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • [YOUR_BUCKET_NAME] を Cloud Storage バケットの名前に置き換えます。
  • [YOUR_TOPIC_NAME] を Cloud Pub/Sub トピック名に置き換えます。
  • [YOUR_DATASET] を BigQuery データセットに置き換え、[YOUR_TABLE_NAME] を BigQuery テーブル名に置き換えます。
  • gcsPath をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters を Key-Value ペアのリストに設定します。
  • tempLocation を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
    POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/templates:launch?gcsPath=gs://[YOUR_BUCKET_NAME]/templates/TemplateName
    {
        "jobName": "[JOB_NAME]",
        "parameters": {
            "topic": "projects/[YOUR_PROJECT_ID]/topics/[YOUR_TOPIC_NAME]",
            "table": "[YOUR_PROJECT_ID]:[YOUR_DATASET].[YOUR_TABLE_NAME]"
        },
        "environment": {
            "tempLocation": "gs://[YOUR_BUCKET_NAME]/temp",
            "zone": "us-central1-f"
        }
    }

Google API クライアント ライブラリの使用

Google API クライアント ライブラリを使用すると、Cloud Dataflow REST API を簡単に呼び出せます。このサンプル スクリプトでは、Python 用 Google API クライアント ライブラリを使用しています。

この例では、次の変数を設定する必要があります。

  • project: プロジェクト ID に設定します。
  • job: 選択した一意のジョブ名に設定します。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • template: テンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters: テンプレート パラメータを含む辞書に設定します。
from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
    projectId=project,
    gcsPath=template,
    body={
        'jobName': job,
        'parameters': parameters,
    }
)

response = request.execute()

使用可能なオプションについては、Cloud Dataflow REST API リファレンスの projects.templates.launch メソッドをご覧ください。

gcloud の使用

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

gcloud コマンドライン ツールでは、gcloud dataflow jobs run コマンドを使用してカスタムまたは Google 提供のいずれかのテンプレートを実行できます。Google 提供のテンプレートの実行例については、Google 提供のテンプレート ページをご覧ください。

次のカスタム テンプレートの例では、次の値を設定します。

  • [JOB_NAME] を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • [YOUR_BUCKET_NAME] を Cloud Storage バケットの名前に置き換えます。
  • --gcs-location フラグを使用する必要があります。--gcs-location をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • --parameters を、ジョブに渡すパラメータのカンマ区切りのリストに設定します。カンマと値の間にスペースは使用できません。

例 1: カスタム テンプレートとバッチジョブ

この例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートから、バッチジョブを作成します。

    gcloud dataflow jobs run [JOB_NAME] \
        --gcs-location gs://[YOUR_BUCKET_NAME]/templates/MyTemplate \
        --parameters inputFile=gs://[YOUR_BUCKET_NAME]/input/my_input.txt,outputFile=gs://[YOUR_BUCKET_NAME]/output/my_output

リクエストは、次の形式でレスポンスを返します。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: [YOUR_PROJECT_ID]
    type: JOB_TYPE_BATCH

例 2: カスタム テンプレートとストリーミング ジョブ

この例では、Cloud Pub/Sub トピックから読み取って BigQuery テーブルに書き込むテンプレートから、ストリーミング ジョブを作成します。BigQuery テーブルは、正しいスキーマですでに存在している必要があります。

    gcloud dataflow jobs run [JOB_NAME] \
        --gcs-location gs://[YOUR_BUCKET_NAME]/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

リクエストは、次の形式でレスポンスを返します。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: [YOUR_PROJECT_ID]
    type: JOB_TYPE_STREAMING

gcloud dataflow jobs run コマンドのフラグの一覧については、gcloud ツールのリファレンスをご覧ください。

モニタリングとトラブルシューティング

Dataflow モニタリング インターフェースを使用して Cloud Dataflow ジョブをモニタリングできます。ジョブが失敗した場合は、パイプラインのトラブルシューティング ガイドで、トラブルシューティングのヒント、デバッグの方法、一般的なエラーのカタログをご確認いただけます。

パイプラインの更新

Cloud Dataflow テンプレートを使用する既存のパイプラインの更新は現在サポートされていません。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。