Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。Google が提供するすべてのテンプレートのリストについては、Google 提供のテンプレートの概要ページをご覧ください。
このページでは、ストリーミング テンプレートについて説明します。
- Pub/Sub Subscription to BigQuery
- Pub/Sub Topic to BigQuery
- Pub/Sub Avro to BigQuery
- Pub/Sub to Pub/Sub
- Pub/Sub to Splunk
- Pub/Sub to Avro Files on Cloud Storage
- Pub/Sub to Text Files on Cloud Storage
- Pub/Sub to MongoDB
- Text Files on Cloud Storage to BigQuery (Stream)
- Text Files on Cloud Storage to Pub/Sub (Stream)
- Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)
- Change Data Capture to BigQuery (Stream)
- Apache Kafka to BigQuery
Pub/Sub Subscription to BigQuery
Pub/Sub Subscription to BigQuery テンプレートは、Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。
このパイプラインの要件:
- Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、
{"k1":"v1", "k2":"v2"}
という形式でフォーマットしたメッセージは、BigQuery テーブルのk1
とk2
の 2 列に文字列データ型で挿入されます。 - パイプラインの実行前に出力テーブルが存在している必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。 |
outputTableSpec |
BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。 |
outputDeadletterTable |
出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。 |
Pub/Sub Subscription to BigQuery テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Subscription to BigQuery template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。
このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
API
REST API からの実行このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" }, "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, }
Pub/Sub Topic to BigQuery
Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。
このパイプラインの要件:
- Pub/Sub メッセージは JSON 形式となっている必要があります。JSON 形式については、こちらをご覧ください。たとえば、
{"k1":"v1", "k2":"v2"}
という形式でフォーマットしたメッセージは、BigQuery テーブルのk1
とk2
の 2 列に文字列データ型で挿入されます。 - パイプラインの実行前に出力テーブルが存在する必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。 |
outputTableSpec |
BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。 |
outputDeadletterTable |
出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。 |
Pub/Sub Topic to BigQuery テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Topic to BigQuery template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。
このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/PubSub_to_BigQuery
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
API
REST API からの実行このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/PubSub_to_BigQuery
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" } }
Pub/Sub Avro to BigQuery
Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。
このパイプラインの要件
- 入力 Pub/Sub サブスクリプションが存在していること。
- Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
- 未処理の Pub/Sub トピックが存在していること。
- 出力 BigQuery データセットが存在していること。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
schemaPath |
Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc 。 |
inputSubscription |
読み取り元の Pub/Sub 入力サブスクリプション。例:projects/<project>/subscriptions/<subscription> |
outputTopic |
未処理レコードに使用する Pub/Sub トピック。例:projects/<project-id>/topics/<topic-name> |
outputTableSpec |
BigQuery 出力テーブルの場所。例: <my-project>:<my-dataset>.<my-table> 。createDisposition の指定によっては、ユーザー指定の Avro スキーマを使用して出力テーブルが自動的に作成されます。 |
writeDisposition |
(省略可)BigQuery の WriteDisposition。例: WRITE_APPEND 、WRITE_EMPTY 、または WRITE_TRUNCATE 。デフォルト: WRITE_APPEND |
createDisposition |
(省略可)BigQuery の CreateDisposition。例: CREATE_IF_NEEDED 、CREATE_NEVER 。デフォルト: CREATE_IF_NEEDED |
Pub/Sub Avro to BigQuery テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Pub/Sub Avro to BigQuery template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: gcloud
コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。
このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery
以下を置き換えます。
JOB_NAME
: 任意のジョブ名。REGION_NAME
: Dataflow リージョン名(例:us-central1
)SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
API
REST API からの実行このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
JOB_NAME
: 任意のジョブ名。LOCATION
: Dataflow リージョン名(例:us-central1
)SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Pub/Sub to Pub/Sub
Pub/Sub to Pub/Sub テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、別の Pub/Sub トピックにそのメッセージを書き込むストリーミング パイプラインです。Pub/Sub トピックに書き込むメッセージのフィルタリングに使用される、オプションのメッセージ属性キーと値を指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しながら、Pub/Sub サブスクリプションから別の Pub/Sub トピックにメッセージをコピーできます。
このパイプラインの要件:
- 実行前にコピー元の Pub/Sub サブスクリプションが存在している必要があります。
- 実行前にコピー先の Pub/Sub トピックが存在している必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
読み取り元の入力 Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name> 。 |
outputTopic |
書き込み先の出力 Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> 。 |
filterKey |
(省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。 |
filterValue |
(省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。 |
Pub/Sub to Pub/Sub テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Pub/Sub template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。
このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOPIC_NAME
: Pub/Sub トピック名FILTER_KEY
: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。FILTER_VALUE
: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)ではフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ filterKey=FILTER_KEY,\ filterValue=FILTER_VALUE
API
REST API からの実行このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOPIC_NAME
: Pub/Sub トピック名FILTER_KEY
: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。FILTER_VALUE
: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)ではフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "filterKey": "FILTER_KEY", "filterValue": "FILTER_VALUE" } }
Pub/Sub to Splunk
Pub/Sub to Splunk テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、メッセージ ペイロードを Splunk の HTTP Event Collector(HEC)を介して Splunk に書き込むストリーミング パイプラインです。Splunk に書き込む前に、メッセージ ペイロードに JavaScript ユーザー定義関数を適用することもできます。処理が失敗したメッセージは、トラブルシューティングと再処理のために Pub/Sub 未処理トピックに転送されます。
HEC トークンの保護を強化するために、Base64 エンコードされた HEC トークン パラメータを Cloud KMS 鍵で暗号化して、この Cloud KMS 鍵とともに渡すこともできます。HEC トークン パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。
このパイプラインの要件:
- パイプラインの実行前にソース Pub/Sub サブスクリプションが存在している必要があります。
- Pub/Sub 未処理トピックは、パイプラインを実行する前に存在している必要があります。
- Dataflow ワーカーのネットワークから Splunk の HEC エンドポイントにアクセスできる必要があります。
- Splunk の HEC トークンが生成済みで、利用可能である必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
入力の読み取り元の Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name> 。 |
token |
Splunk の HEC 認証トークン。この base64 エンコードされた文字列は、セキュリティ強化のために Cloud KMS 鍵で暗号化できます。 |
url |
Splunk の HEC URL。パイプラインが実行される VPC からルーティング可能である必要があります。例: https://splunk-hec-host:8088。 |
outputDeadletterTopic |
配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> 。 |
javascriptTextTransformGcsPath |
(省略可)すべての JavaScript コードを含む Cloud Storage パス。例: gs://mybucket/mytransforms/*.js |
javascriptTextTransformFunctionName |
(省略可)呼び出す JavaScript 関数の名前。たとえば、JavaScript 関数が function myTransform(inJson) { ...dostuff...} の場合、関数名は myTransform です。 |
batchCount |
(省略可)複数のイベントを Splunk に送信するためのバッチサイズ。デフォルト: 1(バッチ処理なし)。 |
parallelism |
(省略可)並行リクエストの最大数。デフォルト: 1(並列処理なし)。 |
disableCertificateValidation |
(省略可)SSL 証明書の検証を無効にします。デフォルト: false(検証が有効)。 |
includePubsubMessage |
(省略可)完全な Pub/Sub メッセージをペイロードに含めます。デフォルト: false(ペイロードにはデータ要素のみが含まれる)。 |
tokenKMSEncryptionKey |
(省略可)HEC トークン文字列を復号するための Cloud KMS 鍵。Cloud KMS 鍵を指定する場合は、HEC トークン文字列を暗号化して渡す必要があります。 |
Pub/Sub to Splunk テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Splunk template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。
このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOKEN
: Splunk の Http Event Collector トークンURL
: Splunk の Http Event Collector の URL パス(例:https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: Pub/Sub トピック名JAVASCRIPT_FUNCTION
: JavaScript 関数名PATH_TO_JAVASCRIPT_UDF_FILE
: JavaScript コードを含む.js
ファイルへの Cloud Storage パス(例: gs://your-bucket/your-function.js)。BATCH_COUNT
: Splunk に複数のイベントを送信するために使用するバッチサイズPARALLELISM
: Splunk にイベントを送信するために使用する並列リクエストの数DISABLE_VALIDATION
: SSL 証明書の検証を無効にする場合、true
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\ token=TOKEN,\ url=URL,\ outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ batchCount=BATCH_COUNT,\ parallelism=PARALLELISM,\ disableCertificateValidation=DISABLE_VALIDATION
API
REST API からの実行このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOKEN
: Splunk の Http Event Collector トークンURL
: Splunk の Http Event Collector の URL パス(例:https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: Pub/Sub トピック名JAVASCRIPT_FUNCTION
: JavaScript 関数名PATH_TO_JAVASCRIPT_UDF_FILE
: JavaScript コードを含む.js
ファイルへの Cloud Storage パス(例: gs://your-bucket/your-function.js)。BATCH_COUNT
: Splunk に複数のイベントを送信するために使用するバッチサイズPARALLELISM
: Splunk にイベントを送信するために使用する並列リクエストの数DISABLE_VALIDATION
: SSL 証明書の検証を無効にする場合、true
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME", "token": "TOKEN", "url": "URL", "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "batchCount": "BATCH_COUNT", "parallelism": "PARALLELISM", "disableCertificateValidation": "DISABLE_VALIDATION" } }
Pub/Sub to Avro Files on Cloud Storage
Pub/Sub to Avro files to Cloud Storage テンプレートは、Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。
このパイプラインの要件:
- パイプラインの実行前に入力 Pub/Sub トピックが存在すること。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
メッセージを処理するために購読する Cloud Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。 |
outputDirectory |
出力の Avro ファイルがアーカイブされる出力ディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/ 。 |
avroTempDirectory |
一時的な Avro ファイルのためのディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/ 。 |
outputFilenamePrefix |
(省略可)Avro ファイルの出力ファイル名接頭辞。 |
outputFilenameSuffix |
(オプション)Avro ファイルの出力ファイル名接尾辞。 |
outputShardTemplate |
(省略可)出力ファイルのシャード テンプレート。文字「S」または「N」の繰り返しシーケンスで指定します(例: SSS-NNN)。これらはそれぞれシャード番号またはシャード数に置き換えられます。このパラメータを省略した場合のデフォルトのテンプレート フォーマットは「W-P-SS-of-NN」です。 |
numShards |
(省略可)書き込み時に生成される出力シャードの最大数。シャードのデフォルトの最大数は 1 です。 |
Pub/Sub to Cloud Storage Avro テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Cloud Storage Avro template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。
このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILENAME_PREFIX
: 使用したい出力ファイル名接頭辞FILENAME_SUFFIX
: 使用したい出力ファイル名接尾辞SHARD_TEMPLATE
: 使用したい出力シャード テンプレートNUM_SHARDS
: 出力シャードの数
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=FILENAME_PREFIX,\ outputFilenameSuffix=FILENAME_SUFFIX,\ outputShardTemplate=SHARD_TEMPLATE,\ numShards=NUM_SHARDS,\ avroTempDirectory=gs://BUCKET_NAME/temp/
API
REST API からの実行このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILENAME_PREFIX
: 使用したい出力ファイル名接頭辞FILENAME_SUFFIX
: 使用したい出力ファイル名接尾辞SHARD_TEMPLATE
: 使用したい出力シャード テンプレートNUM_SHARDS
: 出力シャードの数
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputDirectory": "gs://BUCKET_NAME/output/", "avroTempDirectory": "gs://BUCKET_NAME/temp/", "outputFilenamePrefix": "FILENAME_PREFIX", "outputFilenameSuffix": "FILENAME_SUFFIX", "outputShardTemplate": "SHARD_TEMPLATE", "numShards": "NUM_SHARDS", } }
Pub/Sub to Text Files on Cloud Storage
Pub/Sub to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。
このパイプラインの要件:
- 実行前に Pub/Sub トピックが存在している必要があります。
- トピックに公開するメッセージは、テキスト形式となる必要があります。
- トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。 |
outputDirectory |
出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/ この値は「/」で終わる必要があります。 |
outputFilenamePrefix |
ウィンドウ処理されたファイルの接頭辞。例: output- |
outputFilenameSuffix |
ウィンドウ処理されたファイルの接尾辞。通常は、.txt や .csv などの拡張子です。 |
outputShardTemplate |
シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに保存されます。outputShardTemplate は W-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplate の SS-of-NN 部分は 00-of-01 になります。
|
Pub/Sub to Text Files on Cloud Storage テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Pub/Sub to Text Files on Cloud Storage template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。
このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
API
REST API からの実行このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", } }
Pub/Sub to MongoDB
Pub/Sub to MongoDB テンプレートは、Pub/Sub サブスクリプションから JSON エンコードのメッセージを読み取り、ドキュメントとして MongoDB に書き込むストリーミング パイプラインです。このパイプラインでは、必要に応じて JavaScript ユーザー定義関数(UDF)を使用して組み込むことができる追加の変換がサポートされています。スキーマの不一致や不正な形式の JSON によるエラー、または変換の実行中に発生したエラーは、入力メッセージとともに未処理メッセージの BigQuery テーブルに記録されます。未処理レコードのテーブルが実行前に存在しない場合は、パイプラインによって自動的にこのテーブルが作成されます。
このパイプラインの要件:
- Pub/Sub サブスクリプションが存在し、有効な JSON 形式でメッセージがエンコードされている必要があります。
- MongoDB クラスタが存在し、Dataflow ワーカーマシンからアクセス可能である必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
Pub/Sub サブスクリプションの名前。例:
|
mongoDBUri |
MongoDB サーバーのカンマ区切りのリスト。例: 192.285.234.12:27017,192.287.123.11:27017 |
database |
コレクションを格納する MongoDB のデータベース。例: my-db 。 |
collection |
MongoDB データベース内のコレクションの名前。例: my-collection 。 |
deadletterTable |
エラー(スキーマの不一致や JSON の形式の誤りなど)によりメッセージが保存される BigQuery テーブル。例: project-id:dataset-name.table-name 。 |
javascriptTextTransformGcsPath |
(省略可)UDF 変換を行う JavaScript ファイルがある Cloud Storage の場所。例: gs://mybucket/filename.json 。 |
javascriptTextTransformFunctionName |
(省略可)JavaScript UDF の名前。例: transform 。 |
batchSize |
(省略可)MongoDB へのドキュメントのバッチ挿入に使用するバッチサイズ。デフォルト: 1000 。 |
batchSizeBytes |
(省略可)バッチサイズ(バイト単位)。デフォルト: 5242880 。 |
maxConnectionIdleTime |
(省略可)接続タイムアウトが発生するまでの最大アイドル時間(秒単位)。デフォルト: 60000 。 |
sslEnabled |
(省略可)MongoDB への接続が SSL かどうかを示すブール値。デフォルト: true 。 |
ignoreSSLCertificate |
(省略可)SSL 証明書を無視するかどうかを示すブール値。デフォルト: true 。 |
withOrdered |
(省略可)MongoDB への順序付けされた一括挿入を有効にするブール値。デフォルト: true 。 |
withSSLInvalidHostNameAllowed |
(省略可)SSL 接続で無効なホスト名を許可するかどうかを示すブール値。デフォルト: true 。 |
Pub/Sub to MongoDB テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから Pub/Sub to MongoDB template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 284.0.0 以降が必要です。
このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB
以下を置き換えます。
PROJECT_ID
: プロジェクト IDREGION_NAME
: Dataflow リージョン名(例:us-central1
)JOB_NAME
: 任意のジョブ名。INPUT_SUBSCRIPTION
: Pub/Sub サブスクリプション(例:
)projects/<project-id>/subscriptions/<subscription-name>
MONGODB_URI
: MongoDB サーバーのアドレス(例:192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: MongoDB データベースの名前(例:users
)COLLECTION
: MongoDB コレクションの名前(例:profiles
)UNPROCESSED_TABLE
: BigQuery テーブルの名前(例:your-project:your-dataset.your-table-name
)
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \ --parameters \ inputSubscription=INPUT_SUBSCRIPTION,\ mongoDBUri=MONGODB_URI,\ database=DATABASE, collection=COLLECTION, deadletterTable=UNPROCESSED_TABLE
API
REST API からの実行このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
PROJECT_ID
: プロジェクト IDLOCATION
: Dataflow リージョン名(例:us-central1
)JOB_NAME
: 任意のジョブ名。INPUT_SUBSCRIPTION
: Pub/Sub サブスクリプション(例:
)projects/<project-id>/subscriptions/<subscription-name>
MONGODB_URI
: MongoDB サーバーのアドレス(例:192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: MongoDB データベースの名前(例:users
)COLLECTION
: MongoDB コレクションの名前(例:profiles
)UNPROCESSED_TABLE
: BigQuery テーブルの名前(例:your-project:your-dataset.your-table-name
)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "INPUT_SUBSCRIPTION", "mongoDBUri": "MONGODB_URI", "database": "DATABASE", "collection": "COLLECTION", "deadletterTable": "UNPROCESSED_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB", } }
Text Files on Cloud Storage to BigQuery (Stream)
Text Files on Cloud Storage to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルをストリーミングし、ユーザーが指定する JavaScript ユーザー定義関数(UDF)を使用して変換して、結果を BigQuery に出力するストリーミング パイプラインです。
このパイプラインの要件:
- 出力テーブルを記述する JSON 形式の BigQuery スキーマ ファイルを作成します。
{ 'fields': [{ 'name': 'location', 'type': 'STRING' }, { 'name': 'name', 'type': 'STRING' }, { 'name': 'age', 'type': 'STRING', }, { 'name': 'color', 'type': 'STRING' }, { 'name': 'coffee', 'type': 'STRING', 'mode': 'REQUIRED' }, { 'name': 'cost', 'type': 'NUMERIC', 'mode': 'REQUIRED' }] }
- JavaScript(
.js
)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。
function transform(line) { var values = line.split(','); var obj = new Object(); obj.location = values[0]; obj.name = values[1]; obj.age = values[2]; obj.color = values[3]; obj.coffee = values[4]; var jsonString = JSON.stringify(obj); return jsonString; }
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
javascriptTextTransformGcsPath |
JavaScript UDF の Cloud Storage の場所。例: gs://my_bucket/my_function.js 。 |
JSONPath |
BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。例: gs://path/to/my/schema.json 。 |
javascriptTextTransformFunctionName |
UDF として呼び出す JavaScript 関数の名前。例: transform 。 |
outputTable |
完全修飾された BigQuery テーブル。例: my-project:dataset.table |
inputFilePattern |
処理するテキストの Cloud Storage の場所。例: gs://my-bucket/my-files/text.txt 。 |
bigQueryLoadingTemporaryDirectory |
BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir |
outputDeadletterTable |
出力テーブルに到達できなかったメッセージが記載されたテーブル。例: my-project:dataset.my-unprocessed-table 。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。 |
Cloud Storage Text to BigQuery (Stream) テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Cloud Storage Text to BigQuery template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。
このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)JAVASCRIPT_FUNCTION
: UDF の名前PATH_TO_BIGQUERY_SCHEMA_JSON
: スキーマ定義を含む JSON ファイルへの Cloud Storage パスPATH_TO_JAVASCRIPT_UDF_FILE
: JavaScript コードを含む.js
ファイルへの Cloud Storage パスPATH_TO_TEXT_DATA
: テキスト データセットへの Cloud Storage パスBIGQUERY_TABLE
: BigQuery テーブル名BIGQUERY_UNPROCESSED_TABLE
: 未処理のメッセージ用の BigQuery テーブルの名前PATH_TO_TEMP_DIR_ON_GCS
: 一時ディレクトリへの Cloud Storage パス
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
API
REST API からの実行このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)JAVASCRIPT_FUNCTION
: UDF の名前PATH_TO_BIGQUERY_SCHEMA_JSON
: スキーマ定義を含む JSON ファイルへの Cloud Storage パスPATH_TO_JAVASCRIPT_UDF_FILE
: JavaScript コードを含む.js
ファイルへの Cloud Storage パスPATH_TO_TEXT_DATA
: テキスト データセットへの Cloud Storage パスBIGQUERY_TABLE
: BigQuery テーブル名BIGQUERY_UNPROCESSED_TABLE
: 未処理のメッセージ用の BigQuery テーブルの名前PATH_TO_TEMP_DIR_ON_GCS
: 一時ディレクトリへの Cloud Storage パス
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" } }
Text Files on Cloud Storage to Pub/Sub (Stream)
このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Pub/Sub トピックに公開します。また、Pub/Sub でデータを再生することもできます。
現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。
このパイプラインの要件:
- 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
- 実行前に Pub/Sub トピックが存在している必要があります。
- このパイプラインは無期限で実行されるため、手動で終了する必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputFilePattern |
読み込み元の入力ファイルのパターン。例: gs://bucket-name/files/*.json 、または gs://bucket-name/path/*.csv 。 |
outputTopic |
書き込み先の Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にします。 |
Text Files on Cloud Storage to Pub/Sub(Stream)テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Text Files on Cloud Storage to Pub/Sub (Stream) template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。
このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILE_PATTERN
: Cloud Storage バケットから読み取るファイル パターン glob(例:path/*.csv
)
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION\ --staging-location TEMP_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
API
REST API からの実行このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
PROJECT_ID
: プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILE_PATTERN
: Cloud Storage バケットから読み取るファイル パターン glob(例:path/*.csv
)
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)
Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) テンプレートは、Cloud Storage バケットから csv ファイルを読み取り、匿名化のために Cloud Data Loss Prevention(Cloud DLP)API を呼び出し、匿名化されたデータを指定された BigQuery テーブルへ書き込むストリーミング パイプラインです。このテンプレートでは、Cloud DLP 検査テンプレートと Cloud DLP 匿名化テンプレートの両方の使用がサポートされます。これにより、潜在的な機密情報を検査して匿名化したり、列が匿名化されるように指定されていて、検査を必要としない構造化データを匿名化したりできます。
このパイプラインの要件:
- トークン化する入力データが存在している必要があります
- Cloud DLP テンプレートが存在している必要があります(たとえば、DeidentifyTemplate や InspectTemplate)。詳細については、Cloud DLP テンプレートをご覧ください。
- BigQuery データセットが存在している必要があります
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputFilePattern |
入力データレコードを読み込む csv ファイル。ワイルドカードも使用できます。例: gs://mybucket/my_csv_filename.csv 、gs://mybucket/file-*.csv |
dlpProjectId |
Cloud DLP API リソースを所有する Cloud DLP プロジェクト ID。この Cloud DLP プロジェクトは、Cloud DLP テンプレートを所有するプロジェクトと同じプロジェクトにすることも、別のプロジェクトにすることもできます。
例: my_dlp_api_project |
deidentifyTemplateName |
API リクエストに使用する Cloud DLP 匿名化テンプレート。パターン projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} で指定します。例: projects/my_project/deidentifyTemplates/100 |
datasetName |
トークン化された結果を送信するための BigQuery データセット。 |
batchSize |
検索やトークン化解除するデータを送信するためのチャンク / バッチサイズ。CSV ファイルの場合、batchSize は全体の行数です。ユーザーは、レコードのサイズとファイルのサイズに基づいてバッチサイズを決定する必要があります。Cloud DLP API では、ペイロードのサイズが API 呼び出しごとに 524 KB に制限されます。 |
inspectTemplateName |
(省略可)API リクエストに使用する Cloud DLP 検査テンプレート。 projects/{template_project_id}/identifyTemplates/{idTemplateId} の形で指定します。例: projects/my_project/identifyTemplates/100 |
Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: テンプレートを実行する gcloud
コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。
このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
以下を置き換えます。
TEMPLATE_PROJECT_ID
: テンプレートのプロジェクト IDDLP_API_PROJECT_ID
: Cloud DLP API プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_DATA
: 入力ファイルのパスDEIDENTIFY_TEMPLATE
: Cloud DLPDeidentify テンプレート番号DATASET_NAME
: BigQuery データセット名INSPECT_TEMPLATE_NUMBER
: Cloud DLPInspect テンプレート番号BATCH_SIZE_VALUE
: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputFilePattern=INPUT_DATA,\ datasetName=DATASET_NAME,\ batchSize=BATCH_SIZE_VALUE,\ dlpProjectId=DLP_API_PROJECT_ID,\ deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\ inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER
API
REST API からの実行このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
TEMPLATE_PROJECT_ID
: テンプレートのプロジェクト IDDLP_API_PROJECT_ID
: Cloud DLP API プロジェクト IDJOB_NAME
: 任意のジョブ名。REGION
: リージョン エンドポイント(例:us-west1
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_DATA
: 入力ファイルのパスDEIDENTIFY_TEMPLATE
: Cloud DLPDeidentify テンプレート番号DATASET_NAME
: BigQuery データセット名INSPECT_TEMPLATE_NUMBER
: Cloud DLPInspect テンプレート番号BATCH_SIZE_VALUE
: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern":INPUT_DATA, "datasetName": "DATASET_NAME", "batchSize": "BATCH_SIZE_VALUE", "dlpProjectId": "DLP_API_PROJECT_ID", "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE", "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER" } }
Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)
Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream) テンプレートは、MySQL データベースの変更データを含む Pub/Sub メッセージを読み取り、レコードを BigQuery に書き込むストリーミング パイプラインです。Debezium コネクタは、MySQL データベースの変更をキャプチャして、変更データを Pub/Sub にパブリッシュします。続いて、テンプレートにより Pub/Sub メッセージが読み取られ、BigQuery に書き込まれます。
このテンプレートを使用すると、MySQL データベースと BigQuery テーブルを同期できます。パイプラインは、変更データを BigQuery のステージング テーブルに書き込み、MySQL データベースを複製する BigQuery テーブルを断続的に更新します。
このパイプラインの要件:
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscriptions |
読み込まれる Pub/Sub 入力サブスクリプションのカンマ区切りのリスト。<subscription>,<subscription>, ... の形式で指定します。 |
changeLogDataset |
ステージング テーブルを格納する BigQuery データセット。<my-dataset> の形式で指定します。 |
replicaDataset |
レプリカ テーブルを格納する BigQuery データセットの場所。<my-dataset> の形式で指定します。 |
updateFrequencySecs |
(省略可)MySQL データベースを複製する BigQuery テーブルを、パイプラインが更新する間隔。 |
Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery テンプレートの実行
このテンプレートを実行するには、次の手順を実行します。
- ローカルマシンで DataflowTemplates リポジトリのクローンを作成します。
v2/cdc-parent
ディレクトリに移動します。- Debezium コネクタがデプロイされていることを確認します。
- Maven を使用して、Dataflow テンプレートを実行します。
次の値を置き換えます。
PROJECT_ID
: プロジェクト ID。YOUR_SUBSCRIPTIONS
: Pub/Sub サブスクリプション名のカンマ区切りのリスト。YOUR_CHANGELOG_DATASET
: 変更履歴データ用の BigQuery データセット。YOUR_REPLICA_DATASET
: レプリカ テーブル用の BigQuery データセット。
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=YOUR_SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=YOUR_CHANGELOG_DATASET \ --replicaDataset=YOUR_REPLICA_DATASET \ --project=PROJECT_ID"
Apache Kafka to BigQuery
Apache Kafka to BigQuery テンプレートは、Apache Kafka からテキストデータを取り込み、ユーザー定義関数(UDF)を実行して、結果のレコードを BigQuery に出力するストリーミング パイプラインです。データの変換、UDF の実行、出力テーブルへの挿入で発生したエラーは、BigQuery の別のエラーテーブルに挿入されます。実行前にエラーテーブルが存在しない場合は、作成されます。
このパイプラインの要件
- 出力 BigQuery テーブルが存在している必要があります。
- Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である必要があります。
- Apache Kafka トピックが存在していて、有効な JSON 形式でメッセージがエンコードされている必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
outputTableSpec |
Apache Kafka メッセージを書き込む BigQuery 出力テーブルの場所。my-project:dataset.table の形式で指定します。 |
inputTopics |
読み取る Apache Kafka 入力トピック(カンマ区切りのリスト)。例: messages |
bootstrapServers |
実行中の Apache Kafka ブローカー サーバーのホストアドレス(カンマ区切りのリスト)。各ホストアドレスは 35.70.252.199:9092 の形式で指定します。 |
javascriptTextTransformGcsPath |
(省略可)JavaScript UDF への Cloud Storage ロケーション パス。例: gs://my_bucket/my_function.js |
javascriptTextTransformFunctionName |
(省略可)UDF として呼び出す JavaScript の名前。例: transform |
outputDeadletterTable |
(省略可)出力テーブルに到達できなかったメッセージの BigQuery テーブル。my-project:dataset.my-deadletter-table の形式で指定します。テーブルが存在していない場合、パイプラインの実行時に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。 |
Apache Kafka to BigQuery テンプレートの実行
コンソール
Google Cloud Console からの実行- Cloud Console の [Dataflow] ページに移動します。 Dataflow ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから the Apache Kafka to BigQuery template を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。
- [ジョブを実行] をクリックします。
![Cloud Platform Console の [テンプレートからジョブを作成] ボタン](https://cloud.google.com/dataflow/images/console_run_job_button.png?hl=ja)
gcloud
gcloud
コマンドライン ツールからの実行
注: gcloud
コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 284.0.0 以降が必要です。
このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery
以下を置き換えます。
YOUR_PROJECT_ID
: テンプレートのプロジェクト IDJOB_NAME
: 任意のジョブ名。REGION_NAME
: Dataflow リージョン名(例:us-central1
)BIGQUERY_TABLE
: BigQuery テーブル名KAFKA_TOPICS
: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする方法の手順に従ってください。PATH_TO_JAVASCRIPT_UDF_FILE
: JavaScript コードを含む.js
ファイルへの Cloud Storage パスYOUR_JAVASCRIPT_FUNCTION
: UDF の名前KAFKA_SERVER_ADDRESSES
: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例:35.70.252.199:9092
。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に従ってください。
gcloud beta dataflow flex-template run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
API
REST API からの実行このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。
gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery
REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。
以下を置き換えます。
YOUR_PROJECT_ID
: テンプレートのプロジェクト IDJOB_NAME
: 任意のジョブ名。LOCATION
: Dataflow リージョン名(例:us-central1
)BIGQUERY_TABLE
: BigQuery テーブル名KAFKA_TOPICS
: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする方法の手順に従ってください。PATH_TO_JAVASCRIPT_UDF_FILE
: JavaScript コードを含む.js
ファイルへの Cloud Storage パスYOUR_JAVASCRIPT_FUNCTION
: UDF の名前KAFKA_SERVER_ADDRESSES
: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例:35.70.252.199:9092
。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に従ってください。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery", } }