Google はオープンソースの Dataflow テンプレートを提供しています。
これらの Dataflow テンプレートは、データのインポート、エクスポート、バックアップ、復元、API の一括オペレーションなど、大規模なデータタスクの解決に役立ちます。専用の開発環境を使用しなくても、これらの処理を実行できます。テンプレートは Apache Beam 上に構築され、Dataflow を使用してデータを変換します。
テンプレートに関する一般的な情報については、Dataflow テンプレートをご覧ください。Google が提供するテンプレートのリストについては、Google 提供のテンプレートの概要をご覧ください。このガイドでは、ストリーミング テンプレートについて説明します。
Pub/Sub Subscription to BigQuery
Pub/Sub Subscription to BigQuery テンプレートは、Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。
このパイプラインの要件:
- Pub/Sub メッセージの
data
フィールドは、この JSON ガイドで説明されている JSON 形式を使用する必要があります。たとえば、data
フィールドの値が{"k1":"v1", "k2":"v2"}
としてフォーマットされたメッセージは、文字列データ型のk1
とk2
という 2 つの列を持つ BigQuery テーブルに挿入できます。 - パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。 |
outputTableSpec |
BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。 |
outputDeadletterTable |
出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、OUTPUT_TABLE_SPEC_error_records が代わりに使用されます。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
Pub/Sub Subscription to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Subscription to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" }, "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
Pub/Sub Topic to BigQuery
Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。
このパイプラインの要件:
- Pub/Sub メッセージの
data
フィールドは、この JSON ガイドで説明されている JSON 形式を使用する必要があります。たとえば、data
フィールドの値が{"k1":"v1", "k2":"v2"}
としてフォーマットされたメッセージは、文字列データ型のk1
とk2
という 2 つの列を持つ BigQuery テーブルに挿入できます。 - パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。 |
outputTableSpec |
BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。 |
outputDeadletterTable |
出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
Pub/Sub Topic to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Topic to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
Pub/Sub Avro to BigQuery
Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。
このパイプラインの要件
- 入力 Pub/Sub サブスクリプションが存在していること。
- Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
- 未処理の Pub/Sub トピックが存在していること。
- 出力 BigQuery データセットが存在していること。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
schemaPath |
Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc 。 |
inputSubscription |
読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription> |
outputTopic |
未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
outputTableSpec |
BigQuery 出力テーブルの場所。例: <my-project>:<my-dataset>.<my-table> 。createDisposition の指定によっては、ユーザー指定の Avro スキーマを使用して出力テーブルが自動的に作成されます。 |
writeDisposition |
(省略可)BigQuery の WriteDisposition。例: WRITE_APPEND 、WRITE_EMPTY 、または WRITE_TRUNCATE 。デフォルト: WRITE_APPEND |
createDisposition |
(省略可)BigQuery の CreateDisposition。例: CREATE_IF_NEEDED 、CREATE_NEVER 。デフォルト: CREATE_IF_NEEDED |
Pub/Sub Avro to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Avro to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
Pub/Sub Proto to BigQuery
Pub/Sub proto to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに proto データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。
データを変換するための JavaScript ユーザー定義関数(UDF)を指定できます。UDF の実行中のエラーは、個別の Pub/Sub トピック、または BigQuery エラーと同じ未処理のトピックに送信できます。
このパイプラインの要件:
- 入力 Pub/Sub サブスクリプションが存在していること。
- Proto レコードのスキーマ ファイルが、Cloud Storage に存在していること。
- 出力 Pub/Sub トピックが存在していること。
- 出力 BigQuery データセットが存在していること。
- BigQuery テーブルが存在する場合は、
createDisposition
値にかかわらず、proto データに一致するスキーマが必要です。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
protoSchemaPath |
自己完結型の proto スキーマ ファイルの Cloud Storage の場所。例: gs://path/to/my/file.pb このファイルは、protoc コマンドの --descriptor_set_out フラグを使用して生成できます。--include_imports フラグを使用すると、ファイルが自己完結することを保証できます。 |
fullMessageName |
proto メッセージの完全な名前。たとえば、package.name.MessageName の場合、package.name は、java_package ステートメントではなく、package ステートメントに指定された値です。 |
inputSubscription |
読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription> |
outputTopic |
未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
outputTableSpec |
BigQuery 出力テーブルの場所。例: my-project:my_dataset.my_table createDisposition の指定によっては、入力スキーマ ファイルを使用して出力テーブルが自動的に作成されます。 |
preserveProtoFieldNames |
(省略可)JSON で元の Proto フィールド名を保持するには true 。false : 標準の JSON 名を使用します。たとえば、false と指定すると field_name が fieldName に変更されます。(デフォルト: false ) |
bigQueryTableSchemaPath |
(省略可)BigQuery スキーマパスへの Cloud Storage パス。例: gs://path/to/my/schema.json 指定されていない場合、スキーマは Proto スキーマから推測されます。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
udfOutputTopic |
(省略可)UDF エラーを格納する Pub/Sub トピック。たとえば、projects/<project-id>/topics/<topic-name> です。指定されていない場合、UDF エラーは outputTopic と同じトピックに送信されます。 |
writeDisposition |
(省略可)BigQuery WriteDisposition 。WRITE_APPEND 、WRITE_EMPTY 、WRITE_TRUNCATE など。デフォルト: WRITE_APPEND |
createDisposition |
(省略可)BigQuery CreateDisposition 。例: CREATE_IF_NEEDED 、CREATE_NEVER デフォルト: CREATE_IF_NEEDED |
Pub/Sub Proto to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Proto to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SCHEMA_PATH
: Proto スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: Proto メッセージ名(例:package.name.MessageName
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名UNPROCESSED_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SCHEMA_PATH
: Proto スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: Proto メッセージ名(例:package.name.MessageName
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名UNPROCESSED_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
Pub/Sub to Pub/Sub
Pub/Sub to Pub/Sub テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、別の Pub/Sub トピックにそのメッセージを書き込むストリーミング パイプラインです。Pub/Sub トピックに書き込むメッセージのフィルタリングに使用される、オプションのメッセージ属性キーと値を指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しながら、Pub/Sub サブスクリプションから別の Pub/Sub トピックにメッセージをコピーできます。
このパイプラインの要件:
- 実行前にコピー元の Pub/Sub サブスクリプションが存在している必要があります。
- コピー元の Pub/Sub サブスクリプションが pull サブスクリプションである必要があります。
- 実行前にコピー先の Pub/Sub トピックが存在している必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
読み取り元の入力 Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name> |
outputTopic |
書き込み先の出力 Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
filterKey |
(省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。 |
filterValue |
(省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。 |
Pub/Sub to Pub/Sub テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Pub/Sub template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ filterKey=FILTER_KEY,\ filterValue=FILTER_VALUE
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOPIC_NAME
: Pub/Sub トピック名FILTER_KEY
: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。FILTER_VALUE
: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)はフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "filterKey": "FILTER_KEY", "filterValue": "FILTER_VALUE" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOPIC_NAME
: Pub/Sub トピック名FILTER_KEY
: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。FILTER_VALUE
: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)はフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
Pub/Sub to Splunk
Pub/Sub to Splunk テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、メッセージ ペイロードを Splunk の HTTP Event Collector(HEC)を介して Splunk に書き込むストリーミング パイプラインです。このテンプレートの最も一般的な使用例は Splunk へのログのエクスポートです。基盤となるワークフローの例については、Dataflow を使用して本番環境対応のログのエクスポートを Splunk にデプロイするをご覧ください。
Splunk に書き込む前に、メッセージ ペイロードに JavaScript ユーザー定義関数を適用することもできます。処理が失敗したメッセージは、トラブルシューティングと再処理のために Pub/Sub の未処理のトピックに転送されます。
HEC トークンの保護を強化するために、Base64 エンコードされた HEC トークン パラメータを Cloud KMS 鍵で暗号化して、この Cloud KMS 鍵とともに渡すこともできます。HEC トークン パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。
このパイプラインの要件:
- パイプラインの実行前にソース Pub/Sub サブスクリプションが存在している必要があります。
- パイプラインを実行する前に、Pub/Sub に未処理のトピックが存在している必要があります。
- Dataflow ワーカーのネットワークから Splunk の HEC エンドポイントにアクセスできる必要があります。
- Splunk の HEC トークンが生成済みで、利用可能である必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
入力の読み取り元の Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name> |
token |
(省略可)Splunk の HEC 認証トークン。tokenSource が PLAINTEXT または KMS に設定されている場合は必須です。 |
url |
Splunk の HEC URL。パイプラインが実行される VPC からルーティング可能である必要があります。例: https://splunk-hec-host:8088。 |
outputDeadletterTopic |
配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
batchCount |
(省略可)複数のイベントを Splunk に送信するためのバッチサイズ。デフォルト: 1(バッチ処理なし)。 |
parallelism |
(省略可)並行リクエストの最大数。デフォルト: 1(並列処理なし)。 |
disableCertificateValidation |
(省略可)SSL 証明書の検証を無効にします。デフォルト: false(検証が有効)。true の場合、証明書は検証されず(すべての証明書が信頼されます)、rootCaCertificatePath パラメータは無視されます。 |
includePubsubMessage |
(省略可)完全な Pub/Sub メッセージをペイロードに含めます。デフォルト: false(ペイロードにはデータ要素のみが含まれる)。 |
tokenSource |
トークンのソース。PLAINTEXT、KMS、SECRET_MANAGER のいずれかです。Secret Manager を使用する場合は、このパラメータを指定する必要があります。tokenSource が KMS に設定されている場合、tokenKMSEncryptionKey と暗号化された token を指定する必要があります。tokenSource を SECRET_MANAGER に設定する場合は、tokenSecretId を指定する必要があります。tokenSource が PLAINTEXT に設定されている場合は、token を指定する必要があります。 |
tokenKMSEncryptionKey |
(省略可)HEC トークン文字列を復号するための Cloud KMS 鍵。tokenSource が KMS に設定されている場合、このパラメータを指定する必要があります。Cloud KMS 鍵を指定する場合は、HEC トークン文字列を暗号化して渡す必要があります。 |
tokenSecretId |
(省略可)トークンの Secret Manager シークレット ID。tokenSource が SECRET_MANAGER に設定されている場合、このパラメータを指定する必要があります。projects/<project-id>/secrets/<secret-name>/versions/<secret-version> 形式で指定する必要があります。 |
rootCaCertificatePath |
(省略可)Cloud Storage のルート CA 証明書の完全 URL。例: gs://mybucket/mycerts/privateCA.crt Cloud Storage で提供される証明書は DER でエンコードする必要があります。この証明書はバイナリまたは印刷可能(Base64)エンコードで提供できます。証明書を Base64 エンコードで提供する場合は、証明書を -----BEGIN CERTIFICATE----- と -----END CERTIFICATE----- で囲む必要があります。このパラメータを指定すると、このプライベート CA 証明書ファイルが取得され、Splunk HEC エンドポイントの SSL 証明書の検証用に Dataflow ワーカーのトラストストアに追加されます。このパラメータを指定しない場合は、デフォルトのトラストストアが使用されます。 |
enableBatchLogs |
(省略可)Splunk に書き込まれたバッチに対してログを有効にするかどうかを指定します。デフォルト: true |
enableGzipHttpCompression |
(省略可)Splunk HEC に送信される HTTP リクエストを圧縮するかどうか(gzip コンテンツをエンコードするかどうか)を指定します。デフォルト: true |
Pub/Sub to Splunk テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Splunk template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\ token=TOKEN,\ url=URL,\ outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ batchCount=BATCH_COUNT,\ parallelism=PARALLELISM,\ disableCertificateValidation=DISABLE_VALIDATION,\ rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOKEN
: Splunk の Http Event Collector トークンURL
: Splunk の Http Event Collector の URL パス(例:https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: Pub/Sub トピック名JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI。例:gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT
: Splunk に複数のイベントを送信するために使用するバッチサイズPARALLELISM
: Splunk にイベントを送信するために使用する並列リクエストの数DISABLE_VALIDATION
: SSL 証明書の検証を無効にする場合、true
ROOT_CA_CERTIFICATE_PATH
: Cloud Storage のルート CA 証明書へのパス(例:gs://your-bucket/privateCA.crt
)。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME", "token": "TOKEN", "url": "URL", "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "batchCount": "BATCH_COUNT", "parallelism": "PARALLELISM", "disableCertificateValidation": "DISABLE_VALIDATION", "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOKEN
: Splunk の Http Event Collector トークンURL
: Splunk の Http Event Collector の URL パス(例:https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: Pub/Sub トピック名JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI。例:gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT
: Splunk に複数のイベントを送信するために使用するバッチサイズPARALLELISM
: Splunk にイベントを送信するために使用する並列リクエストの数DISABLE_VALIDATION
: SSL 証明書の検証を無効にする場合、true
ROOT_CA_CERTIFICATE_PATH
: Cloud Storage のルート CA 証明書へのパス(例:gs://your-bucket/privateCA.crt
)。
Pub/Sub to Avro Files on Cloud Storage
Pub/Sub to Avro files to Cloud Storage テンプレートは、Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。
このパイプラインの要件:
- パイプラインの実行前に入力 Pub/Sub トピックが存在すること。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
メッセージを使用するために購読する Cloud Pub/Sub トピック。トピック名は、projects/<project-id>/topics/<topic-name> の形式にする必要があります。 |
outputDirectory |
出力 Avro ファイルがアーカイブされる出力ディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/ |
avroTempDirectory |
一時的な Avro ファイル用のディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/ |
outputFilenamePrefix |
(省略可)Avro ファイルの出力ファイル名接頭辞。 |
outputFilenameSuffix |
(省略可)Avro ファイルの出力ファイル名接尾辞。 |
outputShardTemplate |
(省略可)出力ファイルのシャード テンプレート。文字 S や N の繰り返しシーケンスで指定します。例: SSS-NNN これらはそれぞれシャード番号やシャードの総数に置き換えられます。このパラメータを指定しない場合、デフォルトのテンプレートの形式は W-P-SS-of-NN です。 |
Pub/Sub to Cloud Storage Avro テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Avro Files on Cloud Storage template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=FILENAME_PREFIX,\ outputFilenameSuffix=FILENAME_SUFFIX,\ outputShardTemplate=SHARD_TEMPLATE,\ avroTempDirectory=gs://BUCKET_NAME/temp/
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILENAME_PREFIX
: 使用したい出力ファイル名接頭辞FILENAME_SUFFIX
: 使用したい出力ファイル名接尾辞SHARD_TEMPLATE
: 使用したい出力シャード テンプレート
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputDirectory": "gs://BUCKET_NAME/output/", "avroTempDirectory": "gs://BUCKET_NAME/temp/", "outputFilenamePrefix": "FILENAME_PREFIX", "outputFilenameSuffix": "FILENAME_SUFFIX", "outputShardTemplate": "SHARD_TEMPLATE" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILENAME_PREFIX
: 使用したい出力ファイル名接頭辞FILENAME_SUFFIX
: 使用したい出力ファイル名接尾辞SHARD_TEMPLATE
: 使用したい出力シャード テンプレート
Pub/Sub Topic to Text Files on Cloud Storage
Pub/Sub to Cloud Storage Text テンプレートは、Pub/Sub トピックからレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。
このパイプラインの要件:
- 実行前に Pub/Sub トピックが存在している必要があります。
- トピックに公開するメッセージは、テキスト形式となる必要があります。
- トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。 |
outputDirectory |
出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/ この値は「/」で終わる必要があります。 |
outputFilenamePrefix |
ウィンドウ処理されたファイルの接頭辞。例: output- |
outputFilenameSuffix |
ウィンドウ処理されたファイルの接尾辞。通常は、.txt や .csv などの拡張子です。 |
outputShardTemplate |
シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。outputShardTemplate は W-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplate の SS-of-NN 部分は 00-of-01 になります。 |
Pub/Sub Topic to Text Files on Cloud Storage テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Text Files on Cloud Storage template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前
Pub/Sub Topic or Subscription to Text Files on Cloud Storage
Pub/Sub Topic or Subscription to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。
このパイプラインの要件:
- 実行前に Pub/Sub トピックまたはサブスクリプションが存在している必要があります。
- トピックに公開するメッセージは、テキスト形式となる必要があります。
- トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。 このパラメータが指定されている場合は、inputSubscription を指定しないでください。 |
inputSubscription |
読み取り元の入力 Pub/Sub サブスクリプション。サブスクリプション名は projects/<project-id>/subscription/<subscription-name> の形式にする必要があります。このパラメータが指定されている場合は、inputTopic を指定しないでください。 |
outputDirectory |
出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/ この値は「/」で終わる必要があります。 |
outputFilenamePrefix |
ウィンドウ処理されたファイルの接頭辞。例: output- |
outputFilenameSuffix |
ウィンドウ処理されたファイルの接尾辞。通常は、.txt や .csv などの拡張子です。 |
outputShardTemplate |
シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。outputShardTemplate は W-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplate の SS-of-NN 部分は 00-of-01 になります。 |
windowDuration |
(省略可)ウィンドウ期間は、出力ディレクトリにデータが書き込まれる間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットを向上させるには、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは 5 分、最小は 1 秒です。使用できる形式は、[int]s(秒に使用、例: 5s)、[int]m(分に使用、例: 12m)、[int]h(時間に使用、例: 2h)です。 |
Pub/Sub Topic or Subscription to Text Files on Cloud Storage テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template jobs run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region REGION_NAME \ --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名BUCKET_NAME
: Cloud Storage バケットの名前
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名BUCKET_NAME
: Cloud Storage バケットの名前
Pub/Sub to MongoDB
Pub/Sub to MongoDB テンプレートは、Pub/Sub サブスクリプションから JSON エンコードのメッセージを読み取り、ドキュメントとして MongoDB に書き込むストリーミング パイプラインです。このパイプラインでは、必要に応じて JavaScript ユーザー定義関数(UDF)を使用して組み込むことができる追加の変換がサポートされています。スキーマの不一致や不正な形式の JSON によるエラー、または変換の実行中に発生したエラーは、入力メッセージとともに未処理メッセージの BigQuery テーブルに記録されます。未処理レコードのテーブルが実行前に存在しない場合は、パイプラインによって自動的にこのテーブルが作成されます。
このパイプラインの要件:
- Pub/Sub サブスクリプションが存在し、有効な JSON 形式でメッセージがエンコードされている必要があります。
- MongoDB クラスタが存在し、Dataflow ワーカーマシンからアクセス可能である必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
Pub/Sub サブスクリプションの名前。例:
|
mongoDBUri |
MongoDB サーバーのカンマ区切りのリスト。例: 192.285.234.12:27017,192.287.123.11:27017 |
database |
コレクションを格納する MongoDB のデータベース。例: my-db 。 |
collection |
MongoDB データベース内のコレクションの名前。例: my-collection 。 |
deadletterTable |
エラー(スキーマの不一致や JSON の形式の誤りなど)によりメッセージが保存される BigQuery テーブル。例: project-id:dataset-name.table-name 。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
batchSize |
(省略可)MongoDB へのドキュメントのバッチ挿入に使用するバッチサイズ。デフォルト: 1000 |
batchSizeBytes |
(省略可)バッチサイズ(バイト単位)。デフォルト: 5242880 |
maxConnectionIdleTime |
(省略可)接続タイムアウトが発生するまでの最大アイドル時間(秒単位)。デフォルト: 60000 |
sslEnabled |
(省略可)MongoDB への接続が SSL かどうかを示すブール値。デフォルト: true |
ignoreSSLCertificate |
(省略可)SSL 証明書を無視するかどうかを示すブール値。デフォルト: true |
withOrdered |
(省略可)MongoDB への順序付けされた一括挿入を有効にするブール値。デフォルト: true |
withSSLInvalidHostNameAllowed |
(省略可)SSL 接続で無効なホスト名を許可するかどうかを示すブール値。デフォルト: true |
Pub/Sub to MongoDB テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to MongoDB template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \ --parameters \ inputSubscription=INPUT_SUBSCRIPTION,\ mongoDBUri=MONGODB_URI,\ database=DATABASE, collection=COLLECTION, deadletterTable=UNPROCESSED_TABLE
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDREGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
INPUT_SUBSCRIPTION
: Pub/Sub サブスクリプション(例:
)projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
: MongoDB サーバーのアドレス(例:192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: MongoDB データベースの名前(例:users
)COLLECTION
: MongoDB コレクションの名前(例:profiles
)UNPROCESSED_TABLE
: BigQuery テーブルの名前(例:your-project:your-dataset.your-table-name
)
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "INPUT_SUBSCRIPTION", "mongoDBUri": "MONGODB_URI", "database": "DATABASE", "collection": "COLLECTION", "deadletterTable": "UNPROCESSED_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDLOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
INPUT_SUBSCRIPTION
: Pub/Sub サブスクリプション(例:
)projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
: MongoDB サーバーのアドレス(例:192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: MongoDB データベースの名前(例:users
)COLLECTION
: MongoDB コレクションの名前(例:profiles
)UNPROCESSED_TABLE
: BigQuery テーブルの名前(例:your-project:your-dataset.your-table-name
)
Pub/Sub to Elasticsearch
Pub/Sub to Elasticsearch テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、ユーザー定義関数(UDF)を実行して、それらをドキュメントとして Elasticsearch に書き込むストリーミング パイプラインです。Dataflow テンプレートは、Elasticsearch のデータ ストリーム機能を使用して、複数のインデックスにまたがる時系列データを保存し、リクエストに対して単一の名前付きリソースを提供します。データ ストリームは、ログ、指標、トレースに適しています。また、継続的に生成され、Pub/Sub に保存されるデータにも適しています。
このパイプラインの要件
- 参照元の Pub/Sub サブスクリプションが存在し、メッセージが有効な JSON 形式でエンコードされている必要があります。
- GCP インスタンス上または Elasticsearch バージョン 7.0 以降の Elastic Cloud 上で一般公開されている到達可能な Elasticsearch ホスト。詳細については、Elastic のための Google Cloud 統合をご覧ください。
- エラー出力用の Pub/Sub トピック。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
使用する Pub/Sub サブスクリプション。名前は projects/<project-id>/subscriptions/<subscription-name> の形式にします。 |
connectionUrl |
https://hostname:[port] 形式の Elasticsearch URL。Elastic Cloud を使用する場合は CloudID を指定します。 |
apiKey |
認証に使用される Base64 エンコードの API キー。 |
errorOutputTopic |
失敗したレコードを公開するための Pub/Sub 出力トピック。projects/<project-id>/topics/<topic-name> の形式にします。 |
dataset |
(省略可)Pub/Sub 経由で送信されるログのタイプ。すぐに使えるダッシュボードが用意されています。既知のログタイプ値は、audit、vpcflow、firewall です。デフォルト: pubsub |
namespace |
(省略可)環境(開発、生産、QA)、チーム、戦略事業部門などの任意のグループ。デフォルト: default |
batchSize |
(省略可)バッチサイズ(ドキュメント数)。デフォルト: 1000 |
batchSizeBytes |
(省略可)バッチサイズ(バイト数)。デフォルト: 5242880 (5 MB)。 |
maxRetryAttempts |
(省略可)最大再試行回数。0 より大きくする必要があります。デフォルト: no retries |
maxRetryDuration |
(省略可)最大再試行時間(ミリ秒)は 0 より大きくする必要があります。デフォルト: no retries |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
propertyAsIndex |
(省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _index メタデータを指定し、一括リクエストではドキュメントに含まれます(_index UDF よりも優先適用されます)。デフォルト: none |
propertyAsId |
(省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _id メタデータを指定し、一括リクエストではドキュメントに含まれます(_id UDF よりも優先適用されます)。デフォルト: none |
javaScriptIndexFnGcsPath |
(省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none |
javaScriptIndexFnName |
(省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none |
javaScriptIdFnGcsPath |
(省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none |
javaScriptIdFnName |
(省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none |
javaScriptTypeFnGcsPath |
(省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none |
javaScriptTypeFnName |
(省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none |
javaScriptIsDeleteFnGcsPath |
(省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none |
javaScriptIsDeleteFnName |
(省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の UDF JavaScript 関数名。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none |
usePartialUpdate |
(省略可)Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルト: false |
bulkInsertMethod |
(省略可)INDEX (インデックス、upserts を許可する)または CREATE (作成、duplicate _id でエラー)を Elasticsearch 一括リクエストで使用するかどうか。デフォルト: CREATE |
Pub/Sub to Elasticsearch テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Elasticsearch template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
ERROR_OUTPUT_TOPIC
: エラー出力用の Pub/Sub トピック。SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名CONNECTION_URL
: Elasticsearch の URLDATASET
: ログタイプNAMESPACE
: データセットの名前空間APIKEY
: 認証用に Base64 でエンコードされた API キー
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
ERROR_OUTPUT_TOPIC
: エラー出力用の Pub/Sub トピック。SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名CONNECTION_URL
: Elasticsearch の URLDATASET
: ログタイプNAMESPACE
: データセットの名前空間APIKEY
: 認証用に Base64 でエンコードされた API キー
Datastream to Cloud Spanner
Datastream to Cloud Spanner テンプレートは、Cloud Storage バケットから Datastream イベントを読み取り、Cloud Spanner データベースに書き込むストリーミング パイプラインです。これは、Datastream ソースから Cloud Spanner へのデータ移行を目的としています。
テンプレートの実行前に、移行に必要なすべてのテーブルが移行先の Cloud Spanner データベースに存在している必要があります。したがって、データ移行に先立ち、ソース データベースから移行先 Cloud Spanner へのスキーマの移行が完了している必要があります。移行前にテーブルにデータが存在する可能性があります。このテンプレートでは、Datastream スキーマの変更が Cloud Spanner データベースに伝播されません。
データの整合性が保証されるのは、すべてのデータが Cloud Spanner に書き込まれ、移行が終了したときだけです。Cloud Spanner に書き込まれる各レコードの順序指定の情報を保存するために、このテンプレートは Cloud Spanner データベース内の各テーブルに対して追加のテーブル(シャドウ テーブルと呼ばれる)を作成します。これは、移行終了時の整合性を確保するために使用されます。シャドウ テーブルは移行後に削除されないため、移行終了時の検証に使用できます。
オペレーション中に発生したエラー(スキーマの不一致、不正な形式の JSON ファイル、変換の実行によるエラーなど)は、エラーキューに記録されます。エラーキューは、エラーが発生したすべての Datastream イベントと、エラーの理由をテキスト形式で保存する Cloud Storage フォルダです。エラーは一時的な場合も永続的な場合もあり、エラーキューの適切な Cloud Storage フォルダに保存されます。一時的なエラーは再試行されますが、永続的なエラーは再試行されません。永続的なエラーが発生した場合は、テンプレートの実行中に変更イベントを修正し、再試行可能なバケットに移動することもできます。
このパイプラインの要件:
- ステータスが [実行中] または [開始されていません] の Datastream ストリーム。
- Datastream イベントが複製される Cloud Storage バケット。
- 既存のテーブルを持つ Cloud Spanner データベース。テーブルは空でも、データを含んでいてもかまいません。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputFilePattern |
複製する Cloud Storage 内の Datastream ファイルの場所。通常、これはストリームのルートパスです。 |
streamName |
スキーマ情報とソースタイプについてポーリングするストリームの名前またはテンプレート。 |
instanceId |
変更が複製される Cloud Spanner インスタンス。 |
databaseId |
変更が複製される Cloud Spanner データベース。 |
projectId |
Cloud Spanner プロジェクト ID。 |
deadLetterQueueDirectory |
(省略可)エラーキューの出力を保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。 |
inputFileFormat |
(省略可)Datastream によって生成された出力ファイルの形式(例: avro,json )。デフォルトは avro です。 |
shadowTablePrefix |
(省略可)シャドウ テーブルの名前に使用される接頭辞。デフォルト: shadow_ 。 |
Datastream to Cloud Spanner テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Cloud Datastream to Spanner template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ streamName=STREAM_NAME,\ instanceId=CLOUDSPANNER_INSTANCE,\ databaseId=CLOUDSPANNER_DATABASE,\ deadLetterQueueDirectory=DLQ
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
GCS_FILE_PATH
: Datastream イベントの保存に使用される Cloud Storage パス。例:gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE
: Cloud Spanner インスタンス。CLOUDSPANNER_DATABASE
: Cloud Spanner データベース。DLQ
: エラーキュー ディレクトリの Cloud Storage パス。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "streamName": "STREAM_NAME" "instanceId": "CLOUDSPANNER_INSTANCE" "databaseId": "CLOUDSPANNER_DATABASE" "deadLetterQueueDirectory": "DLQ" } } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
GCS_FILE_PATH
: Datastream イベントの保存に使用される Cloud Storage パス。例:gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE
: Cloud Spanner インスタンス。CLOUDSPANNER_DATABASE
: Cloud Spanner データベース。DLQ
: エラーキュー ディレクトリの Cloud Storage パス。
Text Files on Cloud Storage to BigQuery(Stream)
Text Files on Cloud Storage to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルをストリーミングし、ユーザーが指定する JavaScript ユーザー定義関数(UDF)で変換して、結果を BigQuery に追加するストリーミング パイプラインです。
パイプラインは無期限に実行され、ドレインではなくキャンセルによって手動で終了させる必要があります。これは、分割可能な DoFn
で、ドレインをサポートしていない Watch
変換を使用しているためです。
このパイプラインの要件:
- BigQuery で出力テーブルのスキーマを記述する JSON ファイルを作成します。
fields
というタイトルになっているトップレベルの JSON 配列があり、その内容が{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
のパターンに従っていることを確認します。例:{ "fields": [ { "name": "location", "type": "STRING" }, { "name": "name", "type": "STRING" }, { "name": "age", "type": "STRING" }, { "name": "color", "type": "STRING", "mode": "REQUIRED" }, { "name": "coffee", "type": "STRING", "mode": "REQUIRED" } ] }
- JavaScript(
.js
)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。
function transform(line) { var values = line.split(','); var obj = new Object(); obj.location = values[0]; obj.name = values[1]; obj.age = values[2]; obj.color = values[3]; obj.coffee = values[4]; var jsonString = JSON.stringify(obj); return jsonString; }
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
javascriptTextTransformGcsPath |
:
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
JSONPath |
BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。例: gs://path/to/my/schema.json 。 |
javascriptTextTransformFunctionName |
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
outputTable |
完全修飾された BigQuery テーブル。例: my-project:dataset.table |
inputFilePattern |
処理するテキストの Cloud Storage の場所。例: gs://my-bucket/my-files/text.txt 。 |
bigQueryLoadingTemporaryDirectory |
BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir |
outputDeadletterTable |
出力テーブルに到達できなかったメッセージが記載されたテーブル。例: my-project:dataset.my-unprocessed-table 存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。 |
Cloud Storage Text to BigQuery (Stream) テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。PATH_TO_BIGQUERY_SCHEMA_JSON
: スキーマ定義を含む JSON ファイルへの Cloud Storage パスPATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI。例:gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: テキスト データセットへの Cloud Storage パスBIGQUERY_TABLE
: BigQuery テーブル名BIGQUERY_UNPROCESSED_TABLE
: 未処理のメッセージ用の BigQuery テーブルの名前PATH_TO_TEMP_DIR_ON_GCS
: 一時ディレクトリへの Cloud Storage パス
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。PATH_TO_BIGQUERY_SCHEMA_JSON
: スキーマ定義を含む JSON ファイルへの Cloud Storage パスPATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI。例:gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: テキスト データセットへの Cloud Storage パスBIGQUERY_TABLE
: BigQuery テーブル名BIGQUERY_UNPROCESSED_TABLE
: 未処理のメッセージ用の BigQuery テーブルの名前PATH_TO_TEMP_DIR_ON_GCS
: 一時ディレクトリへの Cloud Storage パス
Text Files on Cloud Storage to Pub/Sub(Stream)
このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Pub/Sub トピックに公開します。また、Pub/Sub でデータを再生することもできます。
パイプラインは無期限に実行され、「ドレイン」ではなく「キャンセル」によって手動で終了させる必要があります。これは「Watch」変換を使用しているためで、この変換は「SplittableDoFn」であり、ドレインはサポートしていません。
現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。
このパイプラインの要件:
- 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
- 実行前に Pub/Sub トピックが存在している必要があります。
- このパイプラインは無期限で実行されるため、手動で終了する必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputFilePattern |
読み込み元の入力ファイルのパターン。たとえば、gs://bucket-name/files/*.json や gs://bucket-name/path/*.csv です。 |
outputTopic |
書き込み先の Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にします。 |
Text Files on Cloud Storage to Pub/Sub(Stream)テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to Pub/Sub (Stream) template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント(例:us-central1
)STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILE_PATTERN
: Cloud Storage バケットから読み取るファイル パターン glob(例:path/*.csv
)
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント(例:us-central1
)STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILE_PATTERN
: Cloud Storage バケットから読み取るファイル パターン glob(例:path/*.csv
)
Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)
Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)テンプレートは、Cloud Storage バケットから csv ファイルを読み取り、匿名化のために Cloud Data Loss Prevention(Cloud DLP)API を呼び出し、匿名化されたデータを指定された BigQuery テーブルへ書き込むストリーミング パイプラインです。このテンプレートでは、Cloud DLP 検査テンプレートと Cloud DLP 匿名化テンプレートの両方の使用がサポートされます。これにより、潜在的な機密情報を検査して匿名化したり、列が匿名化されるように指定されていて検査が不要な構造化データを匿名化できます。また、このテンプレートでは、匿名化テンプレートの場所用のリージョンパスはサポートされていません。グローバルパスのみがサポートされています。
このパイプラインの要件:
- トークン化する入力データが存在している必要があります
- Cloud DLP テンプレートが存在している必要があります(たとえば、DeidentifyTemplate や InspectTemplate)。詳細については、Cloud DLP テンプレートをご覧ください。
- BigQuery データセットが存在している必要があります
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputFilePattern |
入力データレコードを読み込む csv ファイル。ワイルドカードも使用できます。例: gs://mybucket/my_csv_filename.csv 、gs://mybucket/file-*.csv |
dlpProjectId |
Cloud DLP API リソースを所有する Cloud DLP プロジェクト ID。この Cloud DLP プロジェクトは、Cloud DLP テンプレートを所有するプロジェクトと同じプロジェクトにすることも、別のプロジェクトにすることもできます。
例: my_dlp_api_project |
deidentifyTemplateName |
API リクエストに使用する Cloud DLP 匿名化テンプレート。パターン projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} で指定します。例: projects/my_project/deidentifyTemplates/100 |
datasetName |
トークン化された結果を送信するための BigQuery データセット。 |
batchSize |
検索やトークン化解除するデータを送信するためのチャンク / バッチサイズ。CSV ファイルの場合、batchSize は全体の行数です。ユーザーは、レコードのサイズとファイルのサイズに基づいてバッチサイズを決定する必要があります。Cloud DLP API では、ペイロードのサイズが API 呼び出しごとに 524 KB に制限されます。 |
inspectTemplateName |
(省略可)API リクエストに使用する Cloud DLP 検査テンプレート。 projects/{template_project_id}/identifyTemplates/{idTemplateId} の形で指定します。例: projects/my_project/identifyTemplates/100 |
Data Masking/Tokenization from Cloud Storage to BigQuery(using Cloud DLP)テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputFilePattern=INPUT_DATA,\ datasetName=DATASET_NAME,\ batchSize=BATCH_SIZE_VALUE,\ dlpProjectId=DLP_API_PROJECT_ID,\ deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\ inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER
次のように置き換えます。
DLP_API_PROJECT_ID
: Cloud DLP API プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_DATA
: 入力ファイルのパスDEIDENTIFY_TEMPLATE
: Cloud DLPDeidentify テンプレート番号DATASET_NAME
: BigQuery データセット名INSPECT_TEMPLATE_NUMBER
: Cloud DLPInspect テンプレート番号BATCH_SIZE_VALUE
: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern":INPUT_DATA, "datasetName": "DATASET_NAME", "batchSize": "BATCH_SIZE_VALUE", "dlpProjectId": "DLP_API_PROJECT_ID", "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE", "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDDLP_API_PROJECT_ID
: Cloud DLP API プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_DATA
: 入力ファイルのパスDEIDENTIFY_TEMPLATE
: Cloud DLPDeidentify テンプレート番号DATASET_NAME
: BigQuery データセット名INSPECT_TEMPLATE_NUMBER
: Cloud DLPInspect テンプレート番号BATCH_SIZE_VALUE
: バッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)
Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)
Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream) テンプレートは、MySQL データベースの変更データを含む Pub/Sub メッセージを読み取り、レコードを BigQuery に書き込むストリーミング パイプラインです。Debezium コネクタは、MySQL データベースの変更をキャプチャして、変更データを Pub/Sub にパブリッシュします。続いて、テンプレートにより Pub/Sub メッセージが読み取られ、BigQuery に書き込まれます。
このテンプレートを使用すると、MySQL データベースと BigQuery テーブルを同期できます。パイプラインは、変更データを BigQuery のステージング テーブルに書き込み、MySQL データベースを複製する BigQuery テーブルを断続的に更新します。
このパイプラインの要件:
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscriptions |
読み込まれる Pub/Sub 入力サブスクリプションのカンマ区切りのリスト。<subscription>,<subscription>, ... の形式で指定します。 |
changeLogDataset |
ステージング テーブルを格納する BigQuery データセット。<my-dataset> の形式で指定します。 |
replicaDataset |
レプリカ テーブルを格納する BigQuery データセットの場所。<my-dataset> の形式で指定します。 |
updateFrequencySecs |
(省略可)MySQL データベースを複製する BigQuery テーブルを、パイプラインが更新する間隔。 |
Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery テンプレートの実行
このテンプレートを実行するには、次の手順を行います。
- ローカルマシンで DataflowTemplates リポジトリのクローンを作成します。
v2/cdc-parent
ディレクトリに移動します。- Debezium コネクタがデプロイされていることを確認します。
- Maven を使用して、Dataflow テンプレートを実行します。
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=CHANGELOG_DATASET \ --replicaDataset=REPLICA_DATASET \ --project=PROJECT_ID \ --region=REGION_NAME"
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDSUBSCRIPTIONS
: Pub/Sub サブスクリプション名のカンマ区切りリスト。CHANGELOG_DATASET
: 変更履歴データの BigQuery データセット。REPLICA_DATASET
: レプリカ テーブルの BigQuery データセット。
Apache Kafka to BigQuery
Apache Kafka to BigQuery テンプレートは、Apache Kafka からテキストデータを取り込み、ユーザー定義関数(UDF)を実行して、結果のレコードを BigQuery に出力するストリーミング パイプラインです。データの変換、UDF の実行、出力テーブルへの挿入で発生したエラーは、BigQuery の別のエラーテーブルに挿入されます。実行前にエラーテーブルが存在しない場合は、作成されます。
このパイプラインの要件
- 出力 BigQuery テーブルが存在している必要があります。
- Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である必要があります。
- Apache Kafka トピックが存在していて、有効な JSON 形式でメッセージがエンコードされている必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
outputTableSpec |
Apache Kafka メッセージを書き込む BigQuery 出力テーブルの場所。my-project:dataset.table の形式で指定します。 |
inputTopics |
読み取る Apache Kafka 入力トピック(カンマ区切りのリスト)。例: messages |
bootstrapServers |
実行中の Apache Kafka ブローカー サーバーのホストアドレス(カンマ区切りのリスト)。各ホストアドレスは 35.70.252.199:9092 の形式で指定します。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
outputDeadletterTable |
(省略可)出力テーブルに到達できなかったメッセージの BigQuery テーブル。my-project:dataset.my-deadletter-table の形式で指定します。テーブルが存在していない場合、パイプラインの実行時に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。 |
Apache Kafka to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Kafka to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
BIGQUERY_TABLE
: BigQuery テーブル名KAFKA_TOPICS
: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、こちらの手順でカンマをエスケープしてください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI。例:gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。KAFKA_SERVER_ADDRESSES
: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例:35.70.252.199:9092
。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に従ってください。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
BIGQUERY_TABLE
: BigQuery テーブル名KAFKA_TOPICS
: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、こちらの手順でカンマをエスケープしてください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI。例:gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。KAFKA_SERVER_ADDRESSES
: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーがアクセスできるポート番号を付加する必要があります。例:35.70.252.199:9092
。複数のアドレスを指定する場合は、カンマをエスケープする方法の手順に沿って操作してください。
詳細については、Dataflow で Kafka から BigQuery にデータを書き込むをご覧ください。
Datastream to BigQuery(Stream)
Datastream to BigQuery テンプレートは、Datastream データを読み取り、BigQuery に複製するストリーミング パイプラインです。このテンプレートは、Pub/Sub 通知を使用して Cloud Storage からデータを読み取り、時間でパーティション分割された BigQuery ステージング テーブルに複製します。レプリケーションに続いて、このテンプレートは BigQuery で MERGE
を実行し、すべての変更データ キャプチャ(CDC)の変更をソーステーブルのレプリカに upsert します。
このテンプレートは、レプリケーションによって管理される BigQuery テーブルの作成と更新を処理します。データ定義言語(DDL)が必要な場合、Datastream に対するコールバックによってソーステーブル スキーマが抽出され、BigQuery のデータ型に変換されます。サポートされているオペレーションは次のとおりです。
- データが挿入されると、新しいテーブルが作成される。
- null の初期値を持つ BigQuery テーブルに新しい列が追加される。
- ドロップされた列は BigQuery で無視され、将来の値が null になる。
- 名前が変更された列が BigQuery に新しい列として追加される。
- 型の変更が BigQuery に伝播されない。
このパイプラインの要件:
- データを複製する準備ができている、またはすでに複製されている Datastream ストリーム。
- Datastream データに対して Cloud Storage の Pub/Sub 通知が有効になっている。
- BigQuery の宛先データセットが作成され、Compute Engine サービス アカウントにこれらのデータセットへの管理者権限が付与されている。
- 作成される宛先レプリカ テーブルでは、ソーステーブルに主キーが必要です。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputFilePattern |
複製する Cloud Storage 内の Datastream ファイルの場所。通常、このファイルの場所はストリームのルートパスです。 |
gcsPubSubSubscription |
Datastream ファイル通知を含む Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id |
inputFileFormat |
Datastream によって生成された出力ファイルの形式(例: avro,json )。デフォルトは avro です。 |
outputStagingDatasetTemplate |
ステージング テーブルを含む既存のデータセットの名前。ソース データセット / スキーマの名前に置き換えられるプレースホルダとして、テンプレート {_metadata_dataset} を含めることができます(例: {_metadata_dataset}_log )。 |
outputDatasetTemplate |
レプリカ テーブルを含む既存のデータセットの名前。ソース データセット / スキーマの名前に置き換えられるプレースホルダとして、テンプレート {_metadata_dataset} を含めることができます(例: {_metadata_dataset} )。 |
deadLetterQueueDirectory |
処理されなかった理由とともに、未処理のメッセージを保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。 |
outputStagingTableNameTemplate |
(省略可)ステージング テーブルの名前のテンプレート。デフォルトは {_metadata_table}_log です。複数のスキーマを複製している場合は、{_metadata_schema}_{_metadata_table}_log をおすすめします。 |
outputTableNameTemplate |
(省略可)レプリカ テーブルの名前のテンプレート。デフォルトは {_metadata_table} です。複数のスキーマを複製している場合は、{_metadata_schema}_{_metadata_table} をおすすめします。 |
outputProjectId |
(省略可)データを出力する BigQuery データセットのプロジェクト。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。 |
streamName |
(省略可)スキーマ情報をポーリングするストリームの名前またはテンプレート。デフォルトは {_metadata_stream} です。 |
mergeFrequencyMinutes |
(省略可)特定のテーブルのマージ間隔の分数。デフォルトは 5 です。 |
dlqRetryMinutes |
(省略可)デッドレター キュー(DLQ)の再試行間隔の分数。デフォルトは 10 です。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
Datastream to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Datastream to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: Datastream データへの Cloud Storage パス。例:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例:projects/my-project-id/subscriptions/my-subscription-id
BIGQUERY_DATASET
: BigQuery データセット名。BIGQUERY_TABLE
: BigQuery テーブル テンプレート。例:{_metadata_schema}_{_metadata_table}_log
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: Datastream データへの Cloud Storage パス。例:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例:projects/my-project-id/subscriptions/my-subscription-id
BIGQUERY_DATASET
: BigQuery データセット名。BIGQUERY_TABLE
: BigQuery テーブル テンプレート。例:{_metadata_schema}_{_metadata_table}_log
Datastream to MySQL or PostgreSQL(ストリーミング)
Datastream to SQL テンプレートは、Datastream データを読み取り、そのデータを MySQL データベースまたは PostgreSQL データベースに複製するストリーミング パイプラインです。このテンプレートは、Pub/Sub 通知を使用して Cloud Storage からデータを読み取り、このデータを SQL レプリカ テーブルに複製します。
このテンプレートはデータ定義言語(DDL)をサポートしていないため、すべてのテーブルがすでにデータベースに存在することを想定しています。レプリケーションでは、Dataflow ステートフル変換を使用して最新でないデータをフィルタし、順不同データの整合性を確保します。たとえば、ある行のより新しいバージョンの読み取りがすでに行われている場合、その行の遅れて到着したバージョンは無視されます。データ操作言語(DML)を実行することは、ソースデータとターゲット データを完全に複製するための最善の方法です。実行される DML ステートメントは、次のルールに従います。
- 主キーが存在する場合、insert と update のオペレーションでは upsert 構文を使用します(例:
INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE
)。 - 主キーが存在する場合、削除 DML として delete が複製されます。
- 主キーが存在しない場合、insert と update の両方のオペレーションがテーブルに挿入されます。
- 主キーが存在しない場合、delete は無視されます。
Oracle から Postgres へのユーティリティを使用する場合で、主キーが存在しない場合は、SQL に ROWID
を主キーとして追加します。
このパイプラインの要件:
- データを複製する準備ができている、またはすでに複製されている Datastream ストリーム。
- Datastream データに対して Cloud Storage の Pub/Sub 通知が有効になっている。
- PostgreSQL データベースが必要なスキーマでシードされている。
- Dataflow ワーカーと PostgreSQL 間のネットワーク アクセスが設定されている。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputFilePattern |
複製する Cloud Storage 内の Datastream ファイルの場所。通常、このファイルの場所はストリームのルートパスです。 |
gcsPubSubSubscription |
Datastream ファイル通知を含む Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id |
inputFileFormat |
Datastream によって生成された出力ファイルの形式(例: avro,json )。デフォルトは avro です。 |
databaseHost |
接続先の SQL ホスト。 |
databaseUser |
レプリケーションのすべてのテーブルへの書き込みに必要なすべての権限を持つ SQL ユーザー。 |
databasePassword |
特定の SQL ユーザーのパスワード。 |
databasePort |
(省略可)接続する SQL データベース ポート。デフォルトは 5432 です。 |
databaseName |
(省略可)接続する SQL データベースの名前。デフォルトは postgres です。 |
streamName |
(省略可)スキーマ情報をポーリングするストリームの名前またはテンプレート。デフォルトは {_metadata_stream} です。 |
Datastream to SQL テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Cloud Datastream to SQL template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ databaseHost=DATABASE_HOST,\ databaseUser=DATABASE_USER,\ databasePassword=DATABASE_PASSWORD
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: Datastream データへの Cloud Storage パス。例:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例:projects/my-project-id/subscriptions/my-subscription-id
DATABASE_HOST
: SQL ホスト IP。DATABASE_USER
: SQL ユーザー。DATABASE_PASSWORD
: SQL パスワード。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "databaseHost": "DATABASE_HOST", "databaseUser": "DATABASE_USER", "databasePassword": "DATABASE_PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: Datastream データへの Cloud Storage パス。例:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例:projects/my-project-id/subscriptions/my-subscription-id
DATABASE_HOST
: SQL ホスト IP。DATABASE_USER
: SQL ユーザー。DATABASE_PASSWORD
: SQL パスワード。
Pub/Sub to Java Database Connectivity(JDBC)
Pub/Sub to Java Database Connectivity(JDBC)テンプレートは、既存の Cloud Pub/Sub サブスクリプションから JSON 文字列としてデータを取り込み、結果のレコードを JDBC に書き込むストリーミング パイプラインです。
このパイプラインの要件:
- パイプラインを実行する前に Cloud Pub/Sub サブスクリプションが存在している必要があります。
- パイプラインを実行する前に JDBC ソースが存在している必要があります。
- パイプラインを実行する前に Cloud Pub/Sub 出力デッドレター トピックが存在している必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
driverClassName |
JDBC ドライバのクラス名。例: com.mysql.jdbc.Driver |
connectionUrl |
JDBC 接続 URL 文字列。例: jdbc:mysql://some-host:3306/sampledb Base64 でエンコードされ、Cloud KMS 鍵で暗号化される文字列として渡すことができます。 |
driverJars |
JDBC ドライバのカンマ区切りの Cloud Storage パス。例: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar |
username |
(省略可)JDBC 接続に使用するユーザー名。Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。 |
password |
(省略可)JDBC 接続に使用するパスワード。Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。 |
connectionProperties |
(省略可)JDBC 接続に使用するプロパティ文字列。文字列の形式は [propertyName=property;]* にする必要があります。例: unicode=true;characterEncoding=UTF-8 |
statement |
データベースに対して実行するステートメント。このステートメントには、テーブルの列名を任意の順序で指定する必要があります。指定した列名の値のみが JSON から読み取られ、ステートメントに追加されます。例: INSERT INTO tableName (column1, column2) VALUES (?,?) |
inputSubscription |
読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。 |
outputDeadletterTopic |
配信不能メッセージを転送するための Pub/Sub トピック。例:projects/<project-id>/topics/<topic-name> |
KMSEncryptionKey |
(省略可)ユーザー名、パスワード、接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、ユーザー名、パスワード、接続文字列はすべて暗号化されて渡されます。 |
extraFilesToStage |
ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例: gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id> |
Pub/Sub to Java Database Connectivity(JDBC)テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to JDBC template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_to_Jdbc \ --region REGION_NAME \ --parameters \ driverClassName=DRIVER_CLASS_NAME,\ connectionURL=JDBC_CONNECTION_URL,\ driverJars=DRIVER_PATHS,\ username=CONNECTION_USERNAME,\ password=CONNECTION_PASSWORD,\ connectionProperties=CONNECTION_PROPERTIES,\ statement=SQL_STATEMENT,\ inputSubscription=INPUT_SUBSCRIPTION,\ outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\ KMSEncryptionKey=KMS_ENCRYPTION_KEY
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
DRIVER_CLASS_NAME
: ドライバのクラス名JDBC_CONNECTION_URL
: JDBC 接続 URLDRIVER_PATHS
: カンマで区切った JDBC ドライバの Cloud Storage パスCONNECTION_USERNAME
: JDBC 接続のユーザー名CONNECTION_PASSWORD
: JDBC 接続パスワードCONNECTION_PROPERTIES
: JDBC 接続プロパティ(必要に応じて)SQL_STATEMENT
: データベースに対して実行される SQL ステートメントINPUT_SUBSCRIPTION
: 読み取り元の Pub/Sub 入力サブスクリプションOUTPUT_DEADLETTER_TOPIC
: 配信不能メッセージを転送するための Pub/Sub トピックKMS_ENCRYPTION_KEY
: Cloud KMS 暗号鍵
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_Jdbc { "jobName": "JOB_NAME", "parameters": { "driverClassName": "DRIVER_CLASS_NAME", "connectionURL": "JDBC_CONNECTION_URL", "driverJars": "DRIVER_PATHS", "username": "CONNECTION_USERNAME", "password": "CONNECTION_PASSWORD", "connectionProperties": "CONNECTION_PROPERTIES", "statement": "SQL_STATEMENT", "inputSubscription": "INPUT_SUBSCRIPTION", "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC", "KMSEncryptionKey":"KMS_ENCRYPTION_KEY" }, "environment": { "zone": "us-central1-f" }, }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
DRIVER_CLASS_NAME
: ドライバのクラス名JDBC_CONNECTION_URL
: JDBC 接続 URLDRIVER_PATHS
: カンマで区切った JDBC ドライバの Cloud Storage パスCONNECTION_USERNAME
: JDBC 接続のユーザー名CONNECTION_PASSWORD
: JDBC 接続パスワードCONNECTION_PROPERTIES
: JDBC 接続プロパティ(必要に応じて)SQL_STATEMENT
: データベースに対して実行される SQL ステートメントINPUT_SUBSCRIPTION
: 読み取り元の Pub/Sub 入力サブスクリプションOUTPUT_DEADLETTER_TOPIC
: 配信不能メッセージを転送するための Pub/Sub トピックKMS_ENCRYPTION_KEY
: Cloud KMS 暗号鍵
Cloud Spanner change streams to Cloud Storage
Cloud Spanner change streams to Cloud Storage テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して Cloud Storage バケットに書き込むストリーミング パイプラインです。
パイプラインは、タイムスタンプに基づいて Spanner の変更ストリーム レコードをウィンドウにグループ化します。各ウィンドウは、このテンプレートで構成できる期間を表します。タイムスタンプがウィンドウに属するすべてのレコードが、ウィンドウ内に存在することが保証されます。遅延は発生しません。複数の出力シャードを定義することもできます。パイプラインはシャードごと、ウィンドウごとに 1 つの Cloud Storage 出力ファイルを作成します。出力ファイル内では、レコードは順序付けされていません。出力ファイルは、ユーザーの構成に応じて JSON 形式または AVRO 形式で記述できます。
Cloud Spanner インスタンスまたは Cloud Storage バケットと同じリージョンから Dataflow ジョブを実行することで、ネットワークのレイテンシやネットワーク転送の費用を最小限に抑えることができます。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。Dataflow リージョン エンドポイントの詳細をご覧ください。
変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法とベスト プラクティスをご覧ください。
このパイプラインの要件:
- パイプラインの実行前に、Cloud Spanner インスタンスが存在している。
- パイプラインの実行前に、Cloud Spanner データベースが存在している。
- パイプラインの実行前に、Cloud Spanner メタデータ インスタンスが存在している。
- パイプラインの実行前に、Cloud Spanner メタデータ データベースが存在している。
- パイプラインの実行前に、Cloud Spanner の変更ストリームが存在している。
- パイプラインの実行前に、Cloud Storage 出力バケットが存在している。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
spannerInstanceId |
変更ストリーム データの読み取り元の Cloud Spanner インスタンス ID。 |
spannerDatabase |
変更ストリーム データの読み取り元の Cloud Spanner データベース。 |
spannerMetadataInstanceId |
変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner インスタンス ID。 |
spannerMetadataDatabase |
変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner データベース。 |
spannerChangeStreamName |
読み取り元の Cloud Spanner 変更ストリームの名前。 |
gcsOutputDirectory |
変更ストリームのファイルの場所は、Cloud Storage に「gs://${BUCKET}/${ROOT_PATH}/」の形式で出力されます。 |
outputFilenamePrefix |
(省略可)書き込み先ファイルのファイル名の接頭辞。ファイルの接頭辞はデフォルトで「output」に設定されています。 |
spannerProjectId |
(省略可)変更ストリームの読み取り元のプロジェクト。これは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。 |
startTimestamp |
(省略可)変更ストリームの読み取りに使用する開始の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。 |
endTimestamp |
(省略可)変更ストリームの読み取りに使用する終了の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、現在よりも先の無限の時間です。 |
outputFileFormat |
(省略可)出力 Cloud Storage ファイルの形式。使用可能な形式は TEXT、AVRO です。デフォルトは AVRO です。 |
windowDuration |
(省略可)ウィンドウ期間は、出力ディレクトリにデータが書き込まれる間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットを向上させるには、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは 5 分、最小は 1 秒です。使用できる形式は、[int]s(秒に使用、例: 5s)、[int]m(分に使用、例: 12m)、[int]h(時間に使用、例: 2h)です。 |
rpcPriority |
(省略可)Cloud Spanner 呼び出しのリクエストの優先度。値は、高、中、低のいずれかである必要があります。(デフォルトは高) |
numShards |
(省略可)書き込み時に生成される出力シャードの最大数。デフォルト値は 20 です。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。 |
spannerMetadataTableName |
(省略可)使用する Cloud Spanner 変更ストリーム メタデータのテーブル名。指定しない場合、パイプライン フロー中に Cloud Spanner 変更ストリームのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は指定しないでください。 |
Cloud Spanner change streams to Cloud Storage テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Cloud Spanner change streams to Google Cloud Storage template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ gcsOutputDirectory=GCS_OUTPUT_DIRECTORY
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
SPANNER_INSTANCE_ID
: Cloud Spanner インスタンス IDSPANNER_DATABASE
: Cloud Spanner データベースSPANNER_METADATA_INSTANCE_ID
: Cloud Spanner メタデータ インスタンス IDSPANNER_METADATA_DATABASE
: Cloud Spanner メタデータ データベースSPANNER_CHANGE_STREAM
: Cloud Spanner 変更ストリームGCS_OUTPUT_DIRECTORY
: 変更ストリームの出力用のファイルの場所
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
SPANNER_INSTANCE_ID
: Cloud Spanner インスタンス IDSPANNER_DATABASE
: Cloud Spanner データベースSPANNER_METADATA_INSTANCE_ID
: Cloud Spanner メタデータ インスタンス IDSPANNER_METADATA_DATABASE
: Cloud Spanner メタデータ データベースSPANNER_CHANGE_STREAM
: Cloud Spanner 変更ストリームGCS_OUTPUT_DIRECTORY
: 変更ストリームの出力用のファイルの場所
Cloud Spanner change streams to BigQuery
Cloud Spanner change streams to BigQuery テンプレートは、Cloud Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して BigQuery テーブルに書き込むストリーミング パイプラインです。
必要な BigQuery テーブルが存在しない場合は、パイプラインによって作成されます。それ以外の場合は、既存の BigQuery テーブルが使用されます。既存の BigQuery テーブルのスキーマには、Cloud Spanner テーブルの対応する追跡対象列と、「ignoreFields」オプションによって明示的に無視されない追加のメタデータ列(以下のリストのメタデータ フィールドの説明を参照)が含まれている必要があります。新しい各 BigQuery 行には、変更レコードのタイムスタンプで、Cloud Spanner テーブルの対応する行から変更ストリームが監視するすべての列が含まれます。
変更ストリームの監視対象列はすべて、Cloud Spanner トランザクションによって変更されたかどうかにかかわらず、BigQuery の各テーブル行に含まれます。監視されていない列は BigQuery 行に含まれません。Dataflow のウォーターマークよりも小さい Cloud Spanner の変更は、BigQuery テーブルに正常に適用されるか、再試行のためにデッドレター キューに保存されます。BigQuery の行は、元の Cloud Spanner commit タイムスタンプの順序と比較して順不同で挿入されます。
BigQuery テーブルに次のメタデータ フィールドが追加されます。
- _metadata_spanner_mod_type: 変更ストリームのデータ変更レコードから抽出されます。
- _metadata_spanner_table_name: Cloud Spanner テーブル名。これは、コネクタのメタデータ テーブル名ではありません。
- _metadata_spanner_commit_timestamp: 変更ストリームのデータ変更レコードから抽出されます。
- _metadata_spanner_server_transaction_id: 変更ストリームのデータ変更レコードから抽出されます。
- _metadata_spanner_record_sequence: 変更ストリームのデータ変更レコードから抽出されます。
- _metadata_spanner_is_last_record_in_transaction_in_partition: 変更ストリームのデータ変更レコードから抽出されます。
- _metadata_spanner_number_of_records_in_transaction: 変更ストリームのデータ変更レコードから抽出されます。
- _metadata_spanner_number_of_partitions_in_transaction: 変更ストリームのデータ変更レコードから抽出されます。
- _metadata_big_query_commit_timestamp: 行が BigQuery に挿入されたときの commit タイムスタンプ。
注:
- このテンプレートでは、スキーマの変更が Cloud Spanner から BigQuery に伝播されません。Cloud Spanner でスキーマの変更を行うとパイプラインが壊れる可能性があります。スキーマの変更後にパイプラインの再作成が必要になることがあります。
OLD_AND_NEW_VALUES
とNEW_VALUES
値のキャプチャ タイプで、データ変更レコードに UPDATE 変更が含まれている場合、テンプレートはデータ変更レコードの commit タイムスタンプで Cloud Spanner に対してステイル読み取りを実行し、変更されていない監視対象の列を取得する必要があります。ステイル読み取りに対してデータベース「version_retention_period」が正しく構成されていることを確認してください。NEW_ROW
値のキャプチャ タイプの場合、データ変更レコードは UPDATE で更新されない列を含む新しい行をすべてキャプチャするため、テンプレートのほうが効率的です。テンプレートでは、ステイル読み取りを行う必要がありません。- Cloud Spanner インスタンスまたは BigQuery テーブルと同じリージョンから Dataflow ジョブを実行すると、ネットワークのレイテンシとネットワーク転送の費用を最小限に抑えることができます。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。Dataflow リージョン エンドポイントの詳細をご覧ください。
- このテンプレートは有効な Cloud Spanner のデータ型をすべてサポートしていますが、BigQuery の型が Cloud Spanner の型より精度が高い場合、変換中に精度が失われる可能性があります。具体的には次のとおりです。
- Cloud Spanner JSON 型では、オブジェクトのメンバーの順序は辞書順に並べ替えられますが、BigQuery JSON 型はこのような保証はありません。
- Cloud Spanner はナノ秒単位の TIMESTAMP 型をサポートしますが、BigQuery はマイクロ秒 TIMESTAMP 型のみをサポートします。
変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法とベスト プラクティスをご覧ください。
このパイプラインの要件:
- パイプラインの実行前に、Cloud Spanner インスタンスが存在している。
- パイプラインの実行前に、Cloud Spanner データベースが存在している。
- パイプラインの実行前に、Cloud Spanner メタデータ インスタンスが存在している。
- パイプラインの実行前に、Cloud Spanner メタデータ データベースが存在している。
- パイプラインの実行前に、Cloud Spanner の変更ストリームが存在している。
- パイプラインの実行前に BigQuery データセットが存在している。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
spannerInstanceId |
変更ストリームの読み取り元の Cloud Spanner インスタンス。 |
spannerDatabase |
変更ストリームの読み取り元の Cloud Spanner データベース。 |
spannerMetadataInstanceId |
変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner インスタンス。 |
spannerMetadataDatabase |
変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner データベース。 |
spannerChangeStreamName |
読み取り元の Cloud Spanner 変更ストリームの名前。 |
bigQueryDataSet |
変更ストリーム出力の BigQuery データセット |
spannerProjectId |
(省略可)変更ストリームの読み取り元のプロジェクト。これは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。 |
spannerMetadataTableName |
(省略可)使用する Cloud Spanner 変更ストリーム メタデータのテーブル名。指定しない場合、パイプライン フロー中に Cloud Spanner 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は指定しないでください。 |
rpcPriority |
(省略可)Cloud Spanner 呼び出しのリクエストの優先度。値は、高、中、低のいずれかである必要があります。(デフォルトは高) |
startTimestamp |
(省略可)変更ストリームの読み取りに使用する開始の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。 |
endTimestamp |
(省略可)変更ストリームの読み取りに使用する終了の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、現在よりも先の無限の時間です。 |
bigQueryProjectId |
(省略可)BigQuery プロジェクト。デフォルトは Dataflow ジョブのプロジェクトです。 |
bigQueryChangelogTableNameTemplate |
(省略可)BigQuery 変更履歴テーブルの名前のテンプレート。デフォルトは {_metadata_spanner_table_name}_changelog です。 |
deadLetterQueueDirectory |
(省略可)処理されなかった理由とともに、未処理のレコードを保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。 |
dlqRetryMinutes |
(省略可)デッドレター キューの再試行間隔の分数。デフォルトは 10 です。 |
ignoreFields |
(省略可)無視するフィールドのカンマ区切りリスト(大文字と小文字が区別されます)。監視対象のテーブルのフィールドや、パイプラインによって追加されたメタデータ フィールドなどが考えられます。無視されるフィールドは BigQuery に挿入されません。 |
Cloud Spanner change streams to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Cloud Spanner change streams to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ bigQueryDataset=BIGQUERY_DATASET
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
SPANNER_INSTANCE_ID
: Cloud Spanner インスタンス IDSPANNER_DATABASE
: Cloud Spanner データベースSPANNER_METADATA_INSTANCE_ID
: Cloud Spanner メタデータ インスタンス IDSPANNER_METADATA_DATABASE
: Cloud Spanner メタデータ データベースSPANNER_CHANGE_STREAM
: Cloud Spanner 変更ストリームBIGQUERY_DATASET
: 変更ストリーム出力の BigQuery データセット
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "bigQueryDataset": "BIGQUERY_DATASET" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
SPANNER_INSTANCE_ID
: Cloud Spanner インスタンス IDSPANNER_DATABASE
: Cloud Spanner データベースSPANNER_METADATA_INSTANCE_ID
: Cloud Spanner メタデータ インスタンス IDSPANNER_METADATA_DATABASE
: Cloud Spanner メタデータ データベースSPANNER_CHANGE_STREAM
: Cloud Spanner 変更ストリームBIGQUERY_DATASET
: 変更ストリーム出力の BigQuery データセット
Cloud Spanner change streams to Pub/Sub
Cloud Spanner change streams to Pub/Sub テンプレートは、Cloud Dataflow データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して Pub/Sub トピックに書き込むストリーミング パイプラインです。
データを新しい Pub/Sub トピックに出力するには、まずトピックを作成する必要があります。作成後、Pub/Sub はサブスクリプションを自動的に生成して新しいトピックに接続します。存在しない Pub/Sub トピックにデータを出力しようとすると、Dataflow パイプラインは例外をスローし、パイプラインは継続的に接続しようとして停止します。
必要な Pub/Sub トピックがすでに存在する場合、そのトピックにデータを出力できます。
詳細については、変更ストリームについて、Dataflow で変更ストリームの接続を構築する、変更ストリームのベスト プラクティスをご覧ください。
このパイプラインの要件:
- パイプラインの実行前に、Cloud Spanner インスタンスが存在している。
- パイプラインの実行前に、Cloud Spanner データベースが存在している。
- パイプラインの実行前に、Cloud Spanner メタデータ インスタンスが存在している。
- パイプラインの実行前に、Cloud Spanner メタデータ データベースが存在している。
- パイプラインの実行前に、Cloud Spanner の変更ストリームが存在している。
- Pub/Sub トピックは、パイプラインを実行する前に存在している必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
spannerInstanceId |
変更ストリームの読み取り元の Cloud Spanner インスタンス。 |
spannerDatabase |
変更ストリームの読み取り元の Cloud Spanner データベース。 |
spannerMetadataInstanceId |
変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner インスタンス。 |
spannerMetadataDatabase |
変更ストリーム コネクタのメタデータ テーブルに使用する Cloud Spanner データベース。 |
spannerChangeStreamName |
読み取り元の Cloud Spanner 変更ストリームの名前。 |
pubsubTopic |
変更ストリーム出力の Pub/Sub トピック。 |
spannerProjectId |
(省略可)変更ストリームの読み取り元のプロジェクト。これは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。 |
spannerMetadataTableName |
(省略可)使用する Cloud Spanner 変更ストリーム メタデータのテーブル名。指定しない場合、パイプライン フローの変更中に Cloud Spanner によってストリーム コネクタのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。このパラメータは他のケースでは使用しないでください。 |
rpcPriority |
(省略可)Cloud Spanner 呼び出しのリクエストの優先度。値は、高、中、低のいずれかである必要があります。(デフォルトは高) |
startTimestamp |
(省略可)変更ストリームの読み取りに使用する開始の DateTime(両端を含む)。例: ex-2021-10-12T07:20:50.52Z。デフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。 |
endTimestamp |
(省略可)変更ストリームの読み取りに使用する終了の DateTime(両端を含む)。例: ex-2021-10-12T07:20:50.52Z。デフォルトは、現在よりも先の無限の時間です。 |
outputFileFormat |
(省略可)出力の形式。出力は多くの PubsubMessage でラップされ、Pub/Sub トピックに送信されます。許可されている形式は JSON と AVRO です。デフォルトは JSON です。 |
pubsubAPI |
(省略可)パイプラインの実装に使用される Pub/Sub API。許可される API は pubsubio と native_client です。秒間クエリ数(QPS)が少ない場合、native_client はレイテンシが低くなります。QPS が大きい場合、pubsubio はより優れた安定したパフォーマンスを提供します。デフォルト値は pubsubio です。 |
Cloud Spanner change streams to Pub/Sub テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Cloud Spanner change streams to Pub/Sub template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ pubsubTopic=PUBSUB_TOPIC
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
SPANNER_INSTANCE_ID
: Cloud Spanner インスタンス IDSPANNER_DATABASE
: Cloud Spanner データベースSPANNER_METADATA_INSTANCE_ID
: Cloud Spanner メタデータ インスタンス IDSPANNER_METADATA_DATABASE
: Cloud Spanner メタデータ データベースSPANNER_CHANGE_STREAM
: Cloud Spanner 変更ストリームPUBSUB_TOPIC
: 変更ストリーム出力の Pub/Sub トピック
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "pubsubTopic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
SPANNER_INSTANCE_ID
: Cloud Spanner インスタンス IDSPANNER_DATABASE
: Cloud Spanner データベースSPANNER_METADATA_INSTANCE_ID
: Cloud Spanner メタデータ インスタンス IDSPANNER_METADATA_DATABASE
: Cloud Spanner メタデータ データベースSPANNER_CHANGE_STREAM
: Cloud Spanner 変更ストリームPUBSUB_TOPIC
: 変更ストリーム出力の Pub/Sub トピック
MongoDB to BigQuery(CDC)
MongoDB to BigQuery CDC(変更データ キャプチャ)テンプレートは、MongoDB 変更ストリームと連携するストリーミング パイプラインです。パイプラインは、MongoDB 変更ストリームを介して Pub/Sub に push された JSON レコードを読み取り、userOption
パラメータで指定されたとおりに BigQuery に書き込みます。
このパイプラインの要件
- ターゲット BigQuery データセットが存在すること。
- ソース MongoDB インスタンスに Dataflow ワーカーマシンからアクセスできること。
- MongoDB から Pub/Sub に変更を push する変更ストリームが実行されている必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
mongoDbUri |
MongoDB 接続 URI。形式は mongodb+srv://:@ 。 |
database |
コレクションを読み取る MongoDB 内のデータベース。例: my-db 。 |
collection |
MongoDB データベース内のコレクションの名前。例: my-collection 。 |
outputTableSpec |
書き込み先の BigQuery テーブル。例: bigquery-project:dataset.output_table 。 |
userOption |
FLATTEN または NONE 。FLATTEN : ドキュメントを第 1 レベルでフラット化します。NONE は、ドキュメント全体を JSON 文字列として格納します。 |
inputTopic |
読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。 |
MongoDB to BigQuery(CDC)テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the MongoDB to BigQuery (CDC) template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION,\ inputTopic=INPUT_TOPIC
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
OUTPUT_TABLE_SPEC
: ターゲット BigQuery テーブル名。MONGO_DB_URI
: MongoDB URI。DATABASE
: MongoDB データベース。COLLECTION
: MongoDB コレクション。USER_OPTION
: FLATTEN または NONE。INPUT_TOPIC
: Pub/Sub 入力トピック。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION", "inputTopic": "INPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
OUTPUT_TABLE_SPEC
: ターゲット BigQuery テーブル名。MONGO_DB_URI
: MongoDB URI。DATABASE
: MongoDB データベース。COLLECTION
: MongoDB コレクション。USER_OPTION
: FLATTEN または NONE。INPUT_TOPIC
: Pub/Sub 入力トピック。