Google 提供のストリーミング テンプレート

Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。Google が提供するすべてのテンプレートのリストについては、Google 提供のテンプレートの概要ページをご覧ください。

このページでは、ストリーミング テンプレートについて説明します。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery テンプレートは、Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件は、次のとおりです。

  • Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。

Pub/Sub Subscription to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Subscription to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
       "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件は、次のとおりです。

  • Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。

Pub/Sub Topic to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Topic to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
  • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
  • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、別の Pub/Sub トピックにそのメッセージを書き込むストリーミング パイプラインです。Pub/Sub トピックに書き込むメッセージのフィルタリングに使用される、オプションのメッセージ属性キーと値を指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しながら、Pub/Sub サブスクリプションから別の Pub/Sub トピックにメッセージをコピーできます。

このパイプラインの要件は、次のとおりです。

  • 実行前にコピー元の Pub/Sub サブスクリプションが存在している必要があります。
  • 実行前にコピー先の Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み取り元の入力 Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
outputTopic 書き込み先の出力 Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
filterKey (省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。
filterValue (省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。

Pub/Sub to Pub/Sub テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Pub/Sub template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_TOPIC_NAME は、Pub/Sub トピック名に置き換えます。
  • FILTER_KEY は、メッセージのフィルタリングに使用する属性キーの名前に置き換えます。
  • FILTER_VALUE は、属性値に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_TOPIC_NAME は、Pub/Sub トピック名に置き換えます。
  • FILTER_KEY は、メッセージのフィルタリングに使用する属性キーの名前に置き換えます。
  • FILTER_VALUE は、属性値に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}

Pub/Sub to Splunk

Pub/Sub to Splunk テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、メッセージ ペイロードを Splunk の HTTP Event Collector(HEC)を介して Splunk に書き込むストリーミング パイプラインです。Splunk に書き込む前に、メッセージ ペイロードに JavaScript ユーザー定義関数を適用することもできます。処理が失敗したメッセージは、トラブルシューティングと再処理のために Pub/Sub dead-letter トピックに転送されます。

HEC トークンの保護を強化するために、Cloud KMS 鍵で暗号化された Base64 エンコードされた HEC トークン パラメータとともに Cloud KMS 鍵を渡すこともできます。HEC トークン パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。

このパイプラインの要件:

  • パイプラインの実行前にソース Pub/Sub サブスクリプションが存在している必要があります。
  • パイプラインの実行前に Pub/Sub dead-letter トピックが存在している必要があります。
  • Dataflow ワーカーのネットワークから Splunk の HEC エンドポイントにアクセスできる必要があります。
  • Splunk の HEC トークンが生成済みで、利用可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 入力の読み取り元の Pub/Sub サブスクリプション。例:projects/<project-id>/subscriptions/<subscription-name>
token Splunk の HEC 認証トークン。 この base64 エンコードされた文字列は、セキュリティ強化のために Cloud KMS 鍵で暗号化できます。
url Splunk の HEC URL。パイプラインが実行される VPC からルーティング可能である必要があります。例: https://splunk-hec-host:8088。
outputDeadletterTopic 配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath (省略可)すべての JavaScript コードを含む Cloud Storage パス。例: gs://mybucket/mytransforms/*.js
javascriptTextTransformFunctionName (省略可)呼び出す JavaScript 関数の名前。たとえば、JavaScript 関数が function myTransform(inJson) { ...dostuff...} の場合、関数名は myTransform です。
batchCount (省略可)複数のイベントを Splunk に送信するためのバッチサイズ。デフォルト: 1(バッチ処理なし)。
parallelism (省略可)並行リクエストの最大数。デフォルト: 1(並列処理なし)。
disableCertificateValidation (省略可)SSL 証明書の検証を無効にします。デフォルト: false(検証が有効)。
includePubsubMessage (省略可)Pub/Sub メッセージ全体をペイロードに含めます。デフォルトは false です(データ要素のみがペイロードに含まれます)。
tokenKMSEncryptionKey (省略可)HEC トークン文字列を復号するための Cloud KMS 鍵。Cloud KMS 鍵を指定する場合は、HEC トークン文字列を暗号化して渡す必要があります。

Pub/Sub to Splunk テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Splunk template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • INPUT_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
  • TOKENは 、Splunk の HTTP Event Collector トークンに置き換えます。
  • URL は、Splunk の HTTP Event Collector の URL パスに置き換えます。例: https://splunk-hec-host:8088。
  • DEADLETTER_TOPIC_NAME は、Pub/Sub トピック名に置き換えます。
  • YOUR_JAVASCRIPT_FUNCTION は、JavaScript 関数の名前に置き換えます。
  • PATH_TO_JAVASCRIPT_UDF_FILE は、JavaScript コードを含む .js ファイルへの Cloud Storage パスに置き換えます。
  • BATCH_COUNT は、複数のイベントを Splunk に送信するために使用するバッチサイズに置き換えます。
  • PARALLELISM は、Splunk へのイベント送信に使用する並列リクエストの数に置き換えます。
  • SSL 証明書の検証を無効にする場合は、DISABLE_VALIDATION を true に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/YOUR_PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • INPUT_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
  • TOKENは 、Splunk の HTTP Event Collector トークンに置き換えます。
  • URL は、Splunk の HTTP Event Collector の URL パスに置き換えます。例: https://splunk-hec-host:8088。
  • DEADLETTER_TOPIC_NAME は、Pub/Sub トピック名に置き換えます。
  • YOUR_JAVASCRIPT_FUNCTION は、JavaScript 関数の名前に置き換えます。
  • PATH_TO_JAVASCRIPT_UDF_FILE は、JavaScript コードを含む .js ファイルへの Cloud Storage パスに置き換えます。
  • BATCH_COUNT は、複数のイベントを Splunk に送信するために使用するバッチサイズに置き換えます。
  • PARALLELISM は、Splunk へのイベント送信に使用する並列リクエストの数に置き換えます。
  • SSL 証明書の検証を無効にする場合は、DISABLE_VALIDATION を true に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/YOUR_PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   },
   "environment": { "zone": "us-central1-f" }
}

