Google 提供のストリーミング テンプレート

Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。Google が提供するすべてのテンプレートのリストについては、Google 提供のテンプレートの概要ページをご覧ください。

このページでは、ストリーミング テンプレートについて説明します。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery テンプレートは、Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。

Pub/Sub Subscription to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Subscription to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。

Pub/Sub Topic to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Topic to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • DATASET: BigQuery データセット
  • TABLE_NAME: BigQuery テーブル名
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub デッドレター トピックにストリーミングされます。

このパイプラインの要件

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • Pub/Sub デッドレター トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。

テンプレートのパラメータ

パラメータ 説明
schemaPath Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc
inputSubscription 読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription>
outputTopic 失敗したレコードのデッドレターとして使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 出力テーブルの場所。例: <my-project>:<my-dataset>.<my-table>createDisposition の指定によっては、ユーザー指定の Avro スキーマを使用して出力テーブルが自動的に作成されます。
writeDisposition (省略可)BigQuery の WriteDisposition。例: WRITE_APPENDWRITE_EMPTY、または WRITE_TRUNCATE。デフォルト: WRITE_APPEND
createDisposition (省略可)BigQuery の CreateDisposition。例: CREATE_IF_NEEDEDCREATE_NEVER。デフォルト: CREATE_IF_NEEDED

Pub/Sub Avro to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Avro to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

