クラシック テンプレートの実行

テンプレートの作成とステージングが完了したら、Google Cloud Console、REST API または gcloud コマンドライン ツールを使用してテンプレートを実行します。Dataflow テンプレート ジョブは、App Engine スタンダード環境、Cloud Functions、その他の制限された環境など、さまざまな環境からデプロイできます。

注: テンプレート化されたパイプラインの実行は、テンプレート ファイルに加えて、テンプレート作成時にステージングおよび参照されたファイルにも依存します。ステージングされたファイルの移動や削除を行うと、パイプライン ジョブは失敗します。

Cloud Console の使用

Cloud Console を使用して、Google 提供およびカスタムの Dataflow テンプレートを実行できます。

Google 提供のテンプレート

Google 提供のテンプレートを実行するには:

  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから、実行する Google 提供のテンプレートを選択します。
  6. WordCount テンプレートの実行フォーム
  7. [ジョブ名] フィールドにジョブ名を入力します。
  8. 表示されるパラメータ フィールドにパラメータ値を入力します。Google 提供のテンプレートを使用する場合は、[追加のパラメータ] セクションへの入力は不要です。
  9. [ジョブを実行] をクリックします。

カスタム テンプレート

カスタム テンプレートを実行するには:

  1. Cloud Console の [Dataflow] ページに移動します。
  2. Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから [カスタム テンプレート] を選択します。
  6. カスタム テンプレートの実行フォーム
  7. [ジョブ名] フィールドにジョブ名を入力します。
  8. テンプレートの Cloud Storage パスのフィールドに、テンプレート ファイルへの Cloud Storage のパスを入力します。
  9. テンプレートにパラメータが必要な場合は、[追加のパラメータ] セクションにある [項目を追加] をクリックします。パラメータの [名前] と [] に入力します。必要なパラメータについて、この手順を繰り返します。
  10. [ジョブを実行] をクリックします。

REST API の使用

REST API リクエストでテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

使用できるパラメータの詳細については、projects.templates.launch の REST API リファレンスをご覧ください。

例 1: カスタム テンプレート バッチジョブの作成

この projects.templates.launch リクエストの例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートから、バッチジョブを作成します。リクエストが成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。

次の値を変更する必要があります。

  • YOUR_PROJECT_ID を実際のプロジェクト ID に置き換えます。
  • LOCATION は、任意の Dataflow リージョン エンドポイントに置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
  • gcsPath をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters を Key-Value ペアのリストに設定します。
  • tempLocation を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "outputFile": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

例 2: カスタム テンプレート ストリーミング ジョブの作成

この projects.templates.launch リクエストの例では、Pub/Sub トピックから読み取って BigQuery テーブルに書き込むテンプレートから、ストリーミング ジョブを作成します。BigQuery テーブルは、正しいスキーマですでに存在している必要があります。成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。

次の値を変更する必要があります。

  • YOUR_PROJECT_ID を実際のプロジェクト ID に置き換えます。
  • LOCATION は、任意の Dataflow リージョン エンドポイントに置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
  • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
  • YOUR_DATASET を BigQuery データセットに置き換え、YOUR_TABLE_NAME を BigQuery テーブル名に置き換えます。
  • gcsPath をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters を Key-Value ペアのリストに設定します。
  • tempLocation を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
            "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

例 3: カスタム テンプレート ストリーミング ジョブの更新

この例の projects.templates.launch リクエストは、テンプレート ストリーミング ジョブを更新する方法を示しています。

  1. 例 2: カスタム テンプレート ストリーミング ジョブの作成の手順を行って、ストリーミング テンプレート ジョブを開始します。
  2. 次のように値を変更して、下記の HTTP POST リクエストを送信します。
    • YOUR_PROJECT_ID を実際のプロジェクト ID に置き換えます。
    • LOCATION は、任意の Dataflow リージョン エンドポイントに置き換えます。
    • JOB_NAME は、任意のジョブ名に置き換えます。
    • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
    • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
    • YOUR_DATASET を BigQuery データセットに置き換え、YOUR_TABLE_NAME を BigQuery テーブル名に置き換えます。
    • gcsPath をテンプレート ファイルの Cloud Storage のロケーションに設定します。
    • parameters を Key-Value ペアのリストに設定します。
    • tempLocation を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
                "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
                "zone": "us-central1-f"
            }
            "update": true
        }
    
  3. Dataflow モニタリング インターフェースにアクセスして、同じ名前の新しいジョブが作成されたことを確認します。このジョブのステータスは「更新済み」です。

Google API クライアント ライブラリの使用

Google API クライアント ライブラリを使用すると、Dataflow REST API を簡単に呼び出せます。このサンプル スクリプトでは、Python 用 Google API クライアント ライブラリを使用しています。

この例では、次の変数を設定する必要があります。

  • project: プロジェクト ID に設定します。
  • job: 選択した一意のジョブ名に設定します。
  • template: テンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters: テンプレート パラメータを含む辞書に設定します。
from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
    projectId=project,
    gcsPath=template,
    body={
        'jobName': job,
        'parameters': parameters,
    }
)

response = request.execute()

使用可能なオプションについては、Cloud Dataflow REST API リファレンスの projects.templates.launch メソッドをご覧ください。

gcloud の使用

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

gcloud コマンドライン ツールでは、gcloud dataflow jobs run コマンドを使用してカスタムまたは Google 提供のいずれかのテンプレートを実行できます。Google 提供のテンプレートの実行例については、Google 提供のテンプレート ページをご覧ください。

次のカスタム テンプレートの例では、次の値を設定します。

  • JOB_NAME は、任意のジョブ名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
  • --gcs-location フラグを使用する必要があります。--gcs-location をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • --parameters を、ジョブに渡すパラメータのカンマ区切りのリストに設定します。カンマと値の間にスペースは使用できません。

例 1: カスタム テンプレートとバッチジョブ

この例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートから、バッチジョブを作成します。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,outputFile=gs://YOUR_BUCKET_NAME/output/my_output

リクエストは、次の形式でレスポンスを返します。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

例 2: カスタム テンプレートとストリーミング ジョブ

この例では、Pub/Sub トピックから読み取りを行い、BigQuery テーブルに書き込みを行うテンプレートを使用して、ストリーミング ジョブを作成します。BigQuery テーブルは、正しいスキーマですでに存在している必要があります。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

リクエストは、次の形式でレスポンスを返します。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

gcloud dataflow jobs run コマンドのフラグの一覧については、gcloud ツールのリファレンスをご覧ください。

モニタリングとトラブルシューティング

Dataflow モニタリング インターフェースを使用して Dataflow ジョブをモニタリングできます。ジョブが失敗した場合は、パイプラインのトラブルシューティング ガイドで、トラブルシューティングのヒント、デバッグの方法、一般的なエラーのカタログをご確認いただけます。