Pub/Sub to Cloud Storage Avro

Pub/Sub to Cloud Storage Avro テンプレートは、Pub/Sub トピックからデータを読み取り、Avro ファイルを指定の Cloud Storage バケットに書き込むストリーミング パイプラインです。このパイプラインは、ウィンドウ処理された書き込みを実行するために使用されるオプションのユーザー指定ウィンドウ期間をサポートしています。

このパイプラインの要件は、次のとおりです。

  • パイプラインの実行前に入力 Pub/Sub トピックが存在すること。

テンプレートのパラメータ

パラメータ 説明
inputTopic メッセージを処理するために購読する Cloud Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力の Avro ファイルがアーカイブされる出力ディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
avroTempDirectory 一時的な Avro ファイルのためのディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
outputFilenamePrefix (省略可)Avro ファイルの出力ファイル名接頭辞。
outputFilenameSuffix (オプション)Avro ファイルの出力ファイル名接尾辞。
outputShardTemplate (省略可)出力ファイルのシャード テンプレート。文字「S」または「N」の繰り返しシーケンスで指定します(例: SSS-NNN)。これらはそれぞれシャード番号またはシャード数に置き換えられます。このパラメータを省略した場合のデフォルトのテンプレート フォーマットは「W-P-SS-of-NN」です。
numShards (省略可)書き込み時に生成される出力シャードの最大数。シャードのデフォルトの最大数は 1 です。
windowDuration (省略可)データの書き込みが行われるウィンドウ期間。デフォルトは 5m です。使用できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時間が Nh(例: 2h)です。

Pub/Sub to Cloud Storage Avro テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Cloud Storage Avro template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME は、Pub/Sub トピック名に置き換えます。
  • YOUR_BUCKET_NAME を、Cloud Storage バケットの名前に置き換えます。
  • FILENAME_PREFIX は、使用する出力ファイル名接頭辞に置き換えます。
  • FILENAME_SUFFIX は、使用する出力ファイル名接尾辞に置き換えます。
  • SHARD_TEMPLATE は、使用する出力シャード テンプレートに置き換えます。
  • NUM_SHARDS は、出力シャードの数に置き換えます。
  • WINDOW_DURATION は、出力ウィンドウ期間に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Avro \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
avroTempDirectory=gs://YOUR_BUCKET_NAME/temp/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
windowDuration=WINDOW_DURATION

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME は、Pub/Sub トピック名に置き換えます。
  • YOUR_BUCKET_NAME を、Cloud Storage バケットの名前に置き換えます。
  • FILENAME_PREFIX は、使用する出力ファイル名接頭辞に置き換えます。
  • FILENAME_SUFFIX は、使用する出力ファイル名接尾辞に置き換えます。
  • SHARD_TEMPLATE は、使用する出力シャード テンプレートに置き換えます。
  • NUM_SHARDS は、出力シャードの数に置き換えます。
  • WINDOW_DURATION は、出力ウィンドウ期間に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
       "avroTempDirectory": "gs://YOUR_BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
       "windowDuration": "WINDOW_DURATION"
   },
   "environment": { "zone": "us-central1-f" }
}

