Dataflow コマンドライン インターフェースを使用する

Dataflow マネージド サービスを使用してパイプラインを実行する場合、Dataflow コマンドライン インターフェースを使用して Dataflow ジョブに関する情報を取得できます。Dataflow コマンドライン インターフェースは、Google Cloud CLI のコマンドライン ツールの一部です。

Google Cloud コンソールを使用して Dataflow ジョブの表示と操作を行う場合は、Dataflow モニタリング インターフェースを使用してください。

Dataflow コマンドライン コンポーネントのインストール

ローカル ターミナルから Dataflow コマンドライン インターフェースを使用するには、Google Cloud CLI をインストールして構成します。

Cloud Shell では、Dataflow コマンドライン インターフェースが自動的に使用可能になります。

コマンドの実行

使用可能なコマンドを実行して、Dataflow コマンドライン インターフェースを操作します。使用可能な Dataflow コマンドの一覧を表示するには、シェルまたはターミナルで次のコマンドを入力します。

  gcloud dataflow --help

出力でわかるように、Dataflow コマンドには flex-templatejobssnapshotssql の 4 つのグループがあります。

Flex Template コマンド

flex-template サブコマンド グループを使用すると、Dataflow フレックス テンプレートを操作できます。次のオペレーションがサポートされています。

  • build: 指定したパラメータからフレックス テンプレート ファイルを作成します。
  • run: 指定されたパスからジョブを実行します。

テンプレートを実行するには、テンプレート仕様ファイルを作成して Cloud Storage バケットに保存する必要があります。テンプレート仕様ファイルには、SDK 情報やメタデータなど、ジョブの実行に必要なすべての情報が含まれています。また、metadata.json ファイルには、テンプレートの名前、説明、入力パラメータなどに関する情報が含まれます。テンプレート仕様ファイルを作成した後、Java または Python を使用してフレックス テンプレートを作成できます。

Google Cloud CLI を使用して Flex テンプレートを作成して実行する方法については、チュートリアル Flex テンプレートの作成と実行をご覧ください。

jobs コマンド

jobs サブコマンド グループを使用すると、プロジェクトの Dataflow ジョブを操作できます。次のオペレーションがサポートされています。

  • cancel: コマンドライン引数に一致するすべてのジョブをキャンセルします。
  • describe: Get API から生成されたジョブ オブジェクトを出力します。
  • drain: コマンドライン引数に一致するすべてのジョブをドレインします。
  • list: 特定のプロジェクトに含まれるジョブの一覧を取得します。リージョンでフィルタすることもできます。
  • run: 指定されたパスからジョブを実行します。
  • show: 特定のジョブに関する簡単な説明を表示します。

プロジェクト内の Dataflow ジョブの一覧を取得するには、シェルまたはターミナルで次のコマンドを実行します。

gcloud dataflow jobs list

このコマンドは、現在のジョブのリストを返します。出力例は次のとおりです。

  ID                                        NAME                                    TYPE   CREATION_TIME        STATE   REGION
  2015-06-03_16_39_22-4020553808241078833   wordcount-janedoe-0603233849            Batch  2015-06-03 16:39:22  Done    us-central1
  2015-06-03_16_38_28-4363652261786938862   wordcount-johndoe-0603233820            Batch  2015-06-03 16:38:28  Done    us-central1
  2015-05-21_16_24_11-17823098268333533078  bigquerytornadoes-johndoe-0521232402    Batch  2015-05-21 16:24:11  Done    europe-west1
  2015-05-21_13_38_06-16409850040969261121  bigquerytornadoes-johndoe-0521203801    Batch  2015-05-21 13:38:06  Done    us-central1
  2015-05-21_13_17_18-18349574013243942260  bigquerytornadoes-johndoe-0521201710    Batch  2015-05-21 13:17:18  Done    europe-west1
  2015-05-21_12_49_37-9791290545307959963   wordcount-johndoe-0521194928            Batch  2015-05-21 12:49:37  Done    us-central1
  2015-05-20_15_54_51-15905022415025455887  wordcount-johndoe-0520225444            Batch  2015-05-20 15:54:51  Failed  us-central1
  2015-05-20_15_47_02-14774624590029708464  wordcount-johndoe-0520224637            Batch  2015-05-20 15:47:02  Done    us-central1

