Google 提供のテンプレートを使ってみる

Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。開始するには、WordCount テンプレートを使用してください。他の Google 提供のテンプレートをご覧ください。

ストリーミング テンプレート - データを継続的に処理するためのテンプレート

バッチ テンプレート - データを一括で処理するためのテンプレート

ユーティリティ テンプレート:

WordCount

WordCount テンプレートは、Cloud Storage からテキストを読み取り、テキスト行を個別の単語にトークン化して各単語の出現頻度をカウントするバッチ パイプラインです。WordCount の詳細については、サンプルの WordCount パイプラインをご覧ください。

テンプレートのパラメータ

パラメータ 説明
inputFile Cloud Storage 入力ファイルのパス。
output Cloud Storage 出力ファイルのパスとプレフィックス。

WordCount テンプレートの実行

Console

Google Cloud Console を使用して実行します。
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the WordCount template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールを使用して実行します。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/latest/Word_Count

以下のコマンドを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Word_Count \
    --parameters \
    inputFile=gs://dataflow-samples/shakespeare/kinglear.txt,\
    output=gs://BUCKET_NAME/output/my_output

以下を置き換えます。

  • JOB_NAME: 任意のジョブ名。
  • BUCKET_NAME: Cloud Storage バケットの名前

API

REST API を使用して実行します。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/latest/Word_Count

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Word_Count
{
    "jobName": "JOB_NAME",
    "parameters": {
       "inputFile" : "gs://dataflow-samples/shakespeare/kinglear.txt",
       "output": "gs://BUCKET_NAME/output/my_output"
    },
    "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。
  • BUCKET_NAME: Cloud Storage バケットの名前