Pub/Sub to Cloud Storage Text

Pub/Sub to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件は、次のとおりです。

  • 実行前に Pub/Sub トピックが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式となる必要があります。
  • トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/この値は「/」で終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。例: output-
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに保存されます。outputShardTemplateW-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。

Pub/Sub to Cloud Storage Text テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Cloud Storage Text template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
  • YOUR_BUCKET_NAME を、Cloud Storage バケットの名前に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
  • YOUR_BUCKET_NAME を、Cloud Storage バケットの名前に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
       "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to BigQuery (Stream)

Cloud Storage Text to BigQuery (Stream) パイプラインは、Cloud Storage に格納されているテキスト ファイルをストリーミングし、指定された JavaScript ユーザー定義関数(UDF)を使用して変換して、結果を BigQuery に出力するストリーミング パイプラインです。

このパイプラインの要件は、次のとおりです。

  • 出力テーブルを記述する JSON 形式の BigQuery スキーマ ファイルを作成します。
    {
        'fields': [{
            'name': 'location',
            'type': 'STRING'
        }, {
            'name': 'name',
            'type': 'STRING'
        }, {
            'name': 'age',
            'type': 'STRING',
        }, {
            'name': 'color',
            'type': 'STRING'
        }, {
            'name': 'coffee',
            'type': 'STRING',
            'mode': 'REQUIRED'
        }, {
            'name': 'cost',
            'type': 'NUMERIC',
            'mode': 'REQUIRED'
        }]
    }
    
  • JavaScript(.js)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。

    たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

テンプレートのパラメータ

パラメータ 説明
javascriptTextTransformGcsPath JavaScript UDF の Cloud Storage の場所。例: gs://my_bucket/my_function.js
JSONPath BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。例: gs://path/to/my/schema.json
javascriptTextTransformFunctionName UDF として呼び出す JavaScript 関数の名前。例: transform
outputTable 完全修飾された BigQuery テーブル。例: my-project:dataset.table
inputFilePattern 処理するテキストの Cloud Storage の場所。例: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir
outputDeadletterTable 出力テーブルに到達できなかったメッセージが記載されたテーブル(「Deadletter」テーブル)。例: my-project:dataset.my-deadletter-table。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Cloud Storage Text to BigQuery (Stream) テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Cloud Storage Text to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_JAVASCRIPT_FUNCTION は、UDF の名前に置き換えます。
  • PATH_TO_BIGQUERY_SCHEMA_JSON を、スキーマ定義が含まれる JSON ファイルの Cloud Storage パスで置き換えます。
  • PATH_TO_JAVASCRIPT_UDF_FILE は、JavaScript コードを含む .js ファイルへの Cloud Storage パスに置き換えます。
  • PATH_TO_YOUR_TEXT_DATA をテキスト データセットの Cloud Storage パスに置き換えます。
  • BIGQUERY_TABLE を BigQuery テーブル名に置き換えます。
  • BIGQUERY_DEADLETTER_TABLE は、BigQuery deadletter テーブル名に置き換えます。
  • PATH_TO_TEMP_DIR_ON_GCS を一時ディレクトリの Cloud Storage パスに置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_YOUR_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_DEADLETTER_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_JAVASCRIPT_FUNCTION は、UDF の名前に置き換えます。
  • PATH_TO_BIGQUERY_SCHEMA_JSON を、スキーマ定義が含まれる JSON ファイルの Cloud Storage パスで置き換えます。
  • PATH_TO_JAVASCRIPT_UDF_FILE は、JavaScript コードを含む .js ファイルへの Cloud Storage パスに置き換えます。
  • PATH_TO_YOUR_TEXT_DATA をテキスト データセットの Cloud Storage パスに置き換えます。
  • BIGQUERY_TABLE を BigQuery テーブル名に置き換えます。
  • BIGQUERY_DEADLETTER_TABLE は、BigQuery deadletter テーブル名に置き換えます。
  • PATH_TO_TEMP_DIR_ON_GCS を一時ディレクトリの Cloud Storage パスに置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_YOUR_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_DEADLETTER_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to Pub/Sub (Stream)

このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Pub/Sub トピックに公開します。また、Pub/Sub でデータを再生することもできます。

現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