ジョブごとに表示されるジョブ ID を使用して describe コマンドを実行すると、ジョブに関する詳細な情報を表示できます。

gcloud dataflow jobs describe JOB_ID

JOB_ID は、プロジェクトにある Dataflow ジョブのいずれかのジョブ ID に置き換えます。

たとえば、ジョブ ID 2015-02-09_11_39_40-15635991037808002875 でコマンドを実行すると、出力は次のようになります。

createTime: '2015-02-09T19:39:41.140Z'
currentState: JOB_STATE_DONE
currentStateTime: '2015-02-09T19:56:39.510Z'
id: 2015-02-09_11_39_40-15635991037808002875
name: tfidf-bchambers-0209193926
projectId: google.com:clouddfe
type: JOB_TYPE_BATCH

結果を JSON 形式にフォーマットする場合は、--format=json オプションを指定してコマンドを実行します。

gcloud --format=json dataflow jobs describe JOB_ID

JOB_ID は、プロジェクトにある Dataflow ジョブのいずれかのジョブ ID に置き換えます。

次のサンプル出力は JSON 形式になっています。

{
  "createTime": "2015-02-09T19:39:41.140Z",
  "currentState": "JOB_STATE_DONE",
  "currentStateTime": "2015-02-09T19:56:39.510Z",
  "id": "2015-02-09_11_39_40-15635991037808002875",
  "name": "tfidf-bchambers-0209193926",
  "projectId": "google.com:clouddfe",
  "type": "JOB_TYPE_BATCH"
}

snapshots コマンド

snapshots サブコマンド グループを使用すると、Dataflow のスナップショットを操作できます。次のオペレーションがサポートされています。

  • create: Dataflow ジョブのスナップショットを作成します。
  • delete: Dataflow のスナップショットを削除します。
  • describe: Dataflow のスナップショットの詳細を取得します。
  • list: 指定したリージョンのプロジェクト内の Dataflow スナップショットを一覧表示します。必要に応じてジョブ ID でフィルタリングできます。

Dataflow でスナップショットを使用する方法については、Dataflow スナップショットの使用をご覧ください。

SQL コマンド

sql サブコマンド グループを使用すると、Dataflow SQL を操作できます。gcloud Dataflow sql query コマンドは、ユーザーが指定した SQL クエリを受け入れて、Dataflow で実行します。

たとえば、BigQuery データセットから読み取って別の BigQuery データセットに書き込む Dataflow ジョブで簡単な SQL クエリを実行するには、次のコマンドをシェルまたはターミナルで実行します。

gcloud dataflow sql query 'SELECT word FROM
bigquery.table.PROJECT_ID.input_dataset.input_table
where count > 3'
    --job-name=JOB_NAME \
    --region=us-west1 \
    --bigquery-dataset=OUTPUT_DATASET \
    --bigquery-table=OUTPUT_TABLE

次のように置き換えます。

  • PROJECT_ID: 実際のプロジェクトのグローバルに一意の名前
  • JOB_NAME: 実際の Dataflow ジョブの名前
  • OUTPUT_DATASET: 出力データセットの名前
  • OUTPUT_TABLE: 出力テーブルの名前

Dataflow SQL ジョブが開始するまでに数分かかることがあります。ジョブは作成後に更新できません。Dataflow SQL ジョブは自動スケーリングを使用します。Dataflow は実行モード(バッチまたはストリーミング)を自動的に選択します。Dataflow SQL ジョブのこの動作は制御できません。Dataflow SQL ジョブを停止するには、cancel コマンドを使用します。drain で Dataflow SQL ジョブを停止することはできません。

Dataflow で SQL コマンドを使用する方法について詳しくは、Dataflow SQL リファレンスgcloud Dataflow sql query のドキュメントをご覧ください。

リージョンでのコマンドの使用

Dataflow コマンドライン インターフェースは、gcloud CLI バージョン 176 以降でリージョンをサポートしています。ジョブを管理するリージョンを指定するには、--region オプションを任意のコマンドで使用します。

たとえば、gcloud dataflow jobs list はすべてのリージョンのジョブを一覧表示しますが、gcloud dataflow jobs list --region=europe-west1 は、europe-west1 で管理されるジョブだけを一覧表示します。