以下を置き換えます。

  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION_NAME: Dataflow リージョン名(例: us-central1
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: デッドレター キューに使用する Pub/Sub トピック
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

以下を置き換えます。

  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • LOCATION: Dataflow リージョン名(例: us-central1
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: デッドレター キューに使用する Pub/Sub トピック
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、別の Pub/Sub トピックにそのメッセージを書き込むストリーミング パイプラインです。Pub/Sub トピックに書き込むメッセージのフィルタリングに使用される、オプションのメッセージ属性キーと値を指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しながら、Pub/Sub サブスクリプションから別の Pub/Sub トピックにメッセージをコピーできます。

このパイプラインの要件:

  • 実行前にコピー元の Pub/Sub サブスクリプションが存在している必要があります。
  • 実行前にコピー先の Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み取り元の入力 Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
outputTopic 書き込み先の出力 Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
filterKey (省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。
filterValue (省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。

Pub/Sub to Pub/Sub テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Pub/Sub template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)ではフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOPIC_NAME: Pub/Sub トピック名
  • FILTER_KEY: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。
  • FILTER_VALUE: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)ではフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub to Splunk

Pub/Sub to Splunk テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、メッセージ ペイロードを Splunk の HTTP Event Collector(HEC)を介して Splunk に書き込むストリーミング パイプラインです。Splunk に書き込む前に、メッセージ ペイロードに JavaScript ユーザー定義関数を適用することもできます。処理が失敗したメッセージは、トラブルシューティングと再処理のために Pub/Sub デッドレター トピックに転送されます。

HEC トークンの保護を強化するために、Base64 エンコードされた HEC トークン パラメータを Cloud KMS 鍵で暗号化して、この Cloud KMS 鍵とともに渡すこともできます。HEC トークン パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。

このパイプラインの要件:

  • パイプラインの実行前にソース Pub/Sub サブスクリプションが存在している必要があります。
  • パイプラインの実行前に Pub/Sub dead-letter トピックが存在している必要があります。
  • Dataflow ワーカーのネットワークから Splunk の HEC エンドポイントにアクセスできる必要があります。
  • Splunk の HEC トークンが生成済みで、利用可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 入力の読み取り元の Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
token Splunk の HEC 認証トークン。この base64 エンコードされた文字列は、セキュリティ強化のために Cloud KMS 鍵で暗号化できます。
url Splunk の HEC URL。パイプラインが実行される VPC からルーティング可能である必要があります。例: https://splunk-hec-host:8088。
outputDeadletterTopic 配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath (省略可)すべての JavaScript コードを含む Cloud Storage パス。例: gs://mybucket/mytransforms/*.js
javascriptTextTransformFunctionName (省略可)呼び出す JavaScript 関数の名前。たとえば、JavaScript 関数が function myTransform(inJson) { ...dostuff...} の場合、関数名は myTransform です。
batchCount (省略可)複数のイベントを Splunk に送信するためのバッチサイズ。デフォルト: 1(バッチ処理なし)。
parallelism (省略可)並行リクエストの最大数。デフォルト: 1(並列処理なし)。
disableCertificateValidation (省略可)SSL 証明書の検証を無効にします。デフォルト: false(検証が有効)。
includePubsubMessage (省略可)完全な Pub/Sub メッセージをペイロードに含めます。デフォルト: false(ペイロードにはデータ要素のみが含まれる)。
tokenKMSEncryptionKey (省略可)HEC トークン文字列を復号するための Cloud KMS 鍵。Cloud KMS 鍵を指定する場合は、HEC トークン文字列を暗号化して渡す必要があります

Pub/Sub to Splunk テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Splunk template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: JavaScript 関数名
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス(例: gs://your-bucket/your-function.js)。
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • TOKEN: Splunk の Http Event Collector トークン
  • URL: Splunk の Http Event Collector の URL パス(例: https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME: Pub/Sub トピック名
  • JAVASCRIPT_FUNCTION: JavaScript 関数名
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス(例: gs://your-bucket/your-function.js)。
  • BATCH_COUNT: Splunk に複数のイベントを送信するために使用するバッチサイズ
  • PARALLELISM: Splunk にイベントを送信するために使用する並列リクエストの数
  • DISABLE_VALIDATION: SSL 証明書の検証を無効にする場合、true
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files to Cloud Storage テンプレートは、Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。

このパイプラインの要件:

  • パイプラインの実行前に入力 Pub/Sub トピックが存在すること。

テンプレートのパラメータ

パラメータ 説明
inputTopic メッセージを処理するために購読する Cloud Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力の Avro ファイルがアーカイブされる出力ディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
avroTempDirectory 一時的な Avro ファイルのためのディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
outputFilenamePrefix (省略可)Avro ファイルの出力ファイル名接頭辞。
outputFilenameSuffix (オプション)Avro ファイルの出力ファイル名接尾辞。
outputShardTemplate (省略可)出力ファイルのシャード テンプレート。文字「S」または「N」の繰り返しシーケンスで指定します(例: SSS-NNN)。これらはそれぞれシャード番号またはシャード数に置き換えられます。このパラメータを省略した場合のデフォルトのテンプレート フォーマットは「W-P-SS-of-NN」です。
numShards (省略可)書き込み時に生成される出力シャードの最大数。シャードのデフォルトの最大数は 1 です。

Pub/Sub to Cloud Storage Avro テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Cloud Storage Avro template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート
  • NUM_SHARDS: 出力シャードの数
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILENAME_PREFIX: 使用したい出力ファイル名接頭辞
  • FILENAME_SUFFIX: 使用したい出力ファイル名接尾辞
  • SHARD_TEMPLATE: 使用したい出力シャード テンプレート
  • NUM_SHARDS: 出力シャードの数
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件:

  • 実行前に Pub/Sub トピックが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式となる必要があります。
  • トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/この値は「/」で終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。例: output-
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに保存されます。outputShardTemplateW-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。

Pub/Sub to Text Files on Cloud Storage テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Text Files on Cloud Storage template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub to MongoDB

Pub/Sub to MongoDB テンプレートは、Pub/Sub サブスクリプションから JSON エンコードのメッセージを読み取り、ドキュメントとして MongoDB に書き込むストリーミング パイプラインです。このパイプラインでは、必要に応じて JavaScript ユーザー定義関数(UDF)を使用して組み込むことができる追加の変換がサポートされています。スキーマの不一致や JSON の形式の誤りにより、または変換の実行中に発生したエラーは、入力メッセージとともに BigQuery の deadletter テーブルに記録されます。実行前にテーブルが存在しない場合、パイプラインは自動的に deadletter テーブルを作成します。

このパイプラインの要件:

  • Pub/Sub サブスクリプションが存在し、有効な JSON 形式でメッセージがエンコードされている必要があります。
  • MongoDB クラスタが存在し、Dataflow ワーカーマシンからアクセス可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription Pub/Sub サブスクリプションの名前。例: projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri MongoDB サーバーのカンマ区切りのリスト。例: 192.285.234.12:27017,192.287.123.11:27017
database コレクションを格納する MongoDB のデータベース。例: my-db
collection MongoDB データベース内のコレクションの名前。例: my-collection
deadletterTable エラー(スキーマの不一致や JSON の形式の誤りなど)によりメッセージが保存される BigQuery テーブル。例: project-id:dataset-name.table-name
javascriptTextTransformGcsPath (省略可)UDF 変換を行う JavaScript ファイルがある Cloud Storage の場所。例: gs://mybucket/filename.json
javascriptTextTransformFunctionName (省略可)JavaScript UDF の名前。例: transform
batchSize (省略可)MongoDB へのドキュメントのバッチ挿入に使用するバッチサイズ。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト単位)。デフォルト: 5242880
maxConnectionIdleTime (省略可)接続タイムアウトが発生するまでの最大アイドル時間(秒単位)。デフォルト: 60000
sslEnabled (省略可)MongoDB への接続が SSL かどうかを示すブール値。デフォルト: true
ignoreSSLCertificate (省略可)SSL 証明書を無視するかどうかを示すブール値。デフォルト: true
withOrdered (省略可)MongoDB への順序付けされた一括挿入を有効にするブール値。デフォルト: true
withSSLInvalidHostNameAllowed (省略可)SSL 接続で無効なホスト名を許可するかどうかを示すブール値。デフォルト: true

Pub/Sub to MongoDB テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから Pub/Sub to MongoDB template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • REGION_NAME: Dataflow リージョン名(例: us-central1
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • LOCATION: Dataflow リージョン名(例: us-central1
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Text Files on Cloud Storage to BigQuery (Stream)

Text Files on Cloud Storage to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルをストリーミングし、ユーザーが指定する JavaScript ユーザー定義関数(UDF)を使用して変換して、結果を BigQuery に出力するストリーミング パイプラインです。

このパイプラインの要件:

  • 出力テーブルを記述する JSON 形式の BigQuery スキーマ ファイルを作成します。
    {
        'fields': [{
            'name': 'location',
            'type': 'STRING'
        }, {
            'name': 'name',
            'type': 'STRING'
        }, {
            'name': 'age',
            'type': 'STRING',
        }, {
            'name': 'color',
            'type': 'STRING'
        }, {
            'name': 'coffee',
            'type': 'STRING',
            'mode': 'REQUIRED'
        }, {
            'name': 'cost',
            'type': 'NUMERIC',
            'mode': 'REQUIRED'
        }]
    }
    
  • JavaScript(.js)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。

    たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

テンプレートのパラメータ

パラメータ 説明
javascriptTextTransformGcsPath JavaScript UDF の Cloud Storage の場所。例: gs://my_bucket/my_function.js
JSONPath BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。例: gs://path/to/my/schema.json
javascriptTextTransformFunctionName UDF として呼び出す JavaScript 関数の名前。例: transform
outputTable 完全修飾された BigQuery テーブル。例: my-project:dataset.table
inputFilePattern 処理するテキストの Cloud Storage の場所。例: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir
outputDeadletterTable 出力テーブルに到達できなかったメッセージが記載されたテーブル。例: my-project:dataset.my-unprocessed-table。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Cloud Storage Text to BigQuery (Stream) テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Cloud Storage Text to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: UDF の名前
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • REGION: リージョン エンドポイント(例: us-west-1
  • TEMP_LOCATION: 一時ファイルを書き込む場所(例: gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: UDF の名前
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: JavaScript コードを含む .js ファイルへの Cloud Storage パス
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Cloud Storage Text to Pub/Sub (Stream)

このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Pub/Sub トピックに公開します。また、Pub/Sub でデータを再生することもできます。

現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

このパイプラインの要件:

  • 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • 実行前に Pub/Sub トピックが存在している必要があります。
  • このパイプラインは無期限で実行されるため、手動で終了する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/files/*.json
outputTopic 書き込み先の Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にします。

Cloud Storage Text to Pub/Sub(Stream)テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Cloud Storage Text to Pub/Sub (Stream) template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/files/*.json,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Pub/Sub トピック名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/files/*.json",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream)

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery テンプレートは、Cloud Storage バケットから csv ファイルを読み取り、匿名化のために Cloud Data Loss Prevention(Cloud DLP)API を呼び出し、指定された BigQuery テーブルに匿名化されたデータを書き込むパイプラインです。このテンプレートでは、Cloud DLP 検査テンプレートと Cloud DLP 匿名化テンプレートの両方の使用がサポートされます。これにより、潜在的な機密情報を検査して匿名化したり、列が匿名化されるように指定されていて、検査を必要としない構造化データを匿名化したりできます。

このパイプラインの要件:

  • トークン化する入力データが存在している必要があります
  • Cloud DLP テンプレートが存在している必要があります(たとえば、DeidentifyTemplate や InspectTemplate)。詳細については、Cloud DLP テンプレートをご覧ください。
  • BigQuery データセットが存在している必要があります

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 入力データレコードを読み込む csv ファイル。ワイルドカードも使用できます。例: gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId Cloud DLP API リソースを所有する Cloud DLP プロジェクト ID。この Cloud DLP プロジェクトは、Cloud DLP テンプレートを所有するプロジェクトと同じプロジェクトにすることも、別のプロジェクトにすることもできます。 例: my_dlp_api_project
deidentifyTemplateName API リクエストに使用する Cloud DLP 匿名化テンプレート。パターン projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} で指定します。例: projects/my_project/deidentifyTemplates/100
datasetName トークン化された結果を送信するための BigQuery データセット。
batchSize 検索やトークン化解除するデータを送信するためのチャンク / バッチサイズ。CSV ファイルの場合、batchSize は全体の行数です。ユーザーは、レコードのサイズとファイルのサイズに基づいてバッチサイズを決定する必要があります。Cloud DLP API では、ペイロードのサイズが API 呼び出しごとに 524 KB に制限されます。
inspectTemplateName (省略可)API リクエストに使用する Cloud DLP 検査テンプレート。 projects/{template_project_id}/identifyTemplates/{idTemplateId} の形で指定します。例: projects/my_project/identifyTemplates/100

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream) template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_TEMPLATE_PROJECT_ID は、テンプレート プロジェクト ID に置き換えます。
  • YOUR_DLP_API_PROJECT_ID は、Cloud DLP API プロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_INPUT_DATA は、入力ファイルパスに置き換えます。
  • YOUR_DEIDENTIFY_TEMPLATE は、Cloud DLPDeidentify テンプレート番号に置き換えます。
  • YOUR_DATASET_NAME は、BigQuery データセット名に置き換えます。
  • YOUR_INSPECT_TEMPLATE は、Cloud DLPInspect テンプレート番号に置き換えます。
  • BATCH_SIZE_VALUE は、バッチサイズ(csv の場合は API ごとの行数)に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --parameters \
inputFilePattern=YOUR_INPUT_DATA,\
dlpProjectId=YOUR_DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE,\
datasetName=YOUR_DATASET,\
batchSize=BATCH_SIZE_VALUE

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_TEMPLATE_PROJECT_ID は、テンプレート プロジェクト ID に置き換えます。
  • YOUR_DLP_API_PROJECT_ID は、Cloud DLP API プロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_INPUT_DATA は、入力ファイルパスに置き換えます。
  • YOUR_DEIDENTIFY_TEMPLATE は、Cloud DLPDeidentify テンプレート番号に置き換えます。
  • YOUR_DATASET_NAME は、BigQuery データセット名に置き換えます。
  • YOUR_INSPECT_TEMPLATE は、Cloud DLPInspect テンプレート番号に置き換えます。
  • BATCH_SIZE_VALUE は、バッチサイズ(csv の場合は API ごとの行数)に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
      "inputFilePattern":YOUR_INPUT_DATA,
      "dlpProjectId": "YOUR_DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE".
      "inspectTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE",
      "datasetName": "YOUR_DATASET",
      "batchSize": "BATCH_SIZE_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream) テンプレートは、MySQL データベースの変更データを含む Pub/Sub メッセージを読み取り、レコードを BigQuery に書き込むストリーミング パイプラインです。Debezium コネクタは、MySQL データベースの変更をキャプチャして、変更データを Pub/Sub にパブリッシュします。続いて、テンプレートにより Pub/Sub メッセージが読み取られ、BigQuery に書き込まれます。

このテンプレートを使用すると、MySQL データベースと BigQuery テーブルを同期できます。パイプラインは、変更データを BigQuery のステージング テーブルに書き込み、MySQL データベースを複製する BigQuery テーブルを断続的に更新します。

このパイプラインの要件:

  • Debezium コネクタがデプロイされる必要があります。
  • Pub/Sub メッセージは Beam Row でシリアル化される必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscriptions 読み込まれる Pub/Sub 入力サブスクリプションのカンマ区切りのリスト。<subscription>,<subscription>, ... の形式で指定します。
changeLogDataset ステージング テーブルを格納する BigQuery データセット。<my-dataset> の形式で指定します。
replicaDataset レプリカ テーブルを格納する BigQuery データセットの場所。<my-dataset> の形式で指定します。
省略可: updateFrequencySecs MySQL データベースを複製する BigQuery テーブルを、パイプラインが更新する間隔。

Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery テンプレートの実行

このテンプレートを実行するには、次の手順に従います。

  1. ローカルマシンで DataflowTemplates リポジトリのクローンを作成します。
  2. v2/cdc-parent ディレクトリに移動します。
  3. Debezium コネクタがデプロイされていることを確認します。
  4. Maven を使用して、Dataflow テンプレートを実行します。

    このサンプルの次の値は置き換える必要があります。

    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • YOUR_SUBSCRIPTIONS は、Pub/Sub サブスクリプション名のカンマ区切りのリストに置き換えます。
    • YOUR_CHANGELOG_DATASET を変更履歴データの BigQuery データセットに置き換え、YOUR_REPLICA_DATASET をレプリカ テーブルの BigQuery データセットに置き換えます。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka to BigQuery

Apache Kafka to BigQuery テンプレートは、Apache Kafka からテキストデータを取り込み、ユーザー定義関数(UDF)を実行して、結果のレコードを BigQuery に出力するストリーミング パイプラインです。データの変換、UDF の実行、出力テーブルへの挿入で発生したエラーは、BigQuery の別のエラーテーブルに挿入されます。実行前にエラーテーブルが存在しない場合は、作成されます。

このパイプラインの要件

  • 出力 BigQuery テーブルが存在している必要があります。
  • Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である必要があります。
  • Apache Kafka トピックが存在していて、有効な JSON 形式でメッセージがエンコードされている必要があります。

テンプレートのパラメータ

パラメータ 説明
outputTableSpec Apache Kafka メッセージを書き込む BigQuery 出力テーブルの場所。my-project:dataset.table の形式で指定します。
inputTopics 読み取る Apache Kafka 入力トピック(カンマ区切りのリスト)。例: messages
bootstrapServers 実行中の Apache Kafka ブローカー サーバーのホストアドレス(カンマ区切りのリスト)。各ホストアドレスは 35.70.252.199:9092 の形式で指定します。
javascriptTextTransformGcsPath (省略可)JavaScript UDF への Cloud Storage ロケーション パス。例: gs://my_bucket/my_function.js
javascriptTextTransformFunctionName (省略可)UDF として呼び出す JavaScript の名前。例: transform
outputDeadletterTable (省略可)deadletter レコードを書き込む BigQuery 出力テーブルの場所。my-project:dataset.my-deadletter-table の形式で指定します。テーブルが存在していない場合、パイプラインの実行時に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Apache Kafka to BigQuery テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Apache Kafka to BigQuery template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_JAVASCRIPT_FUNCTION は、UDF の名前に置き換えます。
  • REGION_NAME は、Dataflow リージョン名に置き換えます。例: us-central1.
  • BIGQUERY_TABLE を BigQuery テーブル名に置き換えます。
  • KAFKA_TOPICS は、Apache Kafka トピックリストに置き換えます。複数のトピックを指定する場合は、カンマをエスケープする方法の手順に従ってください。
  • PATH_TO_JAVASCRIPT_UDF_FILE は、JavaScript コードを含む .js ファイルへの Cloud Storage パスに置き換えます。
  • YOUR_JAVASCRIPT_FUNCTION は、UDF の名前に置き換えます。
  • KAFKA_SERVER_ADDRESSES は、Apache Kafka ブローカー サーバーの IP アドレスリストに置き換えます。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に従ってください。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は有効な正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_JAVASCRIPT_FUNCTION は、UDF の名前に置き換えます。
  • LOCATION は、Dataflow リージョン名に置き換えます。例: us-central1.
  • BIGQUERY_TABLE を BigQuery テーブル名に置き換えます。
  • KAFKA_TOPICS は、Apache Kafka トピックリストに置き換えます。複数のトピックを指定する場合は、カンマをエスケープする方法の手順に従ってください。
  • PATH_TO_JAVASCRIPT_UDF_FILE は、JavaScript コードを含む .js ファイルへの Cloud Storage パスに置き換えます。
  • YOUR_JAVASCRIPT_FUNCTION は、UDF の名前に置き換えます。
  • KAFKA_SERVER_ADDRESSES は、Apache Kafka ブローカー サーバーの IP アドレスリストに置き換えます。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に従ってください。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}