Google 提供のストリーミング テンプレート

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、Dataflow テンプレートをご覧ください。Google が提供するテンプレートのリストについては、Google 提供のテンプレートの概要をご覧ください。

このガイドでは、ストリーミング テンプレートについて説明します。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery テンプレートは、Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージの data フィールドは、この JSON ガイドで説明されている JSON 形式を使用する必要があります。たとえば、data フィールドの値が {"k1":"v1", "k2":"v2"} としてフォーマットされたメッセージは、文字列データ型の k1k2 という 2 つの列を持つ BigQuery テーブルに挿入できます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。
outputDeadletterTable 出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、OUTPUT_TABLE_SPEC_error_records が代わりに使用されます。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

Pub/Sub Subscription to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Subscription to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージの data フィールドは、この JSON ガイドで説明されている JSON 形式を使用する必要があります。たとえば、data フィールドの値が {"k1":"v1", "k2":"v2"} としてフォーマットされたメッセージは、文字列データ型の k1k2 という 2 つの列を持つ BigQuery テーブルに挿入できます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。
outputDeadletterTable 出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

Pub/Sub Topic to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Topic to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

このパイプラインの要件

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 未処理の Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。

テンプレートのパラメータ

パラメータ 説明
schemaPath Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc
inputSubscription 読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription>
outputTopic 未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 出力テーブルの場所。例: <my-project>:<my-dataset>.<my-table>createDisposition の指定によっては、ユーザー指定の Avro スキーマを使用して出力テーブルが自動的に作成されます。
writeDisposition (省略可)BigQuery の WriteDisposition。例: WRITE_APPENDWRITE_EMPTY、または WRITE_TRUNCATE。デフォルト: WRITE_APPEND
createDisposition (省略可)BigQuery の CreateDisposition。例: CREATE_IF_NEEDEDCREATE_NEVER。デフォルト: CREATE_IF_NEEDED

Pub/Sub Avro to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Avro to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック

Pub/Sub Proto to BigQuery

Pub/Sub proto to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに proto データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

データを変換するための JavaScript ユーザー定義関数(UDF)を指定できます。UDF の実行中のエラーは、個別の Pub/Sub トピック、または BigQuery エラーと同じ未処理のトピックに送信できます。

このパイプラインの要件:

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Proto レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 出力 Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。
  • BigQuery テーブルが存在する場合は、createDisposition 値にかかわらず、proto データに一致するスキーマが必要です。

テンプレートのパラメータ

