Google 提供の Dataflow ストリーミング テンプレート

Google はオープンソースの Dataflow テンプレートを提供しています。

これらの Dataflow テンプレートは、データのインポート、エクスポート、バックアップ、復元、API の一括オペレーションなど、大規模なデータタスクの解決に役立ちます。専用の開発環境を使用しなくても、これらの処理を実行できます。テンプレートは Apache Beam 上に構築され、Dataflow を使用してデータを変換します。

テンプレートに関する一般的な情報については、Dataflow テンプレートをご覧ください。Google が提供するテンプレートのリストについては、Google 提供のテンプレートの概要をご覧ください。

このガイドでは、ストリーミング テンプレートについて説明します。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery テンプレートは、Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージの data フィールドは、この JSON ガイドで説明されている JSON 形式を使用する必要があります。たとえば、data フィールドの値が {"k1":"v1", "k2":"v2"} としてフォーマットされたメッセージは、文字列データ型の k1k2 という 2 つの列を持つ BigQuery テーブルに挿入できます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。
outputDeadletterTable 出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、OUTPUT_TABLE_SPEC_error_records が代わりに使用されます。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

Pub/Sub Subscription to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Subscription to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージの data フィールドは、この JSON ガイドで説明されている JSON 形式を使用する必要があります。たとえば、data フィールドの値が {"k1":"v1", "k2":"v2"} としてフォーマットされたメッセージは、文字列データ型の k1k2 という 2 つの列を持つ BigQuery テーブルに挿入できます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。
outputDeadletterTable 出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

Pub/Sub Topic to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Topic to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

このパイプラインの要件

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 未処理の Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。

テンプレートのパラメータ

パラメータ 説明
schemaPath Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc
inputSubscription 読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription>
outputTopic 未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 出力テーブルの場所。例: <my-project>:<my-dataset>.<my-table>createDisposition の指定によっては、ユーザー指定の Avro スキーマを使用して出力テーブルが自動的に作成されます。
writeDisposition (省略可)BigQuery の WriteDisposition。例: WRITE_APPENDWRITE_EMPTY、または WRITE_TRUNCATE。デフォルト: WRITE_APPEND
createDisposition (省略可)BigQuery の CreateDisposition。例: CREATE_IF_NEEDEDCREATE_NEVER。デフォルト: CREATE_IF_NEEDED

Pub/Sub Avro to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Avro to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック

Pub/Sub Proto to BigQuery

Pub/Sub proto to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに proto データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

データを変換するための JavaScript ユーザー定義関数(UDF)を指定できます。UDF の実行中のエラーは、個別の Pub/Sub トピック、または BigQuery エラーと同じ未処理のトピックに送信できます。

このパイプラインの要件:

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Proto レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 出力 Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。
  • BigQuery テーブルが存在する場合は、createDisposition 値にかかわらず、proto データに一致するスキーマが必要です。

テンプレートのパラメータ

