クラシック テンプレートの実行

Dataflow テンプレートを作成してステージングしたら、 Google Cloud コンソール、REST API、または Google Cloud CLI でテンプレートを実行します。Dataflow テンプレート ジョブは、App Engine スタンダード環境、Cloud Run functions、その他の制限された環境など、さまざまな環境からデプロイできます。

Google Cloud コンソールを使用する

Google Cloud コンソールを使用して、Google 提供およびカスタムの Dataflow テンプレートを実行できます。

Google 提供のテンプレート

Google 提供のテンプレートを実行するには:

  1. Google Cloud コンソールで、[Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Google Cloud コンソールの [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから、実行する Google 提供のテンプレートを選択します。
  6. WordCount テンプレートの実行フォーム
  7. [ジョブ名] フィールドにジョブ名を入力します。
  8. 表示されるパラメータ フィールドにパラメータ値を入力します。Google 提供のテンプレートを使用する場合、[追加のパラメータ] セクションは必要ありません。
  9. [ジョブを実行] をクリックします。

カスタム テンプレート

カスタム テンプレートを実行するには:

  1. Google Cloud コンソールで、[Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Google Cloud コンソールの [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから [カスタム テンプレート] を選択します。
  6. カスタム テンプレートの実行フォーム
  7. [ジョブ名] フィールドにジョブ名を入力します。
  8. テンプレートの Cloud Storage パスのフィールドに、テンプレート ファイルへの Cloud Storage のパスを入力します。
  9. テンプレートにパラメータが必要な場合は、[追加のパラメータ] セクションの [パラメータを追加] をクリックします。パラメータの [名前] と [] に入力します。必要なパラメータについて、この手順を繰り返します。
  10. [ジョブを実行] をクリックします。

REST API を使用する

REST API リクエストでテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

利用可能なパラメータの詳細については、REST API リファレンスの projects.locations.templates.launch をご覧ください。

カスタム テンプレート バッチジョブを作成する

この projects.locations.templates.launch リクエストの例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートを使用して、バッチジョブを作成します。リクエストが成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。

次の値を変更します。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • LOCATION は、任意の Dataflow リージョンに置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
  • gcsPath をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters を Key-Value ペアのリストに設定します。
  • tempLocation を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

カスタム テンプレート ストリーミング ジョブを作成する

この projects.locations.templates.launch リクエストの例では、Pub/Sub サブスクリプションから読み取って BigQuery テーブルに書き込むクラシック テンプレートを使用して、ストリーミング ジョブを作成します。Flex テンプレートを起動する場合は、代わりに projects.locations.flexTemplates.launch を使用します。サンプル テンプレートは、Google が提供するテンプレートです。カスタム テンプレートを参照するようにテンプレート内のパスを変更できます。Google 提供のテンプレートとカスタム テンプレートの起動にも同じロジックが使用されます。この例では、BigQuery テーブルは適切なスキーマを定義した上で、すでに存在している必要があります。成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。

次の値を変更します。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • LOCATION は、任意の Dataflow リージョンに置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
  • GCS_PATH は、テンプレート ファイルの Cloud Storage のロケーションに置き換えます。ロケーションは gs:// で始まる必要があります。
  • parameters を Key-Value ペアのリストに設定します。このリスト化されたパラメータは、このテンプレートの例に固有のものです。カスタム テンプレートを使用している場合は、必要に応じてパラメータを変更します。サンプル テンプレートを使用している場合は、次の変数を置き換えます。
    • YOUR_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
    • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
  • tempLocation を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

カスタム テンプレート ストリーミング ジョブを更新する

この例の projects.locations.templates.launch リクエストは、テンプレート ストリーミング ジョブを更新する方法を示しています。Flex テンプレートを更新する場合は、代わりに projects.locations.flexTemplates.launch を使用します。

  1. 例 2: カスタム テンプレート ストリーミング ジョブの作成の手順を行って、ストリーミング テンプレート ジョブを開始します。
  2. 次のように値を変更して、下記の HTTP POST リクエストを送信します。
    • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • LOCATION は、更新するジョブの Dataflow リージョンに置き換えます。
    • JOB_NAME は、更新するジョブの正確な名前に置き換えます。
    • GCS_PATH は、テンプレート ファイルの Cloud Storage のロケーションに置き換えます。ロケーションは gs:// で始まる必要があります。
    • parameters を Key-Value ペアのリストに設定します。このリスト化されたパラメータは、このテンプレートの例に固有のものです。カスタム テンプレートを使用している場合は、必要に応じてパラメータを変更します。サンプル テンプレートを使用している場合は、次の変数を置き換えます。
      • YOUR_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
      • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
    • environment パラメータを使用して、マシンタイプなどの環境設定を変更します。この例では、デフォルトのマシンタイプよりもワーカーあたりのメモリと CPU が多い n2-highmem-2 マシンタイプを使用します。
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. Dataflow モニタリング インターフェースにアクセスして、同じ名前の新しいジョブが作成されたことを確認します。このジョブのステータスは「更新済み」です。

Google API クライアント ライブラリを使用する

Google API クライアント ライブラリを使用すると、Dataflow REST API を簡単に呼び出せます。このサンプル スクリプトでは、Python 用 Google API クライアント ライブラリを使用しています。

この例では、次の変数を設定する必要があります。

  • project: プロジェクト ID に設定します。
  • job: 選択した一意のジョブ名に設定します。
  • template: テンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters: テンプレート パラメータを含む辞書に設定します。

リージョンを設定するには、location パラメータを含めます。

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

使用可能なオプションについて詳しくは、Dataflow REST API リファレンスの projects.locations.templates.launch メソッドをご覧ください。

gcloud CLI を使用する

gcloud CLI では、gcloud dataflow jobs run コマンドを使用してカスタム テンプレートまたは Google 提供のテンプレートを実行できます。Google 提供のテンプレートの実行例については、Google 提供のテンプレート ページをご覧ください。

次のカスタム テンプレートの例では、次の値を設定します。

  • JOB_NAME は、任意のジョブ名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
  • --gcs-location をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • --parameters を、ジョブに渡すパラメータのカンマ区切りのリストに設定します。カンマと値の間にスペースは使用できません。
  • プロジェクト メタデータに保存された SSH 認証鍵を VM が受け入れないようにするには、--additional-experiments=block_project_ssh_keys のように、block_project_ssh_keys サービス オプションを指定して additional-experiments フラグを使用します。

カスタム テンプレート バッチジョブを作成する

この例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートを使用して、バッチジョブを作成します。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

リクエストは、次の形式でレスポンスを返します。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

カスタム テンプレート ストリーミング ジョブを作成する

この例では、Pub/Sub トピックから読み取りを行い、BigQuery テーブルに書き込みを行うテンプレートを使用して、ストリーミング ジョブを作成します。BigQuery テーブルは、適切なスキーマを定義した上で、すでに存在している必要があります。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

リクエストは、次の形式でレスポンスを返します。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

gcloud dataflow jobs run コマンドのフラグの完全なリストについては、gcloud CLI リファレンスをご覧ください。

モニタリングとトラブルシューティング

Dataflow モニタリング インターフェースを使用して Dataflow ジョブをモニタリングできます。ジョブが失敗した場合は、パイプラインのトラブルシューティング ガイドで、トラブルシューティングのヒント、デバッグの方法、一般的なエラーのカタログをご確認いただけます。