パラメータ 説明
protoSchemaPath 自己完結型の proto スキーマ ファイルの Cloud Storage の場所。例: gs://path/to/my/file.pbこのファイルは、protoc コマンドの --descriptor_set_out フラグを使用して生成できます。--include_imports フラグを使用すると、ファイルが自己完結することを保証できます。
fullMessageName proto メッセージの完全な名前。たとえば、package.name.MessageName の場合、package.name は、java_package ステートメントではなく、package ステートメントに指定された値です。
inputSubscription 読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription>
outputTopic 未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 出力テーブルの場所。例: my-project:my_dataset.my_tablecreateDisposition の指定によっては、入力スキーマ ファイルを使用して出力テーブルが自動的に作成されます。
preserveProtoFieldNames (省略可)JSON で元の Proto フィールド名を保持するには truefalse: 標準の JSON 名を使用します。たとえば、false と指定すると field_namefieldName に変更されます。(デフォルト: false
bigQueryTableSchemaPath (省略可)BigQuery スキーマパスへの Cloud Storage パス。例: gs://path/to/my/schema.json指定されていない場合、スキーマは Proto スキーマから推測されます。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
udfOutputTopic (省略可)UDF エラーを格納する Pub/Sub トピック。たとえば、projects/<project-id>/topics/<topic-name> です。指定されていない場合、UDF エラーは outputTopic と同じトピックに送信されます。
writeDisposition (省略可)BigQuery WriteDisposition。例: WRITE_APPENDWRITE_EMPTY、または WRITE_TRUNCATE。デフォルト: WRITE_APPEND
createDisposition (省略可)BigQuery CreateDisposition。例: CREATE_IF_NEEDEDCREATE_NEVER。デフォルト: CREATE_IF_NEEDED

Pub/Sub Proto to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Proto to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、別の Pub/Sub トピックにそのメッセージを書き込むストリーミング パイプラインです。Pub/Sub トピックに書き込むメッセージのフィルタリングに使用される、オプションのメッセージ属性キーと値を指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しながら、Pub/Sub サブスクリプションから別の Pub/Sub トピックにメッセージをコピーできます。

このパイプラインの要件:

  • 実行前にコピー元の Pub/Sub サブスクリプションが存在している必要があります。
  • 読み取り元の Pub/Sub サブスクリプションが push 配信を使用する必要があります。
  • 実行前にコピー先の Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み取り元の入力 Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
outputTopic 書き込み先の出力 Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
filterKey (省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。
filterValue (省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。

Pub/Sub to Pub/Sub テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Pub/Sub template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)はフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)はフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。

Pub/Sub to Splunk

Pub/Sub to Splunk テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、メッセージ ペイロードを Splunk の HTTP Event Collector(HEC)を介して Splunk に書き込むストリーミング パイプラインです。このテンプレートの最も一般的な使用例は Splunk へのログのエクスポートです。基盤となるワークフローの例については、Dataflow を使用して本番環境対応のログのエクスポートを Splunk にデプロイするをご覧ください。

Splunk に書き込む前に、メッセージ ペイロードに JavaScript ユーザー定義関数を適用することもできます。処理が失敗したメッセージは、トラブルシューティングと再処理のために Pub/Sub の未処理のトピックに転送されます。

HEC トークンの保護を強化するために、Base64 エンコードされた HEC トークン パラメータを Cloud KMS 鍵で暗号化して、この Cloud KMS 鍵とともに渡すこともできます。HEC トークン パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。

このパイプラインの要件:

  • パイプラインの実行前にソース Pub/Sub サブスクリプションが存在している必要があります。
  • パイプラインを実行する前に、Pub/Sub に未処理のトピックが存在している必要があります。
  • Dataflow ワーカーのネットワークから Splunk の HEC エンドポイントにアクセスできる必要があります。
  • Splunk の HEC トークンが生成済みで、利用可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 入力の読み取り元の Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
token (省略可)Splunk の HEC 認証トークン。tokenSource が PLAINTEXT または KMS に設定されている場合は必須です。
url Splunk の HEC URL。パイプラインが実行される VPC からルーティング可能である必要があります。例: https://splunk-hec-host:8088。
outputDeadletterTopic 配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
batchCount (省略可)複数のイベントを Splunk に送信するためのバッチサイズ。デフォルト: 1(バッチ処理なし)。
parallelism (省略可)並行リクエストの最大数。デフォルト: 1(並列処理なし)。
disableCertificateValidation (省略可)SSL 証明書の検証を無効にします。デフォルト: false(検証が有効)。true の場合、証明書は検証されず(すべての証明書が信頼されます)、rootCaCertificatePath パラメータは無視されます。
includePubsubMessage (省略可)完全な Pub/Sub メッセージをペイロードに含めます。デフォルト: false(ペイロードにはデータ要素のみが含まれる)。
tokenSource トークンのソース。PLAINTEXT、KMS、SECRET_MANAGER のいずれかです。Secret Manager を使用する場合は、このパラメータを指定する必要があります。tokenSource が KMS に設定されている場合、tokenKMSEncryptionKey と暗号化された token を指定する必要がありますtokenSource を SECRET_MANAGER に設定する場合は、tokenSecretId を指定する必要がありますtokenSource が PLAINTEXT に設定されている場合は、token を指定する必要があります
tokenKMSEncryptionKey (省略可)HEC トークン文字列を復号するための Cloud KMS 鍵。tokenSource が KMS に設定されている場合、このパラメータを指定する必要がありますCloud KMS 鍵を指定する場合は、HEC トークン文字列を暗号化して渡す必要があります
tokenSecretId (省略可)トークンの Secret Manager シークレット ID。tokenSource が SECRET_MANAGER に設定されている場合、このパラメータを指定する必要がありますprojects/<project-id>/secrets/<secret-name>/versions/<secret-version> 形式で指定する必要があります。
rootCaCertificatePath (省略可)Cloud Storage のルート CA 証明書の完全 URL。例: gs://mybucket/mycerts/privateCA.crtCloud Storage で提供される証明書は DER でエンコードする必要があります。この証明書はバイナリまたは印刷可能(Base64)エンコードで提供できます。証明書を Base64 エンコードで提供する場合は、証明書を -----BEGIN CERTIFICATE----- と -----END CERTIFICATE----- で囲む必要があります。このパラメータを指定すると、このプライベート CA 証明書ファイルが取得され、Splunk HEC エンドポイントの SSL 証明書の検証用に Dataflow ワーカーのトラストストアに追加されます。このパラメータを指定しない場合は、デフォルトのトラストストアが使用されます。
enableBatchLogs (省略可)Splunk に書き込まれたバッチに対してログを有効にするかどうかを指定します。デフォルト: true
enableGzipHttpCompression (省略可)Splunk HEC に送信される HTTP リクエストを圧縮するかどうか(gzip コンテンツをエンコードするかどうか)を指定します。デフォルト: true

Pub/Sub to Splunk テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Splunk template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
  • ROOT_CA_CERTIFICATE_PATH: Cloud Storage のルート CA 証明書へのパス(例: gs://your-bucket/privateCA.crt)。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
  • ROOT_CA_CERTIFICATE_PATH: Cloud Storage のルート CA 証明書へのパス(例: gs://your-bucket/privateCA.crt)。

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files to Cloud Storage テンプレートは、Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。

このパイプラインの要件:

  • パイプラインの実行前に入力 Pub/Sub トピックが存在すること。

テンプレートのパラメータ

パラメータ 説明
inputTopic メッセージを使用するために購読する Cloud Pub/Sub トピック。トピック名は、projects/<project-id>/topics/<topic-name> の形式にする必要があります。
outputDirectory 出力 Avro ファイルがアーカイブされる出力ディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/
avroTempDirectory 一時的な Avro ファイル用のディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/
outputFilenamePrefix (省略可)Avro ファイルの出力ファイル名接頭辞。
outputFilenameSuffix (省略可)Avro ファイルの出力ファイル名接尾辞。
outputShardTemplate (省略可)出力ファイルのシャード テンプレート。文字 SN の繰り返しシーケンスで指定します。例: SSS-NNNこれらはそれぞれシャード番号やシャードの総数に置き換えられます。このパラメータを指定しない場合、デフォルトのテンプレートの形式は W-P-SS-of-NN です。

Pub/Sub to Cloud Storage Avro テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Avro Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート

Pub/Sub Topic to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text テンプレートは、Pub/Sub トピックからレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件:

  • 実行前に Pub/Sub トピックが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式となる必要があります。
  • トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/この値は「/」で終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。例: output-
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。outputShardTemplateW-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。

Pub/Sub Topic to Text Files on Cloud Storage テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Text Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前

Pub/Sub Topic or Subscription to Text Files on Cloud Storage

Pub/Sub Topic or Subscription to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件:

  • 実行前に Pub/Sub トピックまたはサブスクリプションが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式となる必要があります。
  • トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。このパラメータが指定されている場合は、inputSubscription を指定しないでください。
inputSubscription 読み取り元の入力 Pub/Sub サブスクリプション。サブスクリプション名は projects/<project-id>/subscription/<subscription-name> の形式にする必要があります。このパラメータが指定されている場合は、inputTopic を指定しないでください。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/この値は「/」で終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。例: output-
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。outputShardTemplateW-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。
windowDuration (省略可)ウィンドウ期間は、出力ディレクトリにデータを書き込む間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットが高い場合は、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは 5 分、最小は 1 秒です。使用できる形式は、[int]s(秒に使用、例: 5s)、[int]m(分に使用、例: 12m)、[int]h(時間に使用、例: 2h)です。

Pub/Sub Topic or Subscription to Text Files on Cloud Storage テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • BUCKET_NAME: Cloud Storage バケットの名前

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • BUCKET_NAME: Cloud Storage バケットの名前

Pub/Sub to MongoDB

Pub/Sub to MongoDB テンプレートは、Pub/Sub サブスクリプションから JSON エンコードのメッセージを読み取り、ドキュメントとして MongoDB に書き込むストリーミング パイプラインです。このパイプラインでは、必要に応じて JavaScript ユーザー定義関数(UDF)を使用して組み込むことができる追加の変換がサポートされています。スキーマの不一致や不正な形式の JSON によるエラー、または変換の実行中に発生したエラーは、入力メッセージとともに未処理メッセージの BigQuery テーブルに記録されます。未処理レコードのテーブルが実行前に存在しない場合は、パイプラインによって自動的にこのテーブルが作成されます。

このパイプラインの要件:

  • Pub/Sub サブスクリプションが存在し、有効な JSON 形式でメッセージがエンコードされている必要があります。
  • MongoDB クラスタが存在し、Dataflow ワーカーマシンからアクセス可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription Pub/Sub サブスクリプションの名前。例: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri MongoDB サーバーのカンマ区切りのリスト。例: 192.285.234.12:27017,192.287.123.11:27017
database コレクションを格納する MongoDB のデータベース。例: my-db
collection MongoDB データベース内のコレクションの名前。例: my-collection
deadletterTable エラー(スキーマの不一致や JSON の形式の誤りなど)によりメッセージが保存される BigQuery テーブル。例: project-id:dataset-name.table-name
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
batchSize (省略可)MongoDB へのドキュメントのバッチ挿入に使用するバッチサイズ。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト単位)。デフォルト: 5242880
maxConnectionIdleTime (省略可)接続タイムアウトが発生するまでの最大アイドル時間(秒単位)。デフォルト: 60000
sslEnabled (省略可)MongoDB への接続が SSL かどうかを示すブール値。デフォルト: true
ignoreSSLCertificate (省略可)SSL 証明書を無視するかどうかを示すブール値。デフォルト: true
withOrdered (省略可)MongoDB への順序付けされた一括挿入を有効にするブール値。デフォルト: true
withSSLInvalidHostNameAllowed (省略可)SSL 接続で無効なホスト名を許可するかどうかを示すブール値。デフォルト: true

Pub/Sub to MongoDB テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to MongoDB template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name

Pub/Sub to Elasticsearch

Pub/Sub to Elasticsearch テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、ユーザー定義関数(UDF)を実行して、それらをドキュメントとして Elasticsearch に書き込むストリーミング パイプラインです。Dataflow テンプレートは、Elasticsearch のデータ ストリーム機能を使用して、複数のインデックスにまたがる時系列データを保存し、リクエストに対して単一の名前付きリソースを提供します。データ ストリームは、ログ、指標、トレースに適しています。また、継続的に生成され、Pub/Sub に保存されるデータにも適しています。

このパイプラインの要件

  • 参照元の Pub/Sub サブスクリプションが存在し、メッセージが有効な JSON 形式でエンコードされている必要があります。
  • GCP インスタンス上または Elasticsearch バージョン 7.0 以降の Elastic Cloud 上で一般公開されている到達可能な Elasticsearch ホスト。詳細については、Elastic のための Google Cloud 統合をご覧ください。
  • エラー出力用の Pub/Sub トピック。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 使用する Pub/Sub サブスクリプション。名前は projects/<project-id>/subscriptions/<subscription-name> の形式にします。
connectionUrl https://hostname:[port] 形式の Elasticsearch URL。Elastic Cloud を使用する場合は CloudID を指定します。
apiKey 認証に使用される Base64 エンコードの API キー。
errorOutputTopic 失敗したレコードを公開するための Pub/Sub 出力トピック。projects/<project-id>/topics/<topic-name> の形式にします。
dataset (省略可)Pub/Sub 経由で送信されるログのタイプ。すぐに使えるダッシュボードが用意されています。既知のログタイプ値は、audit、vpcflow、firewall です。デフォルト: pubsub
namespace (省略可)環境(開発、生産、QA)、チーム、戦略事業部門などの任意のグループ。デフォルト: default
batchSize (省略可)バッチサイズ(ドキュメント数)。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト数)。デフォルト: 5242880(5 MB)。
maxRetryAttempts (省略可)最大再試行回数。0 より大きくする必要があります。デフォルト: no retries
maxRetryDuration (省略可)最大再試行時間(ミリ秒)は 0 より大きくする必要があります。デフォルト: no retries
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
propertyAsIndex (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _index メタデータを指定し、一括リクエストではドキュメントに含まれます(_index UDF よりも優先適用されます)。デフォルト: none
propertyAsId (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _id メタデータを指定し、一括リクエストではドキュメントに含まれます(_id UDF よりも優先適用されます)。デフォルト: none
javaScriptIndexFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIndexFnName (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIdFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIdFnName (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptTypeFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptTypeFnName (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIsDeleteFnGcsPath (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
javaScriptIsDeleteFnName (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の UDF JavaScript 関数名。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
usePartialUpdate (省略可)Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルト: false
bulkInsertMethod (省略可)INDEX(インデックス、upserts を許可する)または CREATE(作成、duplicate _id でエラー)を Elasticsearch 一括リクエストで使用するかどうか。デフォルト: CREATE

Pub/Sub to Elasticsearch テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Elasticsearch template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • ERROR_OUTPUT_TOPIC: エラー出力用の Pub/Sub トピック。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • CONNECTION_URL: Elasticsearch の URL
  • DATASET: ログタイプ
  • NAMESPACE: データセットの名前空間
  • APIKEY: 認証用に Base64 でエンコードされた API キー

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • ERROR_OUTPUT_TOPIC: エラー出力用の Pub/Sub トピック。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • CONNECTION_URL: Elasticsearch の URL
  • DATASET: ログタイプ
  • NAMESPACE: データセットの名前空間
  • APIKEY: 認証用に Base64 でエンコードされた API キー

Datastream to Cloud Spanner

Datastream to Cloud Spanner テンプレートは、Cloud Storage バケットから Datastream イベントを読み取り、Cloud Spanner データベースに書き込むストリーミング パイプラインです。これは、Datastream ソースから Cloud Spanner へのデータ移行を目的としています。

テンプレートの実行前に、移行に必要なすべてのテーブルが移行先の Cloud Spanner データベースに存在している必要があります。したがって、データ移行に先立ち、ソース データベースから移行先 Cloud Spanner へのスキーマの移行が完了している必要があります。移行前にテーブルにデータが存在する可能性があります。このテンプレートでは、Datastream スキーマの変更が Cloud Spanner データベースに伝播されません。

データの整合性が保証されるのは、すべてのデータが Cloud Spanner に書き込まれ、移行が終了したときだけです。Cloud Spanner に書き込まれる各レコードの順序指定の情報を保存するために、このテンプレートは Cloud Spanner データベース内の各テーブルに対して追加のテーブル(シャドウ テーブルと呼ばれる)を作成します。これは、移行終了時の整合性を確保するために使用されます。シャドウ テーブルは移行後に削除されないため、移行終了時の検証に使用できます。

オペレーション中に発生したエラー(スキーマの不一致、不正な形式の JSON ファイル、変換の実行によるエラーなど)は、エラーキューに記録されます。エラーキューは、エラーが発生したすべての Datastream イベントと、エラーの理由をテキスト形式で保存する Cloud Storage フォルダです。エラーは一時的な場合も永続的な場合もあり、エラーキューの適切な Cloud Storage フォルダに保存されます。一時的なエラーは再試行されますが、永続的なエラーは再試行されません。永続的なエラーが発生した場合は、テンプレートの実行中に変更イベントを修正し、再試行可能なバケットに移動することもできます。

このパイプラインの要件:

  • ステータスが [実行中] または [開始されていません] の Datastream ストリーム。
  • Datastream イベントが複製される Cloud Storage バケット。
  • 既存のテーブルを持つ Cloud Spanner データベース。テーブルは空でも、データを含んでいてもかまいません。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 複製する Cloud Storage 内の Datastream ファイルの場所。通常、これはストリームのルートパスです。
streamName スキーマ情報とソースタイプについてポーリングするストリームの名前またはテンプレート。
instanceId 変更が複製される Cloud Spanner インスタンス。
databaseId 変更が複製される Cloud Spanner データベース。
projectId Cloud Spanner プロジェクト ID。
deadLetterQueueDirectory (省略可)エラーキューの出力を保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。
inputFileFormat (省略可)Datastream によって生成された出力ファイルの形式(例: avro,json)。デフォルトは avro です。
shadowTablePrefix (省略可)シャドウ テーブルの名前に使用される接頭辞。デフォルト: shadow_

Datastream to Cloud Spanner テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Datastream to Spanner template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Cloud Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Cloud Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Cloud Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Cloud Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。

Text Files on Cloud Storage to BigQuery(Stream)

Text Files on Cloud Storage to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルをストリーミングし、ユーザーが指定する JavaScript ユーザー定義関数(UDF)で変換して、結果を BigQuery に追加するストリーミング パイプラインです。

パイプラインは無期限に実行され、ドレインではなくキャンセルによって手動で終了させる必要があります。これは、分割可能な DoFn で、ドレインをサポートしていない Watch 変換を使用しているためです。

このパイプラインの要件:

  • BigQuery で出力テーブルのスキーマを記述する JSON ファイルを作成します。

    fields というタイトルになっているトップレベルの JSON 配列があり、その内容が {"name": "COLUMN_NAME", "type": "DATA_TYPE"} のパターンに従っていることを確認します。例:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • JavaScript(.js)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。

    たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

テンプレートのパラメータ

パラメータ 説明
javascriptTextTransformGcsPath : 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
JSONPath BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。例: gs://path/to/my/schema.json
javascriptTextTransformFunctionName 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
outputTable 完全修飾された BigQuery テーブル。例: my-project:dataset.table
inputFilePattern 処理するテキストの Cloud Storage の場所。例: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir
outputDeadletterTable 出力テーブルに到達できなかったメッセージが記載されたテーブル。例: my-project:dataset.my-unprocessed-table存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Cloud Storage Text to BigQuery (Stream) テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス

Text Files on Cloud Storage to Pub/Sub(Stream)

このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Pub/Sub トピックに公開します。また、Pub/Sub でデータを再生することもできます。

パイプラインは無期限に実行され、「ドレイン」ではなく「キャンセル」によって手動で終了させる必要があります。これは「Watch」変換を使用しているためで、この変換は「SplittableDoFn」であり、ドレインはサポートしていません。

現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

このパイプラインの要件:

  • 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • 実行前に Pub/Sub トピックが存在している必要があります。
  • このパイプライ