パラメータ 説明
protoSchemaPath 自己完結型の proto スキーマ ファイルの Cloud Storage の場所。例: gs://path/to/my/file.pbこのファイルは、protoc コマンドの --descriptor_set_out フラグを使用して生成できます。--include_imports フラグを使用すると、ファイルが自己完結することを保証できます。
fullMessageName proto メッセージの完全な名前。たとえば、package.name.MessageName の場合、package.name は、java_package ステートメントではなく、package ステートメントに指定された値です。
inputSubscription 読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription>
outputTopic 未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 出力テーブルの場所。例: my-project:my_dataset.my_tablecreateDisposition の指定によっては、入力スキーマ ファイルを使用して出力テーブルが自動的に作成されます。
preserveProtoFieldNames (省略可)JSON で元の Proto フィールド名を保持するには truefalse: 標準の JSON 名を使用します。たとえば、false と指定すると field_namefieldName に変更されます。(デフォルト: false
bigQueryTableSchemaPath (省略可)BigQuery スキーマパスへの Cloud Storage パス。例: gs://path/to/my/schema.json指定されていない場合、スキーマは Proto スキーマから推測されます。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
udfOutputTopic (省略可)UDF エラーを格納する Pub/Sub トピック。たとえば、projects/<project-id>/topics/<topic-name> です。指定されていない場合、UDF エラーは outputTopic と同じトピックに送信されます。
writeDisposition (省略可)BigQuery WriteDispositionWRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE など。デフォルト: WRITE_APPEND
createDisposition (省略可)BigQuery CreateDisposition。例: CREATE_IF_NEEDEDCREATE_NEVERデフォルト: CREATE_IF_NEEDED

Pub/Sub Proto to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Proto to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、別の Pub/Sub トピックにそのメッセージを書き込むストリーミング パイプラインです。Pub/Sub トピックに書き込むメッセージのフィルタリングに使用される、オプションのメッセージ属性キーと値を指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しながら、Pub/Sub サブスクリプションから別の Pub/Sub トピックにメッセージをコピーできます。

このパイプラインの要件:

  • 実行前にコピー元の Pub/Sub サブスクリプションが存在している必要があります。
  • コピー元の Pub/Sub サブスクリプションが pull サブスクリプションである必要があります。
  • 実行前にコピー先の Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み取り元の入力 Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
outputTopic 書き込み先の出力 Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
filterKey (省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。
filterValue (省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。

Pub/Sub to Pub/Sub テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Pub/Sub template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)はフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)はフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。

Pub/Sub to Splunk

Pub/Sub to Splunk テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、メッセージ ペイロードを Splunk の HTTP Event Collector(HEC)を介して Splunk に書き込むストリーミング パイプラインです。このテンプレートの最も一般的な使用例は Splunk へのログのエクスポートです。基盤となるワークフローの例については、Dataflow を使用して本番環境対応のログのエクスポートを Splunk にデプロイするをご覧ください。

Splunk に書き込む前に、メッセージ ペイロードに JavaScript ユーザー定義関数を適用することもできます。処理が失敗したメッセージは、トラブルシューティングと再処理のために Pub/Sub の未処理のトピックに転送されます。

HEC トークンの保護を強化するために、Base64 エンコードされた HEC トークン パラメータを Cloud KMS 鍵で暗号化して、この Cloud KMS 鍵とともに渡すこともできます。HEC トークン パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。

このパイプラインの要件:

  • パイプラインの実行前にソース Pub/Sub サブスクリプションが存在している必要があります。
  • パイプラインを実行する前に、Pub/Sub に未処理のトピックが存在している必要があります。
  • Dataflow ワーカーのネットワークから Splunk の HEC エンドポイントにアクセスできる必要があります。
  • Splunk の HEC トークンが生成済みで、利用可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 入力の読み取り元の Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
token (省略可)Splunk の HEC 認証トークン。tokenSource が PLAINTEXT または KMS に設定されている場合は必須です。
url Splunk の HEC URL。パイプラインが実行される VPC からルーティング可能である必要があります。例: https://splunk-hec-host:8088。
outputDeadletterTopic 配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
batchCount (省略可)複数のイベントを Splunk に送信するためのバッチサイズ。デフォルト: 1(バッチ処理なし)。
parallelism (省略可)並行リクエストの最大数。デフォルト: 1(並列処理なし)。
disableCertificateValidation (省略可)SSL 証明書の検証を無効にします。デフォルト: false(検証が有効)。true の場合、証明書は検証されず(すべての証明書が信頼されます)、rootCaCertificatePath パラメータは無視されます。
includePubsubMessage (省略可)完全な Pub/Sub メッセージをペイロードに含めます。デフォルト: false(ペイロードにはデータ要素のみが含まれる)。
tokenSource トークンのソース。PLAINTEXT、KMS、SECRET_MANAGER のいずれかです。Secret Manager を使用する場合は、このパラメータを指定する必要があります。tokenSource が KMS に設定されている場合、tokenKMSEncryptionKey と暗号化された token を指定する必要がありますtokenSource を SECRET_MANAGER に設定する場合は、tokenSecretId を指定する必要がありますtokenSource が PLAINTEXT に設定されている場合は、token を指定する必要があります
tokenKMSEncryptionKey (省略可)HEC トークン文字列を復号するための Cloud KMS 鍵。tokenSource が KMS に設定されている場合、このパラメータを指定する必要がありますCloud KMS 鍵を指定する場合は、HEC トークン文字列を暗号化して渡す必要があります
tokenSecretId (省略可)トークンの Secret Manager シークレット ID。tokenSource が SECRET_MANAGER に設定されている場合、このパラメータを指定する必要がありますprojects/<project-id>/secrets/<secret-name>/versions/<secret-version> 形式で指定する必要があります。
rootCaCertificatePath (省略可)Cloud Storage のルート CA 証明書の完全 URL。例: gs://mybucket/mycerts/privateCA.crtCloud Storage で提供される証明書は DER でエンコードする必要があります。この証明書はバイナリまたは印刷可能(Base64)エンコードで提供できます。証明書を Base64 エンコードで提供する場合は、証明書を -----BEGIN CERTIFICATE----- と -----END CERTIFICATE----- で囲む必要があります。このパラメータを指定すると、このプライベート CA 証明書ファイルが取得され、Splunk HEC エンドポイントの SSL 証明書の検証用に Dataflow ワーカーのトラストストアに追加されます。このパラメータを指定しない場合は、デフォルトのトラストストアが使用されます。
enableBatchLogs (省略可)Splunk に書き込まれたバッチに対してログを有効にするかどうかを指定します。デフォルト: true
enableGzipHttpCompression (省略可)Splunk HEC に送信される HTTP リクエストを圧縮するかどうか(gzip コンテンツをエンコードするかどうか)を指定します。デフォルト: true

Pub/Sub to Splunk テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Splunk template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
  • ROOT_CA_CERTIFICATE_PATH: Cloud Storage のルート CA 証明書へのパス(例: gs://your-bucket/privateCA.crt)。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
  • ROOT_CA_CERTIFICATE_PATH: Cloud Storage のルート CA 証明書へのパス(例: gs://your-bucket/privateCA.crt)。

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files to Cloud Storage テンプレートは、Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。

このパイプラインの要件:

  • パイプラインの実行前に入力 Pub/Sub トピックが存在すること。

テンプレートのパラメータ

パラメータ 説明
inputTopic メッセージを使用するために購読する Cloud Pub/Sub トピック。トピック名は、projects/<project-id>/topics/<topic-name> の形式にする必要があります。
outputDirectory 出力 Avro ファイルがアーカイブされる出力ディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/
avroTempDirectory 一時的な Avro ファイル用のディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/
outputFilenamePrefix (省略可)Avro ファイルの出力ファイル名接頭辞。
outputFilenameSuffix (省略可)Avro ファイルの出力ファイル名接尾辞。
outputShardTemplate (省略可)出力ファイルのシャード テンプレート。文字 SN の繰り返しシーケンスで指定します。例: SSS-NNNこれらはそれぞれシャード番号やシャードの総数に置き換えられます。このパラメータを指定しない場合、デフォルトのテンプレートの形式は W-P-SS-of-NN です。

Pub/Sub to Cloud Storage Avro テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Avro Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート

Pub/Sub Topic to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text テンプレートは、Pub/Sub トピックからレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件:

  • 実行前に Pub/Sub トピックが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式となる必要があります。
  • トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/この値は「/」で終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。例: output-
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。outputShardTemplateW-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。

Pub/Sub Topic to Text Files on Cloud Storage テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Text Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前

Pub/Sub Topic or Subscription to Text Files on Cloud Storage

Pub/Sub Topic or Subscription to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件:

  • 実行前に Pub/Sub トピックまたはサブスクリプションが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式となる必要があります。
  • トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。 このパラメータが指定されている場合は、inputSubscription を指定しないでください。
inputSubscription 読み取り元の入力 Pub/Sub サブスクリプション。サブスクリプション名は projects/<project-id>/subscription/<subscription-name> の形式にする必要があります。このパラメータが指定されている場合は、inputTopic を指定しないでください。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/この値は「/」で終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。例: output-
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。outputShardTemplateW-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。
windowDuration (省略可)ウィンドウ期間は、出力ディレクトリにデータが書き込まれる間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットを向上させるには、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは 5 分、最小は 1 秒です。使用できる形式は、[int]s(秒に使用、例: 5s)、[int]m(分に使用、例: 12m)、[int]h(時間に使用、例: 2h)です。

Pub/Sub Topic or Subscription to Text Files on Cloud Storage テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • BUCKET_NAME: Cloud Storage バケットの名前

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • BUCKET_NAME: Cloud Storage バケットの名前

Pub/Sub to MongoDB

Pub/Sub to MongoDB テンプレートは、Pub/Sub サブスクリプションから JSON エンコードのメッセージを読み取り、ドキュメントとして MongoDB に書き込むストリーミング パイプラインです。このパイプラインでは、必要に応じて JavaScript ユーザー定義関数(UDF)を使用して組み込むことができる追加の変換がサポートされています。スキーマの不一致や不正な形式の JSON によるエラー、または変換の実行中に発生したエラーは、入力メッセージとともに未処理メッセージの BigQuery テーブルに記録されます。未処理レコードのテーブルが実行前に存在しない場合は、パイプラインによって自動的にこのテーブルが作成されます。

このパイプラインの要件:

  • Pub/Sub サブスクリプションが存在し、有効な JSON 形式でメッセージがエンコードされている必要があります。
  • MongoDB クラスタが存在し、Dataflow ワーカーマシンからアクセス可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription Pub/Sub サブスクリプションの名前。例: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri MongoDB サーバーのカンマ区切りのリスト。例: 192.285.234.12:27017,192.287.123.11:27017
database コレクションを格納する MongoDB のデータベース。例: my-db
collection MongoDB データベース内のコレクションの名前。例: my-collection
deadletterTable エラー(スキーマの不一致や JSON の形式の誤りなど)によりメッセージが保存される BigQuery テーブル。例: project-id:dataset-name.table-name
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
batchSize (省略可)MongoDB へのドキュメントのバッチ挿入に使用するバッチサイズ。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト単位)。デフォルト: 5242880
maxConnectionIdleTime (省略可)接続タイムアウトが発生するまでの最大アイドル時間(秒単位)。デフォルト: 60000
sslEnabled (省略可)MongoDB への接続が SSL かどうかを示すブール値。デフォルト: true
ignoreSSLCertificate (省略可)SSL 証明書を無視するかどうかを示すブール値。デフォルト: true
withOrdered (省略可)MongoDB への順序付けされた一括挿入を有効にするブール値。デフォルト: true
withSSLInvalidHostNameAllowed (省略可)SSL 接続で無効なホスト名を許可するかどうかを示すブール値。デフォルト: true

Pub/Sub to MongoDB テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to MongoDB template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name

Pub/Sub to Elasticsearch

Pub/Sub to Elasticsearch テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、ユーザー定義関数(UDF)を実行して、それらをドキュメントとして Elasticsearch に書き込むストリーミング パイプラインです。Dataflow テンプレートは、Elasticsearch のデータ ストリーム機能を使用して、複数のインデックスにまたがる時系列データを保存し、リクエストに対して単一の名前付きリソースを提供します。データ ストリームは、ログ、指標、トレースに適しています。また、継続的に生成され、Pub/Sub に保存されるデータにも適しています。

このパイプラインの要件

  • 参照元の Pub/Sub サブスクリプションが存在し、メッセージが有効な JSON 形式でエンコードされている必要があります。
  • GCP インスタンス上または Elasticsearch バージョン 7.0 以降の Elastic Cloud 上で一般公開されている到達可能な Elasticsearch ホスト。詳細については、Elastic のための Google Cloud 統合をご覧ください。
  • エラー出力用の Pub/Sub トピック。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 使用する Pub/Sub サブスクリプション。名前は projects/<project-id>/subscriptions/<subscription-name> の形式にします。
connectionUrl https://hostname:[port] 形式の Elasticsearch URL。Elastic Cloud を使用する場合は CloudID を指定します。
apiKey 認証に使用される Base64 エンコードの API キー。
errorOutputTopic 失敗したレコードを公開するための Pub/Sub 出力トピック。projects/<project-id>/topics/<topic-name> の形式にします。
dataset (省略可)Pub/Sub 経由で送信されるログのタイプ。すぐに使えるダッシュボードが用意されています。既知のログタイプ値は、audit、vpcflow、firewall です。デフォルト: pubsub
namespace (省略可)環境(開発、生産、QA)、チーム、戦略事業部門などの任意のグループ。デフォルト: default
batchSize (省略可)バッチサイズ(ドキュメント数)。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト数)。デフォルト: 5242880(5 MB)。
maxRetryAttempts (省略可)最大再試行回数。0 より大きくする必要があります。デフォルト: no retries
maxRetryDuration (省略可)最大再試行時間(ミリ秒)は 0 より大きくする必要があります。デフォルト: no retries
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
propertyAsIndex (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _index メタデータを指定し、一括リクエストではドキュメントに含まれます(_index UDF よりも優先適用されます)。デフォルト: none
propertyAsId (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _id メタデータを指定し、一括リクエストではドキュメントに含まれます(_id UDF よりも優先適用されます)。デフォルト: none
javaScriptIndexFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIndexFnName (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIdFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIdFnName (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptTypeFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptTypeFnName (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIsDeleteFnGcsPath (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
javaScriptIsDeleteFnName (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の UDF JavaScript 関数名。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
usePartialUpdate (省略可)Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルト: false
bulkInsertMethod (省略可)INDEX(インデックス、upserts を許可する)または CREATE(作成、duplicate _id でエラー)を Elasticsearch 一括リクエストで使用するかどうか。デフォルト: CREATE

Pub/Sub to Elasticsearch テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Elasticsearch template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • ERROR_OUTPUT_TOPIC: エラー出力用の Pub/Sub トピック。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • CONNECTION_URL: Elasticsearch の URL
  • DATASET: ログタイプ
  • NAMESPACE: データセットの名前空間
  • APIKEY: 認証用に Base64 でエンコードされた API キー

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • ERROR_OUTPUT_TOPIC: エラー出力用の Pub/Sub トピック。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • CONNECTION_URL: Elasticsearch の URL
  • DATASET: ログタイプ
  • NAMESPACE: データセットの名前空間
  • APIKEY: 認証用に Base64 でエンコードされた API キー

Datastream to Cloud Spanner

Datastream to Cloud Spanner テンプレートは、Cloud Storage バケットから Datastream イベントを読み取り、Cloud Spanner データベースに書き込むストリーミング パイプラインです。これは、Datastream ソースから Cloud Spanner へのデータ移行を目的としています。

テンプレートの実行前に、移行に必要なすべてのテーブルが移行先の Cloud Spanner データベースに存在している必要があります。したがって、データ移行に先立ち、ソース データベースから移行先 Cloud Spanner へのスキーマの移行が完了している必要があります。移行前にテーブルにデータが存在する可能性があります。このテンプレートでは、Datastream スキーマの変更が Cloud Spanner データベースに伝播されません。

データの整合性が保証されるのは、すべてのデータが Cloud Spanner に書き込まれ、移行が終了したときだけです。Cloud Spanner に書き込まれる各レコードの順序指定の情報を保存するために、このテンプレートは Cloud Spanner データベース内の各テーブルに対して追加のテーブル(シャドウ テーブルと呼ばれる)を作成します。これは、移行終了時の整合性を確保するために使用されます。シャドウ テーブルは移行後に削除されないため、移行終了時の検証に使用できます。

オペレーション中に発生したエラー(スキーマの不一致、不正な形式の JSON ファイル、変換の実行によるエラーなど)は、エラーキューに記録されます。エラーキューは、エラーが発生したすべての Datastream イベントと、エラーの理由をテキスト形式で保存する Cloud Storage フォルダです。エラーは一時的な場合も永続的な場合もあり、エラーキューの適切な Cloud Storage フォルダに保存されます。一時的なエラーは再試行されますが、永続的なエラーは再試行されません。永続的なエラーが発生した場合は、テンプレートの実行中に変更イベントを修正し、再試行可能なバケットに移動することもできます。

このパイプラインの要件:

  • ステータスが [実行中] または [開始されていません] の Datastream ストリーム。
  • Datastream イベントが複製される Cloud Storage バケット。
  • 既存のテーブルを持つ Cloud Spanner データベース。テーブルは空でも、データを含んでいてもかまいません。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 複製する Cloud Storage 内の Datastream ファイルの場所。通常、これはストリームのルートパスです。
streamName スキーマ情報とソースタイプについてポーリングするストリームの名前またはテンプレート。
instanceId 変更が複製される Cloud Spanner インスタンス。
databaseId 変更が複製される Cloud Spanner データベース。
projectId Cloud Spanner プロジェクト ID。
deadLetterQueueDirectory (省略可)エラーキューの出力を保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。
inputFileFormat (省略可)Datastream によって生成された出力ファイルの形式(例: avro,json)。デフォルトは avro です。
shadowTablePrefix (省略可)シャドウ テーブルの名前に使用される接頭辞。デフォルト: shadow_

Datastream to Cloud Spanner テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Datastream to Spanner template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Cloud Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Cloud Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Cloud Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Cloud Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。

Text Files on Cloud Storage to BigQuery(Stream)

Text Files on Cloud Storage to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルをストリーミングし、ユーザーが指定する JavaScript ユーザー定義関数(UDF)で変換して、結果を BigQuery に追加するストリーミング パイプラインです。

パイプラインは無期限に実行され、ドレインではなくキャンセルによって手動で終了させる必要があります。これは、分割可能な DoFn で、ドレインをサポートしていない Watch 変換を使用しているためです。

このパイプラインの要件:

  • BigQuery で出力テーブルのスキーマを記述する JSON ファイルを作成します。

    fields というタイトルになっているトップレベルの JSON 配列があり、その内容が {"name": "COLUMN_NAME", "type": "DATA_TYPE"} のパターンに従っていることを確認します。例:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • JavaScript(.js)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。

    たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

テンプレートのパラメータ

パラメータ 説明
javascriptTextTransformGcsPath : 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
JSONPath BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。例: gs://path/to/my/schema.json
javascriptTextTransformFunctionName 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
outputTable 完全修飾された BigQuery テーブル。例: my-project:dataset.table
inputFilePattern 処理するテキストの Cloud Storage の場所。例: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir
outputDeadletterTable 出力テーブルに到達できなかったメッセージが記載されたテーブル。例: my-project:dataset.my-unprocessed-table存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Cloud Storage Text to BigQuery (Stream) テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス

Text Files on Cloud Storage to Pub/Sub(Stream)

このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Pub/Sub トピックに公開します。また、Pub/Sub でデータを再生することもできます。

パイプラインは無期限に実行され、「ドレイン」ではなく「キャンセル」によって手動で終了させる必要があります。これは「Watch」変換を使用しているためで、この変換は「SplittableDoFn」であり、ドレインはサポートしていません。

現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

このパイプラインの要件:

  • 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • 実行前に Pub/Sub トピックが存在している必要があります。
  • このパイプラインは無期限で実行されるため、手動で終了する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。たとえば、gs://bucket-name/files/*.jsongs://bucket-name/path/*.csv です。
outputTopic 書き込み先の Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にします。

Text Files on Cloud Storage to Pub/Sub(Stream)テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to Pub/Sub (Stream) template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント(例: us-central1
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILE_PATTERN: Cloud Storage バケットから読み取るファイル パターン glob(例: path/*.csv

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント(例: us-central1
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILE_PATTERN: Cloud Storage バケットから読み取るファイル パターン glob(例: path/*.csv

Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)

Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)テンプレートは、Cloud Storage バケットから csv ファイルを読み取り、匿名化のために Cloud Data Loss Prevention(Cloud DLP)API を呼び出し、匿名化されたデータを指定された BigQuery テーブルへ書き込むストリーミング パイプラインです。このテンプレートでは、Cloud DLP 検査テンプレートと Cloud DLP 匿名化テンプレートの両方の使用がサポートされます。これにより、潜在的な機密情報を検査して匿名化したり、列が匿名化されるように指定されていて検査が不要な構造化データを匿名化できます。また、このテンプレートでは、匿名化テンプレートの場所用のリージョンパスはサポートされていません。グローバルパスのみがサポートされています。

このパイプラインの要件:

  • トークン化する入力データが存在している必要があります
  • Cloud DLP テンプレートが存在している必要があります(たとえば、DeidentifyTemplate や InspectTemplate)。詳細については、Cloud DLP テンプレートをご覧ください。
  • BigQuery データセットが存在している必要があります

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 入力データレコードを読み込む csv ファイル。ワイルドカードも使用できます。例: gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId Cloud DLP API リソースを所有する Cloud DLP プロジェクト ID。この Cloud DLP プロジェクトは、Cloud DLP テンプレートを所有するプロジェクトと同じプロジェクトにすることも、別のプロジェクトにすることもできます。 例: my_dlp_api_project
deidentifyTemplateName API リクエストに使用する Cloud DLP 匿名化テンプレート。パターン projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} で指定します。例: projects/my_project/deidentifyTemplates/100
datasetName トークン化された結果を送信するための BigQuery データセット。
batchSize 検索やトークン化解除するデータを送信するためのチャンク / バッチサイズ。CSV ファイルの場合、batchSize は全体の行数です。ユーザーは、レコードのサイズとファイルのサイズに基づいてバッチサイズを決定する必要があります。Cloud DLP API では、ペイロードのサイズが API 呼び出しごとに 524 KB に制限されます。
inspectTemplateName (省略可)API リクエストに使用する Cloud DLP 検査テンプレート。 projects/{template_project_id}/identifyTemplates/{idTemplateId} の形で指定します。例: projects/my_project/identifyTemplates/100

Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

次のように置き換えます。

  • DLP_API_PROJECT_ID: Cloud DLP API プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_DATA: 入力ファイルのパス
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify テンプレート番号
  • DATASET_NAME: BigQuery データセット名
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect テンプレート番号
  • BATCH_SIZE_VALUE: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • DLP_API_PROJECT_ID: Cloud DLP API プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_DATA: 入力ファイルのパス
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify テンプレート番号
  • DATASET_NAME: BigQuery データセット名
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect テンプレート番号
  • BATCH_SIZE_VALUE: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream) テンプレートは、MySQL データベースの変更データを含む Pub/Sub メッセージを読み取り、レコードを BigQuery に書き込むストリーミング パイプラインです。Debezium コネクタは、MySQL データベースの変更をキャプチャして、変更データを Pub/Sub にパブリッシュします。続いて、テンプレートにより Pub/Sub メッセージが読み取られ、BigQuery に書き込まれます。

このテンプレートを使用すると、MySQL データベースと BigQuery テーブルを同期できます。パイプラインは、変更データを BigQuery のステージング テーブルに書き込み、MySQL データベースを複製する BigQuery テーブルを断続的に更新します。

このパイプラインの要件:

  • Debezium コネクタがデプロイされる必要があります。
  • Pub/Sub メッセージは Beam Row でシリアル化される必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscriptions 読み込まれる Pub/Sub 入力サブスクリプションのカンマ区切りのリスト。<subscription>,<subscription>, ... の形式で指定します。
changeLogDataset ステージング テーブルを格納する BigQuery データセット。<my-dataset> の形式で指定します。
replicaDataset レプリカ テーブルを格納する BigQuery データセットの場所。<my-dataset> の形式で指定します。
updateFrequencySecs (省略可)MySQL データベースを複製する BigQuery テーブルを、パイプラインが更新する間隔。

Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery テンプレートの実行

このテンプレートを実行するには、次の手順を行います。

  1. ローカルマシンで DataflowTemplates リポジトリのクローンを作成します。
  2. v2/cdc-parent ディレクトリに移動します。
  3. Debezium コネクタがデプロイされていることを確認します。
  4. Maven を使用して、Dataflow テンプレートを実行します。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
    • SUBSCRIPTIONS: Pub/Sub サブスクリプション名のカンマ区切りリスト。
    • CHANGELOG_DATASET: 変更履歴データの BigQuery データセット。
    • REPLICA_DATASET: レプリカ テーブルの BigQuery データセット。

Apache Kafka to BigQuery

Apache Kafka to BigQuery テンプレートは、Apache Kafka からテキストデータを取り込み、ユーザー定義関数(UDF)を実行して、結果のレコードを BigQuery に出力するストリーミング パイプラインです。データの変換、UDF の実行、出力テーブルへの挿入で発生したエラーは、BigQuery の別のエラーテーブルに挿入されます。実行前にエラーテーブルが存在しない場合は、作成されます。

このパイプラインの要件

  • 出力 BigQuery テーブルが存在している必要があります。
  • Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である必要があります。
  • Apache Kafka トピックが存在していて、有効な JSON 形式でメッセージがエンコードされている必要があります。

テンプレートのパラメータ

パラメータ 説明
outputTableSpec Apache Kafka メッセージを書き込む BigQuery 出力テーブルの場所。my-project:dataset.table の形式で指定します。
inputTopics 読み取る Apache Kafka 入力トピック(カンマ区切りのリスト)。例: messages
bootstrapServers 実行中の Apache Kafka ブローカー サーバーのホストアドレス(カンマ区切りのリスト)。各ホストアドレスは 35.70.252.199:9092 の形式で指定します。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
outputDeadletterTable (省略可)出力テーブルに到達できなかったメッセージの BigQuery テーブル。my-project:dataset.my-deadletter-table の形式で指定します。テーブルが存在していない場合、パイプラインの実行時に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Apache Kafka to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Kafka to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • BIGQUERY_TABLE: BigQuery テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、こちらの手順でカンマをエスケープしてください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に従ってください。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • BIGQUERY_TABLE: BigQuery テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、こちらの手順でカンマをエスケープしてください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に沿って操作してください。

詳細については、Dataflow で Kafka から BigQuery にデータを書き込むをご覧ください。

Datastream to BigQuery(Stream)

Datastream to BigQuery テンプレートは、Datastream データを読み取り、BigQuery に複製するストリーミング パイプラインです。このテンプレートは、Pub/Sub 通知を使用して Cloud Storage からデータを読み取り、時間でパーティション分割された BigQuery ステージング テーブルに複製します。レプリケーションに続いて、このテンプレートは BigQuery で MERGE を実行し、すべての変更データ キャプチャ(CDC)の変更をソーステーブルのレプリカに upsert します。

このテンプレートは、レプリケーションによって管理される BigQuery テーブルの作成と更新を処理します。データ定義言語(DDL)が必要な場合、Datastream に対するコールバックによってソーステーブル スキーマが抽出され、BigQuery のデータ型に変換されます。サポートされているオペレーションは次のとおりです。

  • データが挿入されると、新しいテーブルが作成される。
  • null の初期値を持つ BigQuery テーブルに新しい列が追加される。
  • ドロップされた列は BigQuery で無視され、将来の値が null になる。
  • 名前が変更された列が BigQuery に新しい列として追加される。
  • 型の変更が BigQuery に伝播されない。

このパイプラインの要件:

  • データを複製する準備ができている、またはすでに複製されている Datastream ストリーム。
  • Datastream データに対して Cloud Storage の Pub/Sub 通知が有効になっている。
  • BigQuery の宛先データセットが作成され、Compute Engine サービス アカウントにこれらのデータセットへの管理者権限が付与されている。
  • 作成される宛先レプリカ テーブルでは、ソーステーブルに主キーが必要です。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 複製する Cloud Storage 内の Datastream ファイルの場所。通常、このファイルの場所はストリームのルートパスです。
gcsPubSubSubscription Datastream ファイル通知を含む Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
inputFileFormat Datastream によって生成された出力ファイルの形式(例: avro,json)。デフォルトは avro です。
outputStagingDatasetTemplate ステージング テーブルを含む既存のデータセットの名前。ソース データセット / スキーマの名前に置き換えられるプレースホルダとして、テンプレート {_metadata_dataset} を含めることができます(例: {_metadata_dataset}_log)。
outputDatasetTemplate レプリカ テーブルを含む既存のデータセットの名前。ソース データセット / スキーマの名前に置き換えられるプレースホルダとして、テンプレート {_metadata_dataset} を含めることができます(例: {_metadata_dataset})。
deadLetterQueueDirectory 処理されなかった理由とともに、未処理のメッセージを保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。
outputStagingTableNameTemplate (省略可)ステージング テーブルの名前のテンプレート。デフォルトは {_metadata_table}_log です。複数のスキーマを複製している場合は、{_metadata_schema}_{_metadata_table}_log をおすすめします。
outputTableNameTemplate (省略可)レプリカ テーブルの名前のテンプレート。デフォルトは {_metadata_table} です。複数のスキーマを複製している場合は、{_metadata_schema}_{_metadata_table} をおすすめします。
outputProjectId (省略可)データを出力する BigQuery データセットのプロジェクト。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
streamName (省略可)スキーマ情報をポーリングするストリームの名前またはテンプレート。デフォルトは {_metadata_stream} です。
mergeFrequencyMinutes (省略可)特定のテーブルのマージ間隔の分数。デフォルトは 5 です。
dlqRetryMinutes (省略可)デッドレター キュー(DLQ)の再試行間隔の分数。デフォルトは 10 です。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

Datastream to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Datastream to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
  • BIGQUERY_DATASET: BigQuery データセット名。
  • BIGQUERY_TABLE: BigQuery テーブル テンプレート。例: {_metadata_schema}_{_metadata_table}_log

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
  • BIGQUERY_DATASET: BigQuery データセット名。
  • BIGQUERY_TABLE: BigQuery テーブル テンプレート。例: {_metadata_schema}_{_metadata_table}_log

Datastream to MySQL or PostgreSQL(ストリーミング)

Datastream to SQL テンプレートは、Datastream データを読み取り、そのデータを MySQL データベースまたは PostgreSQL データベースに複製するストリーミング パイプラインです。このテンプレートは、Pub/Sub 通知を使用して Cloud Storage からデータを読み取り、このデータを SQL レプリカ テーブルに複製します。

このテンプレートはデータ定義言語(DDL)をサポートしていないため、すべてのテーブルがすでにデータベースに存在することを想定しています。レプリケーションでは、Dataflow ステートフル変換を使用して最新でないデータをフィルタし、順不同データの整合性を確保します。たとえば、ある行のより新しいバージョンの読み取りがすでに行われている場合、その行の遅れて到着したバージョンは無視されます。データ操作言語(DML)を実行することは、ソースデータとターゲット データを完全に複製するための最善の方法です。実行される DML ステートメントは、次のルールに従います。

  • 主キーが存在する場合、insert と update のオペレーションでは upsert 構文を使用します(例: INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE)。
  • 主キーが存在する場合、削除 DML として delete が複製されます。
  • 主キーが存在しない場合、insert と update の両方のオペレーションがテーブルに挿入されます。
  • 主キーが存在しない場合、delete は無視されます。

Oracle から Postgres へのユーティリティを使用する場合で、主キーが存在しない場合は、SQL に ROWID を主キーとして追加します。

このパイプラインの要件:

  • データを複製する準備ができている、またはすでに複製されている Datastream ストリーム。
  • Datastream データに対して Cloud Storage の Pub/Sub 通知が有効になっている。
  • PostgreSQL データベースが必要なスキーマでシードされている。
  • Dataflow ワーカーと PostgreSQL 間のネットワーク アクセスが設定されている。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 複製する Cloud Storage 内の Datastream ファイルの場所。通常、このファイルの場所はストリームのルートパスです。
gcsPubSubSubscription Datastream ファイル通知を含む Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
inputFileFormat Datastream によって生成された出力ファイルの形式(例: avro,json)。デフォルトは avro です。
databaseHost 接続先の SQL ホスト。
databaseUser レプリケーションのすべてのテーブルへの書き込みに必要なすべての権限を持つ SQL ユーザー。
databasePassword 特定の SQL ユーザーのパスワード。
databasePort (省略可)接続する SQL データベース ポート。デフォルトは 5432 です。
databaseName (省略可)接続する SQL データベースの名前。デフォルトは postgres です。
streamName (省略可)スキーマ情報をポーリングするストリームの名前またはテンプレート。デフォルトは {_metadata_stream} です。

Datastream to SQL テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Datastream to SQL template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: SQL ホスト IP。
  • DATABASE_USER: SQL ユーザー。
  • DATABASE_PASSWORD: SQL パスワード。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: SQL ホスト IP。
  • DATABASE_USER: SQL ユーザー。
  • DATABASE_PASSWORD: SQL パスワード。

Pub/Sub to Java Database Connectivity(JDBC)

Pub/Sub to Java Database Connectivity(JDBC)テンプレートは、既存の Cloud Pub/Sub サブスクリプションから JSON 文字列としてデータを取り込み、結果のレコードを JDBC に書き込むストリーミング パイプラインです。

このパイプラインの要件:

  • パイプラインを実行する前に Cloud Pub/Sub サブスクリプションが存在している必要があります。
  • パイプラインを実行する前に JDBC ソースが存在している必要があります。
  • パイプラインを実行する前に Cloud Pub/Sub 出力デッドレター トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
driverClassName JDBC ドライバのクラス名。例: com.mysql.jdbc.Driver
connectionUrl JDBC 接続 URL 文字列。例: jdbc:mysql://some-host:3306/sampledbBase64 でエンコードされ、Cloud KMS 鍵で暗号化される文字列として渡すことができます。
driverJars JDBC ドライバのカンマ区切りの Cloud Storage パス。例: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
username (省略可)JDBC 接続に使用するユーザー名。Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
password (省略可)JDBC 接続に使用するパスワード。Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
connectionProperties (省略可)JDBC 接続に使用するプロパティ文字列。文字列の形式は [propertyName=property;]* にする必要があります。例: unicode=true;characterEncoding=UTF-8
statement データベースに対して実行するステートメント。このステートメントには、テーブルの列名を任意の順序で指定する必要があります。指定した列名の値のみが JSON から読み取られ、ステートメントに追加されます。例: INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription 読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputDeadletterTopic 配信不能メッセージを転送するための Pub/Sub トピック。例:projects/<project-id>/topics/<topic-name>
KMSEncryptionKey (省略可)ユーザー名、パスワード、接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、ユーザー名、パスワード、接続文字列はすべて暗号化されて渡されます。
extraFilesToStage ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例: gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

Pub/Sub to Java Database Connectivity(JDBC)テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to JDBC template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • DRIVER_CLASS_NAME: ドライバのクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • SQL_STATEMENT: データベースに対して実行される SQL ステートメント
  • INPUT_SUBSCRIPTION: 読み取り元の Pub/Sub 入力サブスクリプション
  • OUTPUT_DEADLETTER_TOPIC: 配信不能メッセージを転送するための Pub/Sub トピック
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_Jdbc
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • DRIVER_CLASS_NAME: ドライバのクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • SQL_STATEMENT: データベースに対して実行される SQL ステートメント
  • INPUT_SUBSCRIPTION: 読み取り元の Pub/Sub 入力サブスクリプション
  • OUTPUT_DEADLETTER_TOPIC: 配信不能メッセージを転送するための Pub/Sub トピック
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

Cloud Spanner change streams to Cloud Storage

Cloud Spanner change streams to Cloud Storage テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して Cloud Storage バケットに書き込むストリーミング パイプラインです。

パイプラインは、タイムスタンプに基づいて Spanner の変更ストリーム レコードをウィンドウにグループ化します。各ウィンドウは、このテンプレートで構成できる期間を表します。タイムスタンプがウィンドウに属するすべてのレコードが、ウィンドウ内に存在することが保証されます。遅延は発生しません。複数の出力シャードを定義することもできます。パイプラインはシャードごと、ウィンドウごとに 1 つの Cloud Storage 出力ファイルを作成します。出力ファイル内では、レコードは順序付けされていません。出力ファイルは、ユーザーの構成に応じて JSON 形式または AVRO 形式で記述できます。

Cloud Spanner インスタンスまたは Cloud Storage バケットと同じリージョンから Dataflow ジョブを実行することで、ネットワークのレイテンシやネットワーク転送の費用を最小限に抑えることができます。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。Dataflow リージョン エンドポイントの詳細をご覧ください。

変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法ベスト プラクティスをご覧ください。

このパイプラインの要件:

  • パイプラインの実行前に、Cloud Spanner インスタンスが存在している。
  • パイプラインの実行前に、Cloud Spanner データベースが存在している。
  • パイプラインの実行前に、Cloud Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に、Cloud Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に、Cloud Spanner の変更ストリームが存在している。
  • パイプラインの実行前に、Cloud Storage 出力バケットが存在している。

テンプレートのパラメータ

パラメータ 説明
spannerInstanceId 変更ストリーム データの読み取り元の Cloud Spanner インスタンス ID。
spannerDatabase 変更ストリーム データの読み取り元の Cloud Spanner データベース。
spannerMetadataInstanceId 変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner インスタンス ID。
spannerMetadataDatabase 変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner データベース。
spannerChangeStreamName 読み取り元の Cloud Spanner 変更ストリームの名前。
gcsOutputDirectory 変更ストリームのファイルの場所は、Cloud Storage に「gs://${BUCKET}/${ROOT_PATH}/」の形式で出力されます。
outputFilenamePrefix (省略可)書き込み先ファイルのファイル名の接頭辞。ファイルの接頭辞はデフォルトで「output」に設定されています。
spannerProjectId (省略可)変更ストリームの読み取り元のプロジェクト。これは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
startTimestamp (省略可)変更ストリームの読み取りに使用する開始の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
endTimestamp (省略可)変更ストリームの読み取りに使用する終了の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、現在よりも先の無限の時間です。
outputFileFormat (省略可)出力 Cloud Storage ファイルの形式。使用可能な形式は TEXT、AVRO です。デフォルトは AVRO です。
windowDuration (省略可)ウィンドウ期間は、出力ディレクトリにデータが書き込まれる間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットを向上させるには、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは 5 分、最小は 1 秒です。使用できる形式は、[int]s(秒に使用、例: 5s)、[int]m(分に使用、例: 12m)、[int]h(時間に使用、例: 2h)です。
rpcPriority (省略可)Cloud Spanner 呼び出しのリクエストの優先度。値は、高、中、低のいずれかである必要があります。(デフォルトは高)
numShards (省略可)書き込み時に生成される出力シャードの最大数。デフォルト値は 20 です。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。
spannerMetadataTableName (省略可)使用する Cloud Spanner 変更ストリーム メタデータのテーブル名。指定しない場合、パイプライン フロー中に Cloud Spanner 変更ストリームのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は指定しないでください。

Cloud Spanner change streams to Cloud Storage テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Spanner change streams to Google Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner インスタンス ID
  • SPANNER_DATABASE: Cloud Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Cloud Spanner 変更ストリーム
  • GCS_OUTPUT_DIRECTORY: 変更ストリームの出力用のファイルの場所

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner インスタンス ID
  • SPANNER_DATABASE: Cloud Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Cloud Spanner 変更ストリーム
  • GCS_OUTPUT_DIRECTORY: 変更ストリームの出力用のファイルの場所

Cloud Spanner change streams to BigQuery

Cloud Spanner change streams to BigQuery テンプレートは、Cloud Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して BigQuery テーブルに書き込むストリーミング パイプラインです。

必要な BigQuery テーブルが存在しない場合は、パイプラインによって作成されます。それ以外の場合は、既存の BigQuery テーブルが使用されます。既存の BigQuery テーブルのスキーマには、Cloud Spanner テーブルの対応する追跡対象列と、「ignoreFields」オプションによって明示的に無視されない追加のメタデータ列(以下のリストのメタデータ フィールドの説明を参照)が含まれている必要があります。新しい各 BigQuery 行には、変更レコードのタイムスタンプで、Cloud Spanner テーブルの対応する行から変更ストリームが監視するすべての列が含まれます。

変更ストリームの監視対象列はすべて、Cloud Spanner トランザクションによって変更されたかどうかにかかわらず、BigQuery の各テーブル行に含まれます。監視されていない列は BigQuery 行に含まれません。Dataflow のウォーターマークよりも小さい Cloud Spanner の変更は、BigQuery テーブルに正常に適用されるか、再試行のためにデッドレター キューに保存されます。BigQuery の行は、元の Cloud Spanner commit タイムスタンプの順序と比較して順不同で挿入されます。

BigQuery テーブルに次のメタデータ フィールドが追加されます。

  • _metadata_spanner_mod_type: 変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_table_name: Cloud Spanner テーブル名。これは、コネクタのメタデータ テーブル名ではありません。
  • _metadata_spanner_commit_timestamp: 変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_server_transaction_id: 変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_record_sequence: 変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_is_last_record_in_transaction_in_partition: 変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_number_of_records_in_transaction: 変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_number_of_partitions_in_transaction: 変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_big_query_commit_timestamp: 行が BigQuery に挿入されたときの commit タイムスタンプ。

注:

  • このテンプレートでは、スキーマの変更が Cloud Spanner から BigQuery に伝播されません。Cloud Spanner でスキーマの変更を行うとパイプラインが壊れる可能性があります。スキーマの変更後にパイプラインの再作成が必要になることがあります。
  • OLD_AND_NEW_VALUESNEW_VALUES 値のキャプチャ タイプで、データ変更レコードに UPDATE 変更が含まれている場合、テンプレートはデータ変更レコードの commit タイムスタンプで Cloud Spanner に対してステイル読み取りを実行し、変更されていない監視対象の列を取得する必要があります。ステイル読み取りに対してデータベース「version_retention_period」が正しく構成されていることを確認してください。NEW_ROW 値のキャプチャ タイプの場合、データ変更レコードは UPDATE で更新されない列を含む新しい行をすべてキャプチャするため、テンプレートのほうが効率的です。テンプレートでは、ステイル読み取りを行う必要がありません。
  • Cloud Spanner インスタンスまたは BigQuery テーブルと同じリージョンから Dataflow ジョブを実行すると、ネットワークのレイテンシとネットワーク転送の費用を最小限に抑えることができます。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。Dataflow リージョン エンドポイントの詳細をご覧ください。
  • このテンプレートは有効な Cloud Spanner のデータ型をすべてサポートしていますが、BigQuery の型が Cloud Spanner の型より精度が高い場合、変換中に精度が失われる可能性があります。具体的には次のとおりです。
    • Cloud Spanner JSON 型では、オブジェクトのメンバーの順序は辞書順に並べ替えられますが、BigQuery JSON 型はこのような保証はありません。
    • Cloud Spanner はナノ秒単位の TIMESTAMP 型をサポートしますが、BigQuery はマイクロ秒 TIMESTAMP 型のみをサポートします。

変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法ベスト プラクティスをご覧ください。

このパイプラインの要件:

  • パイプラインの実行前に、Cloud Spanner インスタンスが存在している。
  • パイプラインの実行前に、Cloud Spanner データベースが存在している。
  • パイプラインの実行前に、Cloud Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に、Cloud Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に、Cloud Spanner の変更ストリームが存在している。
  • パイプラインの実行前に BigQuery データセットが存在している。

テンプレートのパラメータ

パラメータ 説明
spannerInstanceId 変更ストリームの読み取り元の Cloud Spanner インスタンス。
spannerDatabase 変更ストリームの読み取り元の Cloud Spanner データベース。
spannerMetadataInstanceId 変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner インスタンス。
spannerMetadataDatabase 変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner データベース。
spannerChangeStreamName 読み取り元の Cloud Spanner 変更ストリームの名前。
bigQueryDataSet 変更ストリーム出力の BigQuery データセット
spannerProjectId (省略可)変更ストリームの読み取り元のプロジェクト。これは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
spannerMetadataTableName (省略可)使用する Cloud Spanner 変更ストリーム メタデータのテーブル名。指定しない場合、パイプライン フロー中に Cloud Spanner 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は指定しないでください。
rpcPriority (省略可)Cloud Spanner 呼び出しのリクエストの優先度。値は、高、中、低のいずれかである必要があります。(デフォルトは高)
startTimestamp (省略可)変更ストリームの読み取りに使用する開始の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
endTimestamp (省略可)変更ストリームの読み取りに使用する終了の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、現在よりも先の無限の時間です。
bigQueryProjectId (省略可)BigQuery プロジェクト。デフォルトは Dataflow ジョブのプロジェクトです。
bigQueryChangelogTableNameTemplate (省略可)BigQuery 変更履歴テーブルの名前のテンプレート。デフォルトは {_metadata_spanner_table_name}_changelog です。
deadLetterQueueDirectory (省略可)処理されなかった理由とともに、未処理のレコードを保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。
dlqRetryMinutes (省略可)デッドレター キューの再試行間隔の分数。デフォルトは 10 です。
ignoreFields (省略可)無視するフィールドのカンマ区切りリスト(大文字と小文字が区別されます)。監視対象のテーブルのフィールドや、パイプラインによって追加されたメタデータ フィールドなどが考えられます。無視されるフィールドは BigQuery に挿入されません。

Cloud Spanner change streams to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Spanner change streams to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner インスタンス ID
  • SPANNER_DATABASE: Cloud Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Cloud Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner インスタンス ID
  • SPANNER_DATABASE: Cloud Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Cloud Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット

Cloud Spanner change streams to Pub/Sub

Cloud Spanner change streams to Pub/Sub テンプレートは、Cloud Dataflow データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して Pub/Sub トピックに書き込むストリーミング パイプラインです。

データを新しい Pub/Sub トピックに出力するには、まずトピックを作成する必要があります。作成後、Pub/Sub はサブスクリプションを自動的に生成して新しいトピックに接続します。存在しない Pub/Sub トピックにデータを出力しようとすると、Dataflow パイプラインは例外をスローし、パイプラインは継続的に接続しようとして停止します。

必要な Pub/Sub トピックがすでに存在する場合、そのトピックにデータを出力できます。

詳細については、変更ストリームについてDataflow で変更ストリームの接続を構築する変更ストリームのベスト プラクティスをご覧ください。

このパイプラインの要件:

  • パイプラインの実行前に、Cloud Spanner インスタンスが存在している。
  • パイプラインの実行前に、Cloud Spanner データベースが存在している。
  • パイプラインの実行前に、Cloud Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に、Cloud Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に、Cloud Spanner の変更ストリームが存在している。
  • Pub/Sub トピックは、パイプラインを実行する前に存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
spannerInstanceId 変更ストリームの読み取り元の Cloud Spanner インスタンス。
spannerDatabase 変更ストリームの読み取り元の Cloud Spanner データベース。
spannerMetadataInstanceId 変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner インスタンス。
spannerMetadataDatabase 変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner データベース。
spannerChangeStreamName 読み取り元の Cloud Spanner 変更ストリームの名前。
pubsubTopic 変更ストリーム出力の Pub/Sub トピック。
spannerProjectId (省略可)変更ストリームの読み取り元のプロジェクト。これは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
spannerMetadataTableName (省略可)使用する Cloud Spanner 変更ストリーム メタデータのテーブル名。指定しない場合、パイプライン フローの変更中に Cloud Spanner によってストリーム コネクタのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。このパラメータは他のケースでは使用しないでください。
rpcPriority (省略可)Cloud Spanner 呼び出しのリクエストの優先度。値は、高、中、低のいずれかである必要があります。(デフォルトは高)
startTimestamp (省略可)変更ストリームの読み取りに使用する開始の DateTime(両端を含む)。例: ex-2021-10-12T07:20:50.52Z。デフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
endTimestamp (省略可)変更ストリームの読み取りに使用する終了の DateTime(両端を含む)。例: ex-2021-10-12T07:20:50.52Z。デフォルトは、現在よりも先の無限の時間です。
outputFileFormat (省略可)出力の形式。出力は多くの PubsubMessage でラップされ、Pub/Sub トピックに送信されます。許可されている形式は JSON と AVRO です。デフォルトは JSON です。
pubsubAPI (省略可)パイプラインの実装に使用される Pub/Sub API。許可される API は pubsubionative_client です。秒間クエリ数(QPS)が少ない場合、native_client はレイテンシが低くなります。QPS が大きい場合、pubsubio はより優れた安定したパフォーマンスを提供します。デフォルト値は pubsubio です。

Cloud Spanner change streams to Pub/Sub テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Spanner change streams to Pub/Sub template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

    gcloud beta dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner インスタンス ID
  • SPANNER_DATABASE: Cloud Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Cloud Spanner 変更ストリーム
  • PUBSUB_TOPIC: 変更ストリーム出力の Pub/Sub トピック

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner インスタンス ID
  • SPANNER_DATABASE: Cloud Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Cloud Spanner 変更ストリーム
  • PUBSUB_TOPIC: 変更ストリーム出力の Pub/Sub トピック

MongoDB to BigQuery(CDC)

MongoDB to BigQuery CDC(変更データ キャプチャ)テンプレートは、MongoDB 変更ストリームと連携するストリーミング パイプラインです。パイプラインは、MongoDB 変更ストリームを介して Pub/Sub に push された JSON レコードを読み取り、userOption パラメータで指定されたとおりに BigQuery に書き込みます。

このパイプラインの要件

  • ターゲット BigQuery データセットが存在すること。
  • ソース MongoDB インスタンスに Dataflow ワーカーマシンからアクセスできること。
  • MongoDB から Pub/Sub に変更を push する変更ストリームが実行されている必要があります。

テンプレートのパラメータ

パラメータ 説明
mongoDbUri MongoDB 接続 URI。形式は mongodb+srv://:@
database コレクションを読み取る MongoDB 内のデータベース。例: my-db
collection MongoDB データベース内のコレクションの名前。例: my-collection
outputTableSpec 書き込み先の BigQuery テーブル。例: bigquery-project:dataset.output_table
userOption FLATTEN または NONEFLATTEN: ドキュメントを第 1 レベルでフラット化します。NONE は、ドキュメント全体を JSON 文字列として格納します。
inputTopic 読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。

MongoDB to BigQuery(CDC)テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the MongoDB to BigQuery (CDC) template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION,\
inputTopic=INPUT_TOPIC

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
  • MONGO_DB_URI: MongoDB URI。
  • DATABASE: MongoDB データベース。
  • COLLECTION: MongoDB コレクション。
  • USER_OPTION: FLATTEN または NONE。
  • INPUT_TOPIC: Pub/Sub 入力トピック。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "userOption": "USER_OPTION",
          "inputTopic": "INPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
  • MONGO_DB_URI: MongoDB URI。
  • DATABASE: MongoDB データベース。
  • COLLECTION: MongoDB コレクション。
  • USER_OPTION: FLATTEN または NONE。
  • INPUT_TOPIC: Pub/Sub 入力トピック。