Google 提供のストリーミング テンプレート

Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。Google が提供するすべてのテンプレートのリストについては、Google 提供のテンプレートの概要ページをご覧ください。

このページでは、ストリーミング テンプレートについて説明します。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery テンプレートは、Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。
outputDeadletterTable 出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Pub/Sub Subscription to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Subscription to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。
outputDeadletterTable 出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Pub/Sub Topic to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Topic to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

このパイプラインの要件

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 未処理の Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。

テンプレートのパラメータ

パラメータ 説明
schemaPath Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc
inputSubscription 読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription>
outputTopic 未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 出力テーブルの場所。例: <my-project>:<my-dataset>.<my-table>createDisposition の指定によっては、ユーザー指定の Avro スキーマを使用して出力テーブルが自動的に作成されます。
writeDisposition (省略可)BigQuery の WriteDisposition。例: WRITE_APPENDWRITE_EMPTY、または WRITE_TRUNCATE。デフォルト: WRITE_APPEND
createDisposition (省略可)BigQuery の CreateDisposition。例: CREATE_IF_NEEDEDCREATE_NEVER。デフォルト: CREATE_IF_NEEDED

Pub/Sub Avro to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Avro to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

次のように置き換えます。

  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow リージョン名(例: us-central1
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow リージョン名(例: us-central1
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、別の Pub/Sub トピックにそのメッセージを書き込むストリーミング パイプラインです。Pub/Sub トピックに書き込むメッセージのフィルタリングに使用される、オプションのメッセージ属性キーと値を指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しながら、Pub/Sub サブスクリプションから別の Pub/Sub トピックにメッセージをコピーできます。

このパイプラインの要件:

  • 実行前にコピー元の Pub/Sub サブスクリプションが存在している必要があります。
  • 実行前にコピー先の Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み取り元の入力 Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
outputTopic 書き込み先の出力 Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
filterKey (省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。
filterValue (省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。

Pub/Sub to Pub/Sub テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Pub/Sub template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)ではフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)ではフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub to Splunk

Pub/Sub to Splunk テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、メッセージ ペイロードを Splunk の HTTP Event Collector(HEC)を介して Splunk に書き込むストリーミング パイプラインです。このテンプレートの最も一般的な使用例は Splunk へのログのエクスポートです。基盤となるワークフローの例については、Dataflow を使用して本番環境対応のログのエクスポートを Splunk にデプロイするをご覧ください。

Splunk に書き込む前に、メッセージ ペイロードに JavaScript ユーザー定義関数を適用することもできます。処理が失敗したメッセージは、トラブルシューティングと再処理のために Pub/Sub の未処理のトピックに転送されます。

HEC トークンの保護を強化するために、Base64 エンコードされた HEC トークン パラメータを Cloud KMS 鍵で暗号化して、この Cloud KMS 鍵とともに渡すこともできます。HEC トークン パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。

このパイプラインの要件:

  • パイプラインの実行前にソース Pub/Sub サブスクリプションが存在している必要があります。
  • パイプラインを実行する前に、Pub/Sub に未処理のトピックが存在している必要があります。
  • Dataflow ワーカーのネットワークから Splunk の HEC エンドポイントにアクセスできる必要があります。
  • Splunk の HEC トークンが生成済みで、利用可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 入力の読み取り元の Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
token Splunk の HEC 認証トークン。この base64 エンコードされた文字列は、セキュリティ強化のために Cloud KMS 鍵で暗号化できます。
url Splunk の HEC URL。パイプラインが実行される VPC からルーティング可能である必要があります。例: https://splunk-hec-host:8088。
outputDeadletterTopic 配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath (省略可)すべての JavaScript コードを含む Cloud Storage パス。例: gs://mybucket/mytransforms/*.js
javascriptTextTransformFunctionName (省略可)呼び出す JavaScript 関数の名前。たとえば、JavaScript 関数が function myTransform(inJson) { ...dostuff...} の場合、関数名は myTransform です。
batchCount (省略可)複数のイベントを Splunk に送信するためのバッチサイズ。デフォルト: 1(バッチ処理なし)。
parallelism (省略可)並行リクエストの最大数。デフォルト: 1(並列処理なし)。
disableCertificateValidation (省略可)SSL 証明書の検証を無効にします。デフォルト: false(検証が有効)。
includePubsubMessage (省略可)完全な Pub/Sub メッセージをペイロードに含めます。デフォルト: false(ペイロードにはデータ要素のみが含まれる)。
tokenKMSEncryptionKey (省略可)HEC トークン文字列を復号するための Cloud KMS 鍵。Cloud KMS 鍵を指定する場合は、HEC トークン文字列を暗号化して渡す必要があります

Pub/Sub to Splunk テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Splunk template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: JavaScript 関数名
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス(例: gs://your-bucket/your-function.js)。
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: JavaScript 関数名
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス(例: gs://your-bucket/your-function.js)。
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files to Cloud Storage テンプレートは、Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。

このパイプラインの要件:

  • パイプラインの実行前に入力 Pub/Sub トピックが存在すること。

テンプレートのパラメータ

パラメータ 説明
inputTopic メッセージを処理するために購読する Cloud Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力の Avro ファイルがアーカイブされる出力ディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
avroTempDirectory 一時的な Avro ファイルのためのディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
outputFilenamePrefix (省略可)Avro ファイルの出力ファイル名接頭辞。
outputFilenameSuffix (オプション)Avro ファイルの出力ファイル名接尾辞。
outputShardTemplate (省略可)出力ファイルのシャード テンプレート。文字「S」または「N」の繰り返しシーケンスで指定します(例: SSS-NNN)。これらはそれぞれシャード番号またはシャード数に置き換えられます。このパラメータを省略した場合のデフォルトのテンプレート フォーマットは「W-P-SS-of-NN」です。
numShards (省略可)書き込み時に生成される出力シャードの最大数。シャードのデフォルトの最大数は 1 です。

Pub/Sub to Cloud Storage Avro テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Cloud Storage Avro template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート
  • NUM_SHARDS: 出力シャードの数
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート
  • NUM_SHARDS: 出力シャードの数
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件:

  • 実行前に Pub/Sub トピックが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式となる必要があります。
  • トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/この値は「/」で終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。例: output-
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに保存されます。outputShardTemplateW-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。

Pub/Sub to Text Files on Cloud Storage テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Text Files on Cloud Storage template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub to MongoDB

Pub/Sub to MongoDB テンプレートは、Pub/Sub サブスクリプションから JSON エンコードのメッセージを読み取り、ドキュメントとして MongoDB に書き込むストリーミング パイプラインです。このパイプラインでは、必要に応じて JavaScript ユーザー定義関数(UDF)を使用して組み込むことができる追加の変換がサポートされています。スキーマの不一致や不正な形式の JSON によるエラー、または変換の実行中に発生したエラーは、入力メッセージとともに未処理メッセージの BigQuery テーブルに記録されます。未処理レコードのテーブルが実行前に存在しない場合は、パイプラインによって自動的にこのテーブルが作成されます。

このパイプラインの要件:

  • Pub/Sub サブスクリプションが存在し、有効な JSON 形式でメッセージがエンコードされている必要があります。
  • MongoDB クラスタが存在し、Dataflow ワーカーマシンからアクセス可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription Pub/Sub サブスクリプションの名前。例: projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri MongoDB サーバーのカンマ区切りのリスト。例: 192.285.234.12:27017,192.287.123.11:27017
database コレクションを格納する MongoDB のデータベース。例: my-db
collection MongoDB データベース内のコレクションの名前。例: my-collection
deadletterTable エラー(スキーマの不一致や JSON の形式の誤りなど)によりメッセージが保存される BigQuery テーブル。例: project-id:dataset-name.table-name
javascriptTextTransformGcsPath (省略可)UDF 変換を行う JavaScript ファイルがある Cloud Storage の場所。例: gs://mybucket/filename.json
javascriptTextTransformFunctionName (省略可)JavaScript UDF の名前。例: transform
batchSize (省略可)MongoDB へのドキュメントのバッチ挿入に使用するバッチサイズ。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト単位)。デフォルト: 5242880
maxConnectionIdleTime (省略可)接続タイムアウトが発生するまでの最大アイドル時間(秒単位)。デフォルト: 60000
sslEnabled (省略可)MongoDB への接続が SSL かどうかを示すブール値。デフォルト: true
ignoreSSLCertificate (省略可)SSL 証明書を無視するかどうかを示すブール値。デフォルト: true
withOrdered (省略可)MongoDB への順序付けされた一括挿入を有効にするブール値。デフォルト: true
withSSLInvalidHostNameAllowed (省略可)SSL 接続で無効なホスト名を許可するかどうかを示すブール値。デフォルト: true

Pub/Sub to MongoDB テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから Pub/Sub to MongoDB template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • REGION_NAME: Dataflow リージョン名(例: us-central1
  • JOB_NAME: 任意のジョブ名
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • LOCATION: Dataflow リージョン名(例: us-central1
  • JOB_NAME: 任意のジョブ名
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Datastream to Cloud Spanner

Datastream to Cloud Spanner テンプレートは、Cloud Storage バケットから Datastream イベントを読み取り、Cloud Spanner データベースに書き込むストリーミング パイプラインです。これは、Datastream ソースから Cloud Spanner へのデータ移行を目的としています。

テンプレートの実行前に、移行に必要なすべてのテーブルが移行先の Cloud Spanner データベースに存在している必要があります。したがって、データ移行に先立ち、ソース データベースから移行先 Cloud Spanner へのスキーマの移行が完了している必要があります。移行前にテーブルにデータが存在する可能性があります。このテンプレートでは、Datastream スキーマの変更が Cloud Spanner データベースに伝播されません。

データの整合性が保証されるのは、すべてのデータが Cloud Spanner に書き込まれ、移行が終了したときだけです。Cloud Spanner に書き込まれる各レコードの順序指定の情報を保存するために、このテンプレートは Cloud Spanner データベース内の各テーブルに対して追加のテーブル(シャドウ テーブルと呼ばれる)を作成します。これは、移行終了時の整合性を確保するために使用されます。シャドウ テーブルは移行後に削除されないため、移行終了時の検証に使用できます。

オペレーション中に発生したエラー(スキーマの不一致、不正な形式の JSON ファイル、変換の実行によるエラーなど)は、エラーキューに記録されます。エラーキューは、エラーが発生したすべての Datastream イベントと、エラーの理由をテキスト形式で保存する Cloud Storage フォルダです。エラーは一時的な場合も永続的な場合もあり、エラーキューの適切な Cloud Storage フォルダに保存されます。一時的なエラーは再試行されますが、永続的なエラーは再試行されません。永続的なエラーが発生した場合は、テンプレートの実行中に変更イベントを修正し、再試行可能なバケットに移動することもできます。

このパイプラインの要件:

  • ステータスが [実行中] または [開始されていません] の Datastream ストリーム。
  • Datastream イベントが複製される Cloud Storage バケット。
  • 既存のテーブルを持つ Cloud Spanner データベース。テーブルは空でも、データを含んでいてもかまいません。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 複製する Cloud Storage 内の Datastream ファイルの場所。通常、これはストリームのルートパスです。
streamName スキーマ情報とソースタイプについてポーリングするストリームの名前またはテンプレート。
instanceId 変更が複製される Cloud Spanner インスタンス。
databaseId 変更が複製される Cloud Spanner データベース。
projectId Cloud Spanner プロジェクト ID。
deadLetterQueueDirectory (省略可)エラーキューの出力を保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。
inputFileFormat (省略可)Datastream によって生成された出力ファイルの形式(例: avro,json)。デフォルトは avro です。
shadowTablePrefix (省略可)シャドウ テーブルの名前に使用される接頭辞。デフォルト: shadow_

Datastream to Cloud Spanner テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから Datastream to Cloud Spanner template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow リージョン名。例: us-central1
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Cloud Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Cloud Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow リージョン名。例: us-central1
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Cloud Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Cloud Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner",
   }
}
  

Text Files on Cloud Storage to BigQuery(Stream)

Text Files on Cloud Storage to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルをストリーミングし、ユーザーが指定する JavaScript ユーザー定義関数(UDF)を使用して変換して、結果を BigQuery に追加するストリーミング パイプラインです。

パイプラインは無期限に実行され、ドレインではなくキャンセルによって手動で終了させる必要があります。これは Watch 変換を使用しているためで、この変換は分割可能な DoFn であり、ドレインはサポートしていません。

このパイプラインの要件:

  • BigQuery で出力テーブルのスキーマを記述する JSON ファイルを作成します。

    BigQuery Schema というタイトルになっているトップレベルの JSON 配列があり、その内容が {"name": "COLUMN_NAME", "type": "DATA_TYPE"} のパターンに従っていることを確認します。例:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • JavaScript(.js)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。

    たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

テンプレートのパラメータ

パラメータ 説明
javascriptTextTransformGcsPath JavaScript UDF の Cloud Storage の場所。例: gs://my_bucket/my_function.js
JSONPath BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。例: gs://path/to/my/schema.json
javascriptTextTransformFunctionName UDF として呼び出す JavaScript 関数の名前。例: transform
outputTable 完全修飾された BigQuery テーブル。例: my-project:dataset.table
inputFilePattern 処理するテキストの Cloud Storage の場所。例: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir
outputDeadletterTable 出力テーブルに到達できなかったメッセージが記載されたテーブル。例: my-project:dataset.my-unprocessed-table。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Cloud Storage Text to BigQuery (Stream) テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Cloud Storage Text to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: UDF の名前
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: UDF の名前
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Text Files on Cloud Storage to Pub/Sub(Stream)

このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Pub/Sub トピックに公開します。また、Pub/Sub でデータを再生することもできます。

パイプラインは無期限に実行され、「ドレイン」ではなく「キャンセル」によって手動で終了させる必要があります。これは「Watch」変換を使用しているためで、この変換は「SplittableDoFn」であり、ドレインはサポートしていません。

現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

このパイプラインの要件:

  • 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • 実行前に Pub/Sub トピックが存在している必要があります。
  • このパイプラインは無期限で実行されるため、手動で終了する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。たとえば、gs://bucket-name/files/*.jsongs://bucket-name/path/*.csv です。
outputTopic 書き込み先の Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にします。

Text Files on Cloud Storage to Pub/Sub(Stream)テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Text Files on Cloud Storage to Pub/Sub (Stream) template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILE_PATTERN: Cloud Storage バケットから読み取るファイル パターン glob(例: path/*.csv
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILE_PATTERN: Cloud Storage バケットから読み取るファイル パターン glob(例: path/*.csv
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)

Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)テンプレートは、Cloud Storage バケットから csv ファイルを読み取り、匿名化のために Cloud Data Loss Prevention(Cloud DLP)API を呼び出し、匿名化されたデータを指定された BigQuery テーブルへ書き込むストリーミング パイプラインです。このテンプレートでは、Cloud DLP 検査テンプレートと Cloud DLP 匿名化テンプレートの両方の使用がサポートされます。これにより、潜在的な機密情報を検査して匿名化したり、列が匿名化されるように指定されていて、検査を必要としない構造化データを匿名化したりできます。

このパイプラインの要件:

  • トークン化する入力データが存在している必要があります
  • Cloud DLP テンプレートが存在している必要があります(たとえば、DeidentifyTemplate や InspectTemplate)。詳細については、Cloud DLP テンプレートをご覧ください。
  • BigQuery データセットが存在している必要があります

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 入力データレコードを読み込む csv ファイル。ワイルドカードも使用できます。例: gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId Cloud DLP API リソースを所有する Cloud DLP プロジェクト ID。この Cloud DLP プロジェクトは、Cloud DLP テンプレートを所有するプロジェクトと同じプロジェクトにすることも、別のプロジェクトにすることもできます。 例: my_dlp_api_project
deidentifyTemplateName API リクエストに使用する Cloud DLP 匿名化テンプレート。パターン projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} で指定します。例: projects/my_project/deidentifyTemplates/100
datasetName トークン化された結果を送信するための BigQuery データセット。
batchSize 検索やトークン化解除するデータを送信するためのチャンク / バッチサイズ。CSV ファイルの場合、batchSize は全体の行数です。ユーザーは、レコードのサイズとファイルのサイズに基づいてバッチサイズを決定する必要があります。Cloud DLP API では、ペイロードのサイズが API 呼び出しごとに 524 KB に制限されます。
inspectTemplateName (省略可)API リクエストに使用する Cloud DLP 検査テンプレート。 projects/{template_project_id}/identifyTemplates/{idTemplateId} の形で指定します。例: projects/my_project/identifyTemplates/100

Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

次のように置き換えます。

  • TEMPLATE_PROJECT_ID: テンプレートのプロジェクト ID
  • DLP_API_PROJECT_ID: Cloud DLP API プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_DATA: 入力ファイルのパス
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify テンプレート番号
  • DATASET_NAME: BigQuery データセット名
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect テンプレート番号
  • BATCH_SIZE_VALUE: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • TEMPLATE_PROJECT_ID: テンプレートのプロジェクト ID
  • DLP_API_PROJECT_ID: Cloud DLP API プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_DATA: 入力ファイルのパス
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify テンプレート番号
  • DATASET_NAME: BigQuery データセット名
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect テンプレート番号
  • BATCH_SIZE_VALUE: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream) テンプレートは、MySQL データベースの変更データを含む Pub/Sub メッセージを読み取り、レコードを BigQuery に書き込むストリーミング パイプラインです。Debezium コネクタは、MySQL データベースの変更をキャプチャして、変更データを Pub/Sub にパブリッシュします。続いて、テンプレートにより Pub/Sub メッセージが読み取られ、BigQuery に書き込まれます。

このテンプレートを使用すると、MySQL データベースと BigQuery テーブルを同期できます。パイプラインは、変更データを BigQuery のステージング テーブルに書き込み、MySQL データベースを複製する BigQuery テーブルを断続的に更新します。

このパイプラインの要件:

  • Debezium コネクタがデプロイされる必要があります。
  • Pub/Sub メッセージは Beam Row でシリアル化される必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscriptions 読み込まれる Pub/Sub 入力サブスクリプションのカンマ区切りのリスト。<subscription>,<subscription>, ... の形式で指定します。
changeLogDataset ステージング テーブルを格納する BigQuery データセット。<my-dataset> の形式で指定します。
replicaDataset レプリカ テーブルを格納する BigQuery データセットの場所。<my-dataset> の形式で指定します。
updateFrequencySecs (省略可)MySQL データベースを複製する BigQuery テーブルを、パイプラインが更新する間隔。

Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery テンプレートの実行

このテンプレートを実行するには、次の手順を行います。

  1. ローカルマシンで DataflowTemplates リポジトリのクローンを作成します。
  2. v2/cdc-parent ディレクトリに移動します。
  3. Debezium コネクタがデプロイされていることを確認します。
  4. Maven を使用して、Dataflow テンプレートを実行します。

    次の値を置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • YOUR_SUBSCRIPTIONS: Pub/Sub サブスクリプション名のカンマ区切りのリスト。
    • YOUR_CHANGELOG_DATASET: 変更履歴データの BigQuery データセット。
    • YOUR_REPLICA_DATASET: レプリカ テーブルの BigQuery データセット。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka to BigQuery

Apache Kafka to BigQuery テンプレートは、Apache Kafka からテキストデータを取り込み、ユーザー定義関数(UDF)を実行して、結果のレコードを BigQuery に出力するストリーミング パイプラインです。データの変換、UDF の実行、出力テーブルへの挿入で発生したエラーは、BigQuery の別のエラーテーブルに挿入されます。実行前にエラーテーブルが存在しない場合は、作成されます。

このパイプラインの要件

  • 出力 BigQuery テーブルが存在している必要があります。
  • Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である必要があります。
  • Apache Kafka トピックが存在していて、有効な JSON 形式でメッセージがエンコードされている必要があります。

テンプレートのパラメータ

パラメータ 説明
outputTableSpec Apache Kafka メッセージを書き込む BigQuery 出力テーブルの場所。my-project:dataset.table の形式で指定します。
inputTopics 読み取る Apache Kafka 入力トピック(カンマ区切りのリスト)。例: messages
bootstrapServers 実行中の Apache Kafka ブローカー サーバーのホストアドレス(カンマ区切りのリスト)。各ホストアドレスは 35.70.252.199:9092 の形式で指定します。
javascriptTextTransformGcsPath (省略可)JavaScript UDF への Cloud Storage ロケーション パス。例: gs://my_bucket/my_function.js
javascriptTextTransformFunctionName (省略可)UDF として呼び出す JavaScript の名前。例: transform
outputDeadletterTable (省略可)出力テーブルに到達できなかったメッセージの BigQuery テーブル。my-project:dataset.my-deadletter-table の形式で指定します。テーブルが存在していない場合、パイプラインの実行時に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Apache Kafka to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Apache Kafka to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow リージョン名(例: us-central1
  • BIGQUERY_TABLE: BigQuery テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、こちらの手順でカンマをエスケープしてください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス
  • YOUR_JAVASCRIPT_FUNCTION: UDF の名前
  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に従ってください。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow リージョン名(例: us-central1
  • BIGQUERY_TABLE: BigQuery テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、こちらの手順でカンマをエスケープしてください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス
  • YOUR_JAVASCRIPT_FUNCTION: UDF の名前
  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に従ってください。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}
  

Datastream to BigQuery(Stream)

Datastream to BigQuery テンプレートは、Datastream データを読み取り、BigQuery に複製するストリーミング パイプラインです。このテンプレートは、Pub/Sub 通知を使用して Cloud Storage からデータを読み取り、時間でパーティション分割された BigQuery ステージング テーブルに複製します。レプリケーションに続いて、このテンプレートは BigQuery で MERGE を実行し、すべての変更データ キャプチャ(CDC)の変更をソーステーブルのレプリカに upsert します。

このテンプレートは、レプリケーションによって管理される BigQuery テーブルの作成と更新を処理します。データ定義言語(DDL)が必要な場合、Datastream に対するコールバックによってソーステーブル スキーマが抽出され、BigQuery のデータ型に変換されます。サポートされているオペレーションは次のとおりです。

  • データが挿入されると、新しいテーブルが作成される。
  • null の初期値を持つ BigQuery テーブルに新しい列が追加される。
  • ドロップされた列は BigQuery で無視され、将来の値が null になる。
  • 名前が変更された列が BigQuery に新しい列として追加される。
  • 型の変更が BigQuery に伝播されない。

このパイプラインの要件:

  • データを複製する準備ができている、またはすでに複製されている Datastream ストリーム。
  • Datastream データに対して Cloud Storage の Pub/Sub 通知が有効になっている。
  • BigQuery の宛先データセットが作成され、Compute Engine サービス アカウントにこれらのデータセットへの管理者権限が付与されている。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 複製する Cloud Storage 内の Datastream ファイルの場所。通常、このファイルの場所はストリームのルートパスです。
gcsPubSubSubscription Datastream ファイル通知を含む Pub/Sub サブスクリプション。例: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
inputFileFormat Datastream によって生成された出力ファイルの形式(例: avro,json)。デフォルトは avro です。
outputStagingDatasetTemplate ステージング テーブルを含む既存のデータセットの名前。ソース データセット / スキーマの名前に置き換えられるプレースホルダとして、テンプレート {_metadata_dataset} を含めることができます(例: {_metadata_dataset}_log)。
outputDatasetTemplate レプリカ テーブルを含む既存のデータセットの名前。ソース データセット / スキーマの名前に置き換えられるプレースホルダとして、テンプレート {_metadata_dataset} を含めることができます(例: {_metadata_dataset})。
outputStagingTableNameTemplate (省略可)ステージング テーブルの名前のテンプレート。デフォルトは {_metadata_table}_log です。複数のスキーマを複製している場合は、{_metadata_schema}_{_metadata_table}_log をおすすめします。
outputTableNameTemplate (省略可)レプリカ テーブルの名前のテンプレート。デフォルトは {_metadata_table} です。複数のスキーマを複製している場合は、{_metadata_schema}_{_metadata_table} をおすすめします。
outputProjectId (省略可)データを出力する BigQuery データセットのプロジェクト。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
deadLetterQueueDirectory (省略可)処理されなかった理由とともに、未処理のメッセージを保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。
streamName (省略可)スキーマ情報をポーリングするストリームの名前またはテンプレート。デフォルトは {_metadata_stream} です。
mergeFrequencyMinutes (省略可)特定のテーブルのマージ間隔の分数。デフォルトは 5 です。
dlqRetryMinutes (省略可)デッドレター キュー(DLQ)の再試行間隔の分数。デフォルトは 10 です。

Datastream to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから Datastream to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow リージョン名。例: us-central1
  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • BIGQUERY_DATASET: BigQuery データセット名。
  • BIGQUERY_TABLE: BigQuery テーブル テンプレート。例: {_metadata_schema}_{_metadata_table}_log
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow リージョン名。例: us-central1
  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • BIGQUERY_DATASET: BigQuery データセット名。
  • BIGQUERY_TABLE: BigQuery テーブル テンプレート。例: {_metadata_schema}_{_metadata_table}_log
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

Datastream to PostgreSQL(Stream)

Datastream to PostgreSQL テンプレートは、Datastream データを読み取り、それを PostgreSQL データベースに複製するストリーミング パイプラインです。このテンプレートは、Pub/Sub 通知を使用して Cloud Storage からデータを読み取り、このデータを PostgreSQL レプリカ テーブルに複製します。

このテンプレートはデータ定義言語(DDL)をサポートしていないため、すべてのテーブルがすでに PostgreSQL に存在することを想定しています。レプリケーションでは、Dataflow ステートフル変換を使用して最新でないデータをフィルタし、順不同データの整合性を確保します。たとえば、ある行のより新しいバージョンの読み取りがすでに行われている場合、その行の遅れて到着したバージョンは無視されます。データ操作言語(DML)を実行することは、ソースデータとターゲット データを完全に複製するための最善の方法です。実行される DML ステートメントは、次のルールに従います。

  • 主キーが存在する場合、insert と update のオペレーションでは upsert 構文を使用します(例: INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE)。
  • 主キーが存在する場合、削除 DML として delete が複製されます。
  • 主キーが存在しない場合、insert と update の両方のオペレーションがテーブルに挿入されます。
  • 主キーが存在しない場合、delete は無視されます。

Oracle から Postgres へのユーティリティを使用する場合で、主キーが存在しない場合は、PostgreSQL に ROWID を主キーとして追加します。

このパイプラインの要件:

  • データを複製する準備ができている、またはすでに複製されている Datastream ストリーム。
  • Datastream データに対して Cloud Storage の Pub/Sub 通知が有効になっている。
  • PostgreSQL データベースが必要なスキーマでシードされている。
  • Dataflow ワーカーと PostgreSQL 間のネットワーク アクセスが設定されている。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 複製する Cloud Storage 内の Datastream ファイルの場所。通常、このファイルの場所はストリームのルートパスです。
gcsPubSubSubscription Datastream ファイル通知を含む Pub/Sub サブスクリプション。例: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
inputFileFormat Datastream によって生成された出力ファイルの形式(例: avro,json)。デフォルトは avro です。
databaseHost 接続先の PostgreSQL ホスト。
databaseUser レプリケーションのすべてのテーブルへの書き込みに必要なすべての権限を持つ PostgreSQL ユーザー。
databasePassword 特定の PostgreSQL ユーザーのパスワード。
databasePort (省略可)接続する PostgreSQL データベース ポート。デフォルトは 5432 です。
databaseName (省略可)接続する PostgreSQL データベースの名前。デフォルトは postgres です。
streamName (省略可)スキーマ情報をポーリングするストリームの名前またはテンプレート。デフォルトは {_metadata_stream} です。

Datastream to PostgreSQL テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから Datastream to PostgreSQL template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Postgres

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow リージョン名。例: us-central1
  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • DATABASE_HOST: PostgreSQL ホスト IP。
  • DATABASE_USER: PostgreSQL ユーザー。
  • DATABASE_PASSWORD: PostgreSQL パスワード。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Postgres \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Postgres

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認証が必要です。

次のように置き換えます。

  • YOUR_PROJECT_ID: テンプレートのプロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow リージョン名。例: us-central1
  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • DATABASE_HOST: PostgreSQL ホスト IP。
  • DATABASE_USER: PostgreSQL ユーザー。
  • DATABASE_PASSWORD: PostgreSQL パスワード。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Postgres",
   }
}