Google 提供のストリーミング テンプレート

Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。Google が提供するすべてのテンプレートのリストについては、Google 提供のテンプレートの概要ページをご覧ください。

このページでは、ストリーミング テンプレートについて説明します。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery テンプレートは、Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。
outputDeadletterTable 出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Pub/Sub Subscription to BigQuery テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Subscription to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。
outputDeadletterTable 出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Pub/Sub Topic to BigQuery テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Topic to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

このパイプラインの要件

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 未処理の Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。

テンプレートのパラメータ

パラメータ 説明
schemaPath Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc
inputSubscription 読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription>
outputTopic 未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 出力テーブルの場所。例: <my-project>:<my-dataset>.<my-table>createDisposition の指定によっては、ユーザー指定の Avro スキーマを使用して出力テーブルが自動的に作成されます。
writeDisposition (省略可)BigQuery の WriteDisposition。例: WRITE_APPENDWRITE_EMPTY、または WRITE_TRUNCATE。デフォルト: WRITE_APPEND
createDisposition (省略可)BigQuery の CreateDisposition。例: CREATE_IF_NEEDEDCREATE_NEVER。デフォルト: CREATE_IF_NEEDED

Pub/Sub Avro to BigQuery テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Avro to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

次のように置き換えます。

  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow リージョン名(例: us-central1
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow リージョン名(例: us-central1
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、別の Pub/Sub トピックにそのメッセージを書き込むストリーミング パイプラインです。Pub/Sub トピックに書き込むメッセージのフィルタリングに使用される、オプションのメッセージ属性キーと値を指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しながら、Pub/Sub サブスクリプションから別の Pub/Sub トピックにメッセージをコピーできます。

このパイプラインの要件:

  • 実行前にコピー元の Pub/Sub サブスクリプションが存在している必要があります。
  • 実行前にコピー先の Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み取り元の入力 Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
outputTopic 書き込み先の出力 Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
filterKey (省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。
filterValue (省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。

Pub/Sub to Pub/Sub テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Pub/Sub template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)ではフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)ではフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub to Splunk

Pub/Sub to Splunk テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、メッセージ ペイロードを Splunk の HTTP Event Collector(HEC)を介して Splunk に書き込むストリーミング パイプラインです。このテンプレートの最も一般的な使用例は Splunk へのログのエクスポートです。基盤となるワークフローの例については、Dataflow を使用して本番環境対応のログのエクスポートを Splunk にデプロイするをご覧ください。

Splunk に書き込む前に、メッセージ ペイロードに JavaScript ユーザー定義関数を適用することもできます。処理が失敗したメッセージは、トラブルシューティングと再処理のために Pub/Sub の未処理のトピックに転送されます。

HEC トークンの保護を強化するために、Base64 エンコードされた HEC トークン パラメータを Cloud KMS 鍵で暗号化して、この Cloud KMS 鍵とともに渡すこともできます。HEC トークン パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。

このパイプラインの要件:

  • パイプラインの実行前にソース Pub/Sub サブスクリプションが存在している必要があります。
  • パイプラインを実行する前に、Pub/Sub に未処理のトピックが存在している必要があります。
  • Dataflow ワーカーのネットワークから Splunk の HEC エンドポイントにアクセスできる必要があります。
  • Splunk の HEC トークンが生成済みで、利用可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 入力の読み取り元の Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
token Splunk の HEC 認証トークン。この base64 エンコードされた文字列は、セキュリティ強化のために Cloud KMS 鍵で暗号化できます。
url Splunk の HEC URL。パイプラインが実行される VPC からルーティング可能である必要があります。例: https://splunk-hec-host:8088。
outputDeadletterTopic 配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath (省略可)すべての JavaScript コードを含む Cloud Storage パス。例: gs://mybucket/mytransforms/*.js
javascriptTextTransformFunctionName (省略可)呼び出す JavaScript 関数の名前。たとえば、JavaScript 関数が function myTransform(inJson) { ...dostuff...} の場合、関数名は myTransform です。
batchCount (省略可)複数のイベントを Splunk に送信するためのバッチサイズ。デフォルト: 1(バッチ処理なし)。
parallelism (省略可)並行リクエストの最大数。デフォルト: 1(並列処理なし)。
disableCertificateValidation (省略可)SSL 証明書の検証を無効にします。デフォルト: false(検証が有効)。
includePubsubMessage (省略可)完全な Pub/Sub メッセージをペイロードに含めます。デフォルト: false(ペイロードにはデータ要素のみが含まれる)。
tokenKMSEncryptionKey (省略可)HEC トークン文字列を復号するための Cloud KMS 鍵。Cloud KMS 鍵を指定する場合は、HEC トークン文字列を暗号化して渡す必要があります

Pub/Sub to Splunk テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Splunk template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: JavaScript 関数名
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス(例: gs://your-bucket/your-function.js)。
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: JavaScript 関数名
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス(例: gs://your-bucket/your-function.js)。
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files to Cloud Storage テンプレートは、Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。

このパイプラインの要件:

  • パイプラインの実行前に入力 Pub/Sub トピックが存在すること。

テンプレートのパラメータ

パラメータ 説明
inputTopic メッセージを使用するために購読する Cloud Pub/Sub トピック。トピック名は、projects/<project-id>/topics/<topic-name> の形式にする必要があります。
outputDirectory 出力 Avro ファイルがアーカイブされる出力ディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/
avroTempDirectory 一時的な Avro ファイル用のディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/
outputFilenamePrefix (省略可)Avro ファイルの出力ファイル名接頭辞。
outputFilenameSuffix (省略可)Avro ファイルの出力ファイル名接尾辞。
outputShardTemplate (省略可)出力ファイルのシャード テンプレート。文字 SN の繰り返しシーケンスで指定します。例: SSS-NNNこれらはそれぞれシャード番号やシャードの総数に置き換えられます。このパラメータを指定しない場合、デフォルトのテンプレートの形式は W-P-SS-of-NN です。

Pub/Sub to Cloud Storage Avro テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Avro Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート

Pub/Sub to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件:

  • 実行前に Pub/Sub トピックが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式となる必要があります。
  • トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/この値は「/」で終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。例: output-
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに保存されます。outputShardTemplateW-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。

Pub/Sub to Text Files on Cloud Storage テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Text Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub to MongoDB

Pub/Sub to MongoDB テンプレートは、Pub/Sub サブスクリプションから JSON エンコードのメッセージを読み取り、ドキュメントとして MongoDB に書き込むストリーミング パイプラインです。このパイプラインでは、必要に応じて JavaScript ユーザー定義関数(UDF)を使用して組み込むことができる追加の変換がサポートされています。スキーマの不一致や不正な形式の JSON によるエラー、または変換の実行中に発生したエラーは、入力メッセージとともに未処理メッセージの BigQuery テーブルに記録されます。未処理レコードのテーブルが実行前に存在しない場合は、パイプラインによって自動的にこのテーブルが作成されます。

このパイプラインの要件:

  • Pub/Sub サブスクリプションが存在し、有効な JSON 形式でメッセージがエンコードされている必要があります。
  • MongoDB クラスタが存在し、Dataflow ワーカーマシンからアクセス可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription Pub/Sub サブスクリプションの名前。例: projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri MongoDB サーバーのカンマ区切りのリスト。例: 192.285.234.12:27017,192.287.123.11:27017
database コレクションを格納する MongoDB のデータベース。例: my-db
collection MongoDB データベース内のコレクションの名前。例: my-collection
deadletterTable エラー(スキーマの不一致や JSON の形式の誤りなど)によりメッセージが保存される BigQuery テーブル。例: project-id:dataset-name.table-name
javascriptTextTransformGcsPath (省略可)UDF 変換を行う JavaScript ファイルがある Cloud Storage の場所。例: gs://mybucket/filename.json
javascriptTextTransformFunctionName (省略可)JavaScript UDF の名前。例: transform
batchSize (省略可)MongoDB へのドキュメントのバッチ挿入に使用するバッチサイズ。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト単位)。デフォルト: 5242880
maxConnectionIdleTime (省略可)接続タイムアウトが発生するまでの最大アイドル時間(秒単位)。デフォルト: 60000
sslEnabled (省略可)MongoDB への接続が SSL かどうかを示すブール値。デフォルト: true
ignoreSSLCertificate (省略可)SSL 証明書を無視するかどうかを示すブール値。デフォルト: true
withOrdered (省略可)MongoDB への順序付けされた一括挿入を有効にするブール値。デフォルト: true
withSSLInvalidHostNameAllowed (省略可)SSL 接続で無効なホスト名を許可するかどうかを示すブール値。デフォルト: true

Pub/Sub to MongoDB テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to MongoDB template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • REGION_NAME: Dataflow リージョン名(例: us-central1
  • JOB_NAME: 任意のジョブ名
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • LOCATION: Dataflow リージョン名(例: us-central1
  • JOB_NAME: 任意のジョブ名
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Pub/Sub to Elasticsearch

Pub/Sub to Elasticsearch テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、ユーザー定義関数(UDF)を実行して、それらをドキュメントとして Elasticsearch に書き込むストリーミング パイプラインです。Dataflow テンプレートは、Elasticsearch のデータ ストリーム機能を使用して、複数のインデックスにまたがる時系列データを保存し、リクエストに対して単一の名前付きリソースを提供します。データ ストリームは、ログ、指標、トレース、その他のPub/Sub に保存されている継続的に生成されたデータに適しています。

このパイプラインの要件

  • 参照元のPub/Sub サブスクリプションが存在し、メッセージが有効な JSON 形式でエンコードされている必要があります。
  • GCP インスタンス上または Elasticsearch バージョン 7.0 以降の Elastic Cloud 上で一般公開されている到達可能な Elasticsearch ホスト。詳細については、Elastic のための Google Cloud 統合をご覧ください。
  • エラー出力の Pub/Sub トピック。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 使用する Cloud Pub/Sub サブスクリプション。名前は projects/<project-id>/subscriptions/<subscription-name> の形式にします。
connectionUrl https://hostname:[port] 形式の Elasticsearch URL、またはElastic Cloud を使用する場合は CloudID を指定します。
apiKey 認証に使用される Base64 エンコード API キー。
errorOutputTopic projects/<project-id>/topics/<topic-name> の形式で失敗したレコードを公開するための Pub/Sub 出力トピック。
dataset (省略可)Pub/Sub 経由で送信されるログのタイプで、すぐに使えるダッシュボードが用意されています。既知のログタイプ値は、audit、vpcflow、firewall です。デフォルト: pubsub
namespace (省略可)環境(開発、生産、QA)のように任意のグループ、チーム、または戦略事業部門。デフォルト: default
batchSize (省略可)ドキュメント数のバッチサイズ。デフォルト: 1000
batchSizeBytes (省略可)バイト数のバッチサイズ。デフォルト: 5242880(5 MB)
maxRetryAttempts (省略可)最大再試行回数。0 より大きくする必要があります。デフォルト: no retries
maxRetryDuration (省略可)最大再試行時間(ミリ秒)は 0 より大きくする必要があります。デフォルト: no retries
javascriptTextTransformGcsPath (省略可).js ファイルの完全な URL。例:gs://your-bucket/your-function.js
javascriptTextTransformFunctionName (省略可)呼び出す JavaScript 関数の名前。たとえば、JavaScript 関数が 関数myTransform(inJson) { ...dostuff...} の場合、関数名は myTransform です。

Pub/Sub to Elasticsearch テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Elasticsearch template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Pubsub_to_Elasticsearch

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow リージョン名。例: us-central1
  • ERROR_OUTPUT_TOPIC: エラー出力の Pub/Sub トピック
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • CONNECTION_URL: Elasticsearch の URL
  • DATASET: ログタイプ
  • NAMESPACE: データセットの名前空間
  • APIKEY: 認証用に Base64 でエンコードされた API キー
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Pubsub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Pubsub_to_Elasticsearch

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow リージョン名。例: us-central1
  • ERROR_OUTPUT_TOPIC: エラー出力の Pub/Sub トピック
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • CONNECTION_URL: Elasticsearch の URL
  • DATASET: ログタイプ
  • NAMESPACE: データセットの名前空間
  • APIKEY: 認証用に Base64 でエンコードされた API キー
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Pubsub_to_Elasticsearch",
   }
}
  

Datastream to Cloud Spanner

Datastream to Cloud Spanner テンプレートは、Cloud Storage バケットから Datastream イベントを読み取り、Cloud Spanner データベースに書き込むストリーミング パイプラインです。これは、Datastream ソースから Cloud Spanner へのデータ移行を目的としています。

テンプレートの実行前に、移行に必要なすべてのテーブルが移行先の Cloud Spanner データベースに存在している必要があります。したがって、データ移行に先立ち、ソース データベースから移行先 Cloud Spanner へのスキーマの移行が完了している必要があります。移行前にテーブルにデータが存在する可能性があります。このテンプレートでは、Datastream スキーマの変更が Cloud Spanner データベースに伝播されません。

データの整合性が保証されるのは、すべてのデータが Cloud Spanner に書き込まれ、移行が終了したときだけです。Cloud Spanner に書き込まれる各レコードの順序指定の情報を保存するために、このテンプレートは Cloud Spanner データベース内の各テーブルに対して追加のテーブル(シャドウ テーブルと呼ばれる)を作成します。これは、移行終了時の整合性を確保するために使用されます。シャドウ テーブルは移行後に削除されないため、移行終了時の検証に使用できます。

オペレーション中に発生したエラー(スキーマの不一致、不正な形式の JSON ファイル、変換の実行によるエラーなど)は、エラーキューに記録されます。エラーキューは、エラーが発生したすべての Datastream イベントと、エラーの理由をテキスト形式で保存する Cloud Storage フォルダです。エラーは一時的な場合も永続的な場合もあり、エラーキューの適切な Cloud Storage フォルダに保存されます。一時的なエラーは再試行されますが、永続的なエラーは再試行されません。永続的なエラーが発生した場合は、テンプレートの実行中に変更イベントを修正し、再試行可能なバケットに移動することもできます。

このパイプラインの要件:

  • ステータスが [実行中] または [開始されていません] の Datastream ストリーム。
  • Datastream イベントが複製される Cloud Storage バケット。
  • 既存のテーブルを持つ Cloud Spanner データベース。テーブルは空でも、データを含んでいてもかまいません。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 複製する Cloud Storage 内の Datastream ファイルの場所。通常、これはストリームのルートパスです。
streamName スキーマ情報とソースタイプについてポーリングするストリームの名前またはテンプレート。
instanceId 変更が複製される Cloud Spanner インスタンス。
databaseId 変更が複製される Cloud Spanner データベース。
projectId Cloud Spanner プロジェクト ID。
deadLetterQueueDirectory (省略可)エラーキューの出力を保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。
inputFileFormat (省略可)Datastream によって生成された出力ファイルの形式(例: avro,json)。デフォルトは avro です。
shadowTablePrefix (省略可)シャドウ テーブルの名前に使用される接頭辞。デフォルト: shadow_

Datastream to Cloud Spanner テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Datastream to Spanner template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow リージョン名。例: us-central1
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Cloud Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Cloud Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow リージョン名。例: us-central1
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Cloud Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Cloud Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner",
   }
}
  

Text Files on Cloud Storage to BigQuery(Stream)

Text Files on Cloud Storage to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルをストリーミングし、ユーザーが指定する JavaScript ユーザー定義関数(UDF)で変換して、結果を BigQuery に追加するストリーミング パイプラインです。

パイプラインは無期限に実行され、ドレインではなくキャンセルによって手動で終了させる必要があります。これは、分割可能な DoFn で、ドレインをサポートしていない Watch 変換を使用しているためです。

このパイプラインの要件:

  • BigQuery で出力テーブルのスキーマを記述する JSON ファイルを作成します。

    BigQuery Schema というタイトルになっているトップレベルの JSON 配列があり、その内容が {"name": "COLUMN_NAME", "type": "DATA_TYPE"} のパターンに従っていることを確認します。例:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • JavaScript(.js)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。

    たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

テンプレートのパラメータ

パラメータ 説明
javascriptTextTransformGcsPath JavaScript UDF の Cloud Storage の場所。例: gs://my_bucket/my_function.js
JSONPath BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。例: gs://path/to/my/schema.json
javascriptTextTransformFunctionName UDF として呼び出す JavaScript 関数の名前。例: transform
outputTable 完全修飾された BigQuery テーブル。例: my-project:dataset.table
inputFilePattern 処理するテキストの Cloud Storage の場所。例: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir
outputDeadletterTable 出力テーブルに到達できなかったメッセージが記載されたテーブル。例: my-project:dataset.my-unprocessed-table。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Cloud Storage Text to BigQuery (Stream) テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: UDF の名前
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: UDF の名前
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Text Files on Cloud Storage to Pub/Sub(Stream)

このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Pub/Sub トピックに公開します。また、Pub/Sub でデータを再生することもできます。

パイプラインは無期限に実行され、「ドレイン」ではなく「キャンセル」によって手動で終了させる必要があります。これは「Watch」変換を使用しているためで、この変換は「SplittableDoFn」であり、ドレインはサポートしていません。

現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

このパイプラインの要件:

  • 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • 実行前に Pub/Sub トピックが存在している必要があります。
  • このパイプラインは無期限で実行されるため、手動で終了する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。たとえば、gs://bucket-name/files/*.jsongs://bucket-name/path/*.csv です。
outputTopic 書き込み先の Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にします。

Text Files on Cloud Storage to Pub/Sub(Stream)テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to Pub/Sub (Stream) template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILE_PATTERN: Cloud Storage バケットから読み取るファイル パターン glob(例: path/*.csv
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILE_PATTERN: Cloud Storage バケットから読み取るファイル パターン glob(例: path/*.csv
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)

Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)テンプレートは、Cloud Storage バケットから csv ファイルを読み取り、匿名化のために Cloud Data Loss Prevention(Cloud DLP)API を呼び出し、匿名化されたデータを指定された BigQuery テーブルへ書き込むストリーミング パイプラインです。このテンプレートでは、Cloud DLP 検査テンプレートと Cloud DLP 匿名化テンプレートの両方の使用がサポートされます。これにより、潜在的な機密情報を検査して匿名化したり、列が匿名化されるように指定されていて、検査を必要としない構造化データを匿名化したりできます。

このパイプラインの要件:

  • トークン化する入力データが存在している必要があります
  • Cloud DLP テンプレートが存在している必要があります(たとえば、DeidentifyTemplate や InspectTemplate)。詳細については、Cloud DLP テンプレートをご覧ください。
  • BigQuery データセットが存在している必要があります

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 入力データレコードを読み込む csv ファイル。ワイルドカードも使用できます。例: gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId Cloud DLP API リソースを所有する Cloud DLP プロジェクト ID。この Cloud DLP プロジェクトは、Cloud DLP テンプレートを所有するプロジェクトと同じプロジェクトにすることも、別のプロジェクトにすることもできます。 例: my_dlp_api_project
deidentifyTemplateName API リクエストに使用する Cloud DLP 匿名化テンプレート。パターン projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} で指定します。例: projects/my_project/deidentifyTemplates/100
datasetName トークン化された結果を送信するための BigQuery データセット。
batchSize 検索やトークン化解除するデータを送信するためのチャンク / バッチサイズ。CSV ファイルの場合、batchSize は全体の行数です。ユーザーは、レコードのサイズとファイルのサイズに基づいてバッチサイズを決定する必要があります。Cloud DLP API では、ペイロードのサイズが API 呼び出しごとに 524 KB に制限されます。
inspectTemplateName (省略可)API リクエストに使用する Cloud DLP 検査テンプレート。 projects/{template_project_id}/identifyTemplates/{idTemplateId} の形で指定します。例: projects/my_project/identifyTemplates/100

Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

次のように置き換えます。

  • TEMPLATE_PROJECT_ID: テンプレートのプロジェクト ID
  • DLP_API_PROJECT_ID: Cloud DLP API プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_DATA: 入力ファイルのパス
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify テンプレート番号
  • DATASET_NAME: BigQuery データセット名
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect テンプレート番号
  • BATCH_SIZE_VALUE: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • TEMPLATE_PROJECT_ID: テンプレートのプロジェクト ID
  • DLP_API_PROJECT_ID: Cloud DLP API プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_DATA: 入力ファイルのパス
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify テンプレート番号
  • DATASET_NAME: BigQuery データセット名
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect テンプレート番号
  • BATCH_SIZE_VALUE: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}