このパイプラインの要件は、次のとおりです。

  • 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • 実行前に Pub/Sub トピックが存在している必要があります。
  • このパイプラインは無期限で実行されるため、手動で終了する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/files/*.json
outputTopic 書き込み先の Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にします。

Cloud Storage Text to Pub/Sub(Stream)テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Cloud Storage Text to Pub/Sub (Stream) template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/files/*.json,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/files/*.json",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream)

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery テンプレートは、Cloud Storage バケットから csv ファイルを読み取り、匿名化のために Cloud Data Loss Prevention(Cloud DLP)API を呼び出し、指定された BigQuery テーブルに匿名化されたデータを書き込むパイプラインです。このテンプレートでは、Cloud DLP 検査テンプレートと Cloud DLP 匿名化テンプレートの両方の使用がサポートされます。これにより、潜在的な機密情報を検査して匿名化したり、列が匿名化されるように指定されていて、検査を必要としない構造化データを匿名化したりできます。

このパイプラインの要件は、次のとおりです。

  • トークン化する入力データが存在している必要があります
  • Cloud DLP テンプレートが存在している必要があります(たとえば、DeidentifyTemplate や InspectTemplate)。詳細については、Cloud DLP テンプレートをご覧ください。
  • BigQuery データセットが存在している必要があります

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 入力データレコードを読み込む csv ファイル。ワイルドカードも使用できます。例: gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId Cloud DLP API リソースを所有する Cloud DLP プロジェクト ID。この Cloud DLP プロジェクトは、Cloud DLP テンプレートを所有するプロジェクトと同じプロジェクトにすることも、別のプロジェクトにすることもできます。 例: my_dlp_api_project
deidentifyTemplateName API リクエストに使用する Cloud DLP 匿名化テンプレート。パターン projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} で指定します。例: projects/my_project/deidentifyTemplates/100
datasetName トークン化された結果を送信するための BigQuery データセット。
batchSize 検索やトークン化解除するデータを送信するためのチャンク / バッチサイズ。CSV ファイルの場合、batchSize は全体の行数です。ユーザーは、レコードのサイズとファイルのサイズに基づいてバッチサイズを決定する必要があります。Cloud DLP API では、ペイロードのサイズが API 呼び出しごとに 524 KB に制限されます。
inspectTemplateName (省略可)API リクエストに使用する Cloud DLP 検査テンプレート。 projects/{template_project_id}/identifyTemplates/{idTemplateId} の形で指定します。例: projects/my_project/identifyTemplates/100

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream) template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_TEMPLATE_PROJECT_ID は、テンプレート プロジェクト ID に置き換えます。
  • YOUR_DLP_API_PROJECT_ID は、Cloud DLP API プロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_INPUT_DATA は、入力ファイルパスに置き換えます。
  • YOUR_DEIDENTIFY_TEMPLATE は、Cloud DLP DLPDeidentify テンプレート番号に置き換えます。
  • YOUR_DATASET_NAME は、BigQuery データセット名に置き換えます。
  • YOUR_INSPECT_TEMPLATE は、Cloud DLPInspect テンプレート番号に置き換えます。
  • BATCH_SIZE_VALUE は、バッチサイズ(csv の場合は API ごとの行数)に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --parameters \
inputFilePattern=YOUR_INPUT_DATA,\
dlpProjectId=YOUR_DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE,\
datasetName=YOUR_DATASET,\
batchSize=BATCH_SIZE_VALUE

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_TEMPLATE_PROJECT_ID は、テンプレート プロジェクト ID に置き換えます。
  • YOUR_DLP_API_PROJECT_ID は、Cloud DLP API プロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_INPUT_DATA は、入力ファイルパスに置き換えます。
  • YOUR_DEIDENTIFY_TEMPLATE は、Cloud DLP DLPDeidentify テンプレート番号に置き換えます。
  • YOUR_DATASET_NAME は、BigQuery データセット名に置き換えます。
  • YOUR_INSPECT_TEMPLATE は、Cloud DLPInspect テンプレート番号に置き換えます。
  • BATCH_SIZE_VALUE は、バッチサイズ(csv の場合は API ごとの行数)に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
      "inputFilePattern":YOUR_INPUT_DATA,
      "dlpProjectId": "YOUR_DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE".
      "inspectTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE",
      "datasetName": "YOUR_DATASET",
      "batchSize": "BATCH_SIZE_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream) テンプレートは、MySQL データベースの変更データを含む Pub/Sub メッセージを読み取り、レコードを BigQuery に書き込むストリーミング パイプラインです。Debezium コネクタは、MySQL データベースの変更をキャプチャして、変更データを Pub/Sub にパブリッシュします。続いて、テンプレートにより Pub/Sub メッセージが読み取られ、BigQuery に書き込まれます。

このテンプレートを使用すると、MySQL データベースと BigQuery テーブルを同期できます。パイプラインは、変更データを BigQuery のステージング テーブルに書き込み、MySQL データベースを複製する BigQuery テーブルを断続的に更新します。

このパイプラインの要件は、次のとおりです。

  • Debezium コネクタがデプロイされる必要があります。
  • Pub/Sub メッセージは Beam Row でシリアル化される必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscriptions 読み込まれる Pub/Sub 入力サブスクリプションのカンマ区切りのリスト。<subscription>,<subscription>, ... の形式で指定します。
changeLogDataset ステージング テーブルを格納する BigQuery データセット。<my-dataset> の形式で指定します。
replicaDataset レプリカ テーブルを格納する BigQuery データセットの場所。<my-dataset> の形式で指定します。
省略可: updateFrequencySecs MySQL データベースを複製する BigQuery テーブルを、パイプラインが更新する間隔。

Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery テンプレートの実行

このテンプレートを実行するには、次の手順に従います。

  1. ローカルマシンで DataflowTemplates リポジトリのクローンを作成します。
  2. v2/cdc-parent ディレクトリに移動します。
  3. Debezium コネクタがデプロイされていることを確認します。
  4. Maven を使用して、Dataflow テンプレートを実行します。

    このサンプルの次の値は置き換える必要があります。

    • PROJECT_ID は、プロジェクト ID に置き換えます。
    • YOUR_SUBSCRIPTIONS は、Pub/Sub サブスクリプション名のカンマ区切りリストに置き換えます。
    • YOUR_CHANGELOG_DATASET は変更履歴データ用の BigQuery データセットに置き換え、YOUR_REPLICA_DATASET はレプリカ テーブル用の BigQuery データセットに置き換えます。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Pub/Sub へのストリーミング データ生成ツール

Dataflow パイプラインの開発で一般的な要件の 1 つは、特定の秒間クエリ数(QPS)でパフォーマンス ベンチマークを実行することです。このテンプレートを使用すると、ユーザーが指定したスキーマに基づいて、Pub/Sub トピックに偽の JSON メッセージを生成できます。

パイプラインで使用されるJSON データ生成ツール ライブラリでは、各スキーマ フィールドでさまざまなフェーカー関数を使用できます。フェーカー関数とスキーマ形式の詳細については、ドキュメントをご覧ください。

このパイプラインの要件:

  • スキーマ ファイルを作成し、Cloud Storage のロケーションに保存します。
  • 実行前に出力 Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
schemaLocation スキーマ ファイルの場所。例: gs://mybucket/filename.json
topic パイプラインがデータをパブリッシュする Pub/Sub トピックの名前。例: projects/<project-id>/topics/<topic-name>
qps 1 秒あたりに公開されるメッセージの数。例: 100
autoscalingAlgorithm (省略可)ワーカーの自動スケーリングに使用されるアルゴリズム。使用できる値は、自動スケーリングを有効にする THROUGHPUT_BASED または無効にする NONE です。
maxNumWorkers (省略可)ワーカーマシンの最大数。例: 10

ストリーミング データ生成ツール テンプレートの実行

コンソール

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、プロジェクト ID に置き換えます。
  • REGION_NAME は、Dataflow リージョン名に置き換えます。例: us-central1
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は、有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • SCHEMA_LOCATION は、Cloud Storage 内のスキーマ ファイルのパスに置き換えます。例: gs://mybucket/filename.json
  • PUBSUB_TOPIC は、出力 Pub/Sub トピックに置き換えます。例: projects/<project-id>/topics/<topic-name>
  • QPS は、1 秒あたりに公開されるメッセージの数に置き換えます。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
topic=PUBSUB_TOPIC,\
qps=QPS
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、プロジェクト ID に置き換えます。
  • REGION_NAME は、Dataflow リージョン名に置き換えます。例: us-central1
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は、有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • SCHEMA_LOCATION は、Cloud Storage 内のスキーマ ファイルのパスに置き換えます。例: gs://mybucket/filename.json
  • PUBSUB_TOPIC は、出力 Pub/Sub トピックに置き換えます。例: projects/<project-id>/topics/<topic-name>
  • QPS は、1 秒あたりに公開されるメッセージの数に置き換えます。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "topic": "PUBSUB_TOPIC",
          "qps": "QPS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Streaming_Data_Generator",
   }
}