Google はオープンソースの Dataflow テンプレートを提供しています。
これらの Dataflow テンプレートは、データのインポート、エクスポート、バックアップ、復元、API の一括オペレーションなど、大規模なデータタスクの解決に役立ちます。専用の開発環境を使用しなくても、これらの処理を実行できます。テンプレートは Apache Beam 上に構築され、Dataflow を使用してデータを変換します。
テンプレートに関する一般的な情報については、Dataflow テンプレートをご覧ください。Google が提供するテンプレートのリストについては、Google 提供のテンプレートの概要をご覧ください。このガイドでは、ストリーミング テンプレートについて説明します。
Pub/Sub Subscription to BigQuery
Pub/Sub Subscription to BigQuery テンプレートは、Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。
このパイプラインの要件:
- Pub/Sub メッセージの
data
フィールドは、この JSON ガイドで説明されている JSON 形式を使用する必要があります。たとえば、data
フィールドの値が{"k1":"v1", "k2":"v2"}
としてフォーマットされたメッセージは、文字列データ型のk1
とk2
という 2 つの列を持つ BigQuery テーブルに挿入できます。 - パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。 |
outputTableSpec |
BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。 |
outputDeadletterTable |
出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、OUTPUT_TABLE_SPEC_error_records が代わりに使用されます。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
Pub/Sub Subscription to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Subscription to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" }, "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
Pub/Sub Topic to BigQuery
Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。
このパイプラインの要件:
- Pub/Sub メッセージの
data
フィールドは、この JSON ガイドで説明されている JSON 形式を使用する必要があります。たとえば、data
フィールドの値が{"k1":"v1", "k2":"v2"}
としてフォーマットされたメッセージは、文字列データ型のk1
とk2
という 2 つの列を持つ BigQuery テーブルに挿入できます。 - パイプラインの実行前に出力テーブルが存在している必要があります。テーブル スキーマが入力 JSON オブジェクトと一致する必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
読み込まれる Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。 |
outputTableSpec |
BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。 |
outputDeadletterTable |
出力テーブルに到達できなかったメッセージの BigQuery テーブル。<my-project>:<my-dataset>.<my-table> の形式で指定します。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
Pub/Sub Topic to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Topic to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名DATASET
: BigQuery データセットTABLE_NAME
: BigQuery テーブル名
Pub/Sub Avro to BigQuery
Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。
このパイプラインの要件
- 入力 Pub/Sub サブスクリプションが存在していること。
- Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
- 未処理の Pub/Sub トピックが存在していること。
- 出力 BigQuery データセットが存在していること。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
schemaPath |
Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc 。 |
inputSubscription |
読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription> |
outputTopic |
未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
outputTableSpec |
BigQuery 出力テーブルの場所。例: <my-project>:<my-dataset>.<my-table> 。createDisposition の指定によっては、ユーザー指定の Avro スキーマを使用して出力テーブルが自動的に作成されます。 |
writeDisposition |
(省略可)BigQuery の WriteDisposition。例: WRITE_APPEND 、WRITE_EMPTY 、または WRITE_TRUNCATE 。デフォルト: WRITE_APPEND |
createDisposition |
(省略可)BigQuery の CreateDisposition。例: CREATE_IF_NEEDED 、CREATE_NEVER 。デフォルト: CREATE_IF_NEEDED |
Pub/Sub Avro to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Avro to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
Pub/Sub Proto to BigQuery
Pub/Sub proto to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに proto データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。
データを変換するための JavaScript ユーザー定義関数(UDF)を指定できます。UDF の実行中のエラーは、個別の Pub/Sub トピック、または BigQuery エラーと同じ未処理のトピックに送信できます。
このパイプラインの要件:
- 入力 Pub/Sub サブスクリプションが存在していること。
- Proto レコードのスキーマ ファイルが、Cloud Storage に存在していること。
- 出力 Pub/Sub トピックが存在していること。
- 出力 BigQuery データセットが存在していること。
- BigQuery テーブルが存在する場合は、
createDisposition
値にかかわらず、proto データに一致するスキーマが必要です。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
protoSchemaPath |
自己完結型の proto スキーマ ファイルの Cloud Storage の場所。例: gs://path/to/my/file.pb このファイルは、protoc コマンドの --descriptor_set_out フラグを使用して生成できます。--include_imports フラグを使用すると、ファイルが自己完結することを保証できます。 |
fullMessageName |
proto メッセージの完全な名前。たとえば、package.name.MessageName の場合、package.name は、java_package ステートメントではなく、package ステートメントに指定された値です。 |
inputSubscription |
読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription> |
outputTopic |
未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
outputTableSpec |
BigQuery 出力テーブルの場所。例: my-project:my_dataset.my_table createDisposition の指定によっては、入力スキーマ ファイルを使用して出力テーブルが自動的に作成されます。 |
preserveProtoFieldNames |
(省略可)JSON で元の Proto フィールド名を保持するには true 。false : 標準の JSON 名を使用します。たとえば、false と指定すると field_name が fieldName に変更されます。(デフォルト: false ) |
bigQueryTableSchemaPath |
(省略可)BigQuery スキーマパスへの Cloud Storage パス。例: gs://path/to/my/schema.json 指定されていない場合、スキーマは Proto スキーマから推測されます。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
udfOutputTopic |
(省略可)UDF エラーを格納する Pub/Sub トピック。たとえば、projects/<project-id>/topics/<topic-name> です。指定されていない場合、UDF エラーは outputTopic と同じトピックに送信されます。 |
writeDisposition |
(省略可)BigQuery WriteDisposition 。WRITE_APPEND 、WRITE_EMPTY 、WRITE_TRUNCATE など。デフォルト: WRITE_APPEND |
createDisposition |
(省略可)BigQuery CreateDisposition 。例: CREATE_IF_NEEDED 、CREATE_NEVER デフォルト: CREATE_IF_NEEDED |
Pub/Sub Proto to BigQuery テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Proto to BigQuery template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SCHEMA_PATH
: Proto スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: Proto メッセージ名(例:package.name.MessageName
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名UNPROCESSED_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SCHEMA_PATH
: Proto スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: Proto メッセージ名(例:package.name.MessageName
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名UNPROCESSED_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
Pub/Sub to Pub/Sub
Pub/Sub to Pub/Sub テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、別の Pub/Sub トピックにそのメッセージを書き込むストリーミング パイプラインです。Pub/Sub トピックに書き込むメッセージのフィルタリングに使用される、オプションのメッセージ属性キーと値を指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しながら、Pub/Sub サブスクリプションから別の Pub/Sub トピックにメッセージをコピーできます。
このパイプラインの要件:
- 実行前にコピー元の Pub/Sub サブスクリプションが存在している必要があります。
- コピー元の Pub/Sub サブスクリプションが pull サブスクリプションである必要があります。
- 実行前にコピー先の Pub/Sub トピックが存在している必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
読み取り元の入力 Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name> |
outputTopic |
書き込み先の出力 Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
filterKey |
(省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。 |
filterValue |
(省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。 |
Pub/Sub to Pub/Sub テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Pub/Sub template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ filterKey=FILTER_KEY,\ filterValue=FILTER_VALUE
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOPIC_NAME
: Pub/Sub トピック名FILTER_KEY
: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。FILTER_VALUE
: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)はフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "filterKey": "FILTER_KEY", "filterValue": "FILTER_VALUE" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOPIC_NAME
: Pub/Sub トピック名FILTER_KEY
: イベントをフィルタする属性キー。キーが指定されていない場合、フィルタは適用されません。FILTER_VALUE
: イベント フィルタキーが指定されている場合に使用するフィルタ属性値。有効な Java 正規表現文字列をイベント フィルタ値として受け入れます。正規表現を指定した場合、メッセージがフィルタされるには、式全体が一致する必要があります。部分一致(部分文字列など)はフィルタされません。デフォルトでは、null イベント フィルタ値が使用されます。
Pub/Sub to Splunk
Pub/Sub to Splunk テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、メッセージ ペイロードを Splunk の HTTP Event Collector(HEC)を介して Splunk に書き込むストリーミング パイプラインです。このテンプレートの最も一般的な使用例は Splunk へのログのエクスポートです。基盤となるワークフローの例については、Dataflow を使用して本番環境対応のログのエクスポートを Splunk にデプロイするをご覧ください。
Splunk に書き込む前に、メッセージ ペイロードに JavaScript ユーザー定義関数を適用することもできます。処理が失敗したメッセージは、トラブルシューティングと再処理のために Pub/Sub の未処理のトピックに転送されます。
HEC トークンの保護を強化するために、Base64 エンコードされた HEC トークン パラメータを Cloud KMS 鍵で暗号化して、この Cloud KMS 鍵とともに渡すこともできます。HEC トークン パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。
このパイプラインの要件:
- パイプラインの実行前にソース Pub/Sub サブスクリプションが存在している必要があります。
- パイプラインを実行する前に、Pub/Sub に未処理のトピックが存在している必要があります。
- Dataflow ワーカーのネットワークから Splunk の HEC エンドポイントにアクセスできる必要があります。
- Splunk の HEC トークンが生成済みで、利用可能である必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
入力の読み取り元の Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name> |
token |
(省略可)Splunk の HEC 認証トークン。tokenSource が PLAINTEXT または KMS に設定されている場合は必須です。 |
url |
Splunk の HEC URL。パイプラインが実行される VPC からルーティング可能である必要があります。例: https://splunk-hec-host:8088。 |
outputDeadletterTopic |
配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
batchCount |
(省略可)複数のイベントを Splunk に送信するためのバッチサイズ。デフォルト: 1(バッチ処理なし)。 |
parallelism |
(省略可)並行リクエストの最大数。デフォルト: 1(並列処理なし)。 |
disableCertificateValidation |
(省略可)SSL 証明書の検証を無効にします。デフォルト: false(検証が有効)。true の場合、証明書は検証されず(すべての証明書が信頼されます)、rootCaCertificatePath パラメータは無視されます。 |
includePubsubMessage |
(省略可)完全な Pub/Sub メッセージをペイロードに含めます。デフォルト: false(ペイロードにはデータ要素のみが含まれる)。 |
tokenSource |
トークンのソース。PLAINTEXT、KMS、SECRET_MANAGER のいずれかです。Secret Manager を使用する場合は、このパラメータを指定する必要があります。tokenSource が KMS に設定されている場合、tokenKMSEncryptionKey と暗号化された token を指定する必要があります。tokenSource を SECRET_MANAGER に設定する場合は、tokenSecretId を指定する必要があります。tokenSource が PLAINTEXT に設定されている場合は、token を指定する必要があります。 |
tokenKMSEncryptionKey |
(省略可)HEC トークン文字列を復号するための Cloud KMS 鍵。tokenSource が KMS に設定されている場合、このパラメータを指定する必要があります。Cloud KMS 鍵を指定する場合は、HEC トークン文字列を暗号化して渡す必要があります。 |
tokenSecretId |
(省略可)トークンの Secret Manager シークレット ID。tokenSource が SECRET_MANAGER に設定されている場合、このパラメータを指定する必要があります。projects/<project-id>/secrets/<secret-name>/versions/<secret-version> 形式で指定する必要があります。 |
rootCaCertificatePath |
(省略可)Cloud Storage のルート CA 証明書の完全 URL。例: gs://mybucket/mycerts/privateCA.crt Cloud Storage で提供される証明書は DER でエンコードする必要があります。この証明書はバイナリまたは印刷可能(Base64)エンコードで提供できます。証明書を Base64 エンコードで提供する場合は、証明書を -----BEGIN CERTIFICATE----- と -----END CERTIFICATE----- で囲む必要があります。このパラメータを指定すると、このプライベート CA 証明書ファイルが取得され、Splunk HEC エンドポイントの SSL 証明書の検証用に Dataflow ワーカーのトラストストアに追加されます。このパラメータを指定しない場合は、デフォルトのトラストストアが使用されます。 |
enableBatchLogs |
(省略可)Splunk に書き込まれたバッチに対してログを有効にするかどうかを指定します。デフォルト: true |
enableGzipHttpCompression |
(省略可)Splunk HEC に送信される HTTP リクエストを圧縮するかどうか(gzip コンテンツをエンコードするかどうか)を指定します。デフォルト: true |
Pub/Sub to Splunk テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Splunk template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\ token=TOKEN,\ url=URL,\ outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ batchCount=BATCH_COUNT,\ parallelism=PARALLELISM,\ disableCertificateValidation=DISABLE_VALIDATION,\ rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOKEN
: Splunk の Http Event Collector トークンURL
: Splunk の Http Event Collector の URL パス(例:https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: Pub/Sub トピック名JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI。例:gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT
: Splunk に複数のイベントを送信するために使用するバッチサイズPARALLELISM
: Splunk にイベントを送信するために使用する並列リクエストの数DISABLE_VALIDATION
: SSL 証明書の検証を無効にする場合、true
ROOT_CA_CERTIFICATE_PATH
: Cloud Storage のルート CA 証明書へのパス(例:gs://your-bucket/privateCA.crt
)。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME", "token": "TOKEN", "url": "URL", "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "batchCount": "BATCH_COUNT", "parallelism": "PARALLELISM", "disableCertificateValidation": "DISABLE_VALIDATION", "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名TOKEN
: Splunk の Http Event Collector トークンURL
: Splunk の Http Event Collector の URL パス(例:https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: Pub/Sub トピック名JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI。例:gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT
: Splunk に複数のイベントを送信するために使用するバッチサイズPARALLELISM
: Splunk にイベントを送信するために使用する並列リクエストの数DISABLE_VALIDATION
: SSL 証明書の検証を無効にする場合、true
ROOT_CA_CERTIFICATE_PATH
: Cloud Storage のルート CA 証明書へのパス(例:gs://your-bucket/privateCA.crt
)。
Pub/Sub to Avro Files on Cloud Storage
Pub/Sub to Avro files to Cloud Storage テンプレートは、Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。
このパイプラインの要件:
- パイプラインの実行前に入力 Pub/Sub トピックが存在すること。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
メッセージを使用するために購読する Cloud Pub/Sub トピック。トピック名は、projects/<project-id>/topics/<topic-name> の形式にする必要があります。 |
outputDirectory |
出力 Avro ファイルがアーカイブされる出力ディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/ |
avroTempDirectory |
一時的な Avro ファイル用のディレクトリ。末尾には、/ を含める必要があります。例: gs://example-bucket/example-directory/ |
outputFilenamePrefix |
(省略可)Avro ファイルの出力ファイル名接頭辞。 |
outputFilenameSuffix |
(省略可)Avro ファイルの出力ファイル名接尾辞。 |
outputShardTemplate |
(省略可)出力ファイルのシャード テンプレート。文字 S や N の繰り返しシーケンスで指定します。例: SSS-NNN これらはそれぞれシャード番号やシャードの総数に置き換えられます。このパラメータを指定しない場合、デフォルトのテンプレートの形式は W-P-SS-of-NN です。 |
Pub/Sub to Cloud Storage Avro テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Avro Files on Cloud Storage template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=FILENAME_PREFIX,\ outputFilenameSuffix=FILENAME_SUFFIX,\ outputShardTemplate=SHARD_TEMPLATE,\ avroTempDirectory=gs://BUCKET_NAME/temp/
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILENAME_PREFIX
: 使用したい出力ファイル名接頭辞FILENAME_SUFFIX
: 使用したい出力ファイル名接尾辞SHARD_TEMPLATE
: 使用したい出力シャード テンプレート
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputDirectory": "gs://BUCKET_NAME/output/", "avroTempDirectory": "gs://BUCKET_NAME/temp/", "outputFilenamePrefix": "FILENAME_PREFIX", "outputFilenameSuffix": "FILENAME_SUFFIX", "outputShardTemplate": "SHARD_TEMPLATE" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前FILENAME_PREFIX
: 使用したい出力ファイル名接頭辞FILENAME_SUFFIX
: 使用したい出力ファイル名接尾辞SHARD_TEMPLATE
: 使用したい出力シャード テンプレート
Pub/Sub Topic to Text Files on Cloud Storage
Pub/Sub to Cloud Storage Text テンプレートは、Pub/Sub トピックからレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。
このパイプラインの要件:
- 実行前に Pub/Sub トピックが存在している必要があります。
- トピックに公開するメッセージは、テキスト形式となる必要があります。
- トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。 |
outputDirectory |
出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/ この値は「/」で終わる必要があります。 |
outputFilenamePrefix |
ウィンドウ処理されたファイルの接頭辞。例: output- |
outputFilenameSuffix |
ウィンドウ処理されたファイルの接尾辞。通常は、.txt や .csv などの拡張子です。 |
outputShardTemplate |
シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。outputShardTemplate は W-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplate の SS-of-NN 部分は 00-of-01 になります。 |
Pub/Sub Topic to Text Files on Cloud Storage テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Text Files on Cloud Storage template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
STAGING_LOCATION
: ローカル ファイルをステージングする場所(例:gs://your-bucket/staging
)TEMP_LOCATION
: 一時ファイルを書き込む場所(例:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前
Pub/Sub Topic or Subscription to Text Files on Cloud Storage
Pub/Sub Topic or Subscription to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。
このパイプラインの要件:
- 実行前に Pub/Sub トピックまたはサブスクリプションが存在している必要があります。
- トピックに公開するメッセージは、テキスト形式となる必要があります。
- トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputTopic |
読み取り元の Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。 このパラメータが指定されている場合は、inputSubscription を指定しないでください。 |
inputSubscription |
読み取り元の入力 Pub/Sub サブスクリプション。サブスクリプション名は projects/<project-id>/subscription/<subscription-name> の形式にする必要があります。このパラメータが指定されている場合は、inputTopic を指定しないでください。 |
outputDirectory |
出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/ この値は「/」で終わる必要があります。 |
outputFilenamePrefix |
ウィンドウ処理されたファイルの接頭辞。例: output- |
outputFilenameSuffix |
ウィンドウ処理されたファイルの接尾辞。通常は、.txt や .csv などの拡張子です。 |
outputShardTemplate |
シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。outputShardTemplate は W-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplate の SS-of-NN 部分は 00-of-01 になります。 |
windowDuration |
(省略可)ウィンドウ期間は、出力ディレクトリにデータが書き込まれる間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットを向上させるには、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは 5 分、最小は 1 秒です。使用できる形式は、[int]s(秒に使用、例: 5s)、[int]m(分に使用、例: 12m)、[int]h(時間に使用、例: 2h)です。 |
Pub/Sub Topic or Subscription to Text Files on Cloud Storage テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template jobs run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region REGION_NAME \ --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名BUCKET_NAME
: Cloud Storage バケットの名前
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名BUCKET_NAME
: Cloud Storage バケットの名前
Pub/Sub to MongoDB
Pub/Sub to MongoDB テンプレートは、Pub/Sub サブスクリプションから JSON エンコードのメッセージを読み取り、ドキュメントとして MongoDB に書き込むストリーミング パイプラインです。このパイプラインでは、必要に応じて JavaScript ユーザー定義関数(UDF)を使用して組み込むことができる追加の変換がサポートされています。スキーマの不一致や不正な形式の JSON によるエラー、または変換の実行中に発生したエラーは、入力メッセージとともに未処理メッセージの BigQuery テーブルに記録されます。未処理レコードのテーブルが実行前に存在しない場合は、パイプラインによって自動的にこのテーブルが作成されます。
このパイプラインの要件:
- Pub/Sub サブスクリプションが存在し、有効な JSON 形式でメッセージがエンコードされている必要があります。
- MongoDB クラスタが存在し、Dataflow ワーカーマシンからアクセス可能である必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
Pub/Sub サブスクリプションの名前。例:
|
mongoDBUri |
MongoDB サーバーのカンマ区切りのリスト。例: 192.285.234.12:27017,192.287.123.11:27017 |
database |
コレクションを格納する MongoDB のデータベース。例: my-db 。 |
collection |
MongoDB データベース内のコレクションの名前。例: my-collection 。 |
deadletterTable |
エラー(スキーマの不一致や JSON の形式の誤りなど)によりメッセージが保存される BigQuery テーブル。例: project-id:dataset-name.table-name 。 |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
batchSize |
(省略可)MongoDB へのドキュメントのバッチ挿入に使用するバッチサイズ。デフォルト: 1000 |
batchSizeBytes |
(省略可)バッチサイズ(バイト単位)。デフォルト: 5242880 |
maxConnectionIdleTime |
(省略可)接続タイムアウトが発生するまでの最大アイドル時間(秒単位)。デフォルト: 60000 |
sslEnabled |
(省略可)MongoDB への接続が SSL かどうかを示すブール値。デフォルト: true |
ignoreSSLCertificate |
(省略可)SSL 証明書を無視するかどうかを示すブール値。デフォルト: true |
withOrdered |
(省略可)MongoDB への順序付けされた一括挿入を有効にするブール値。デフォルト: true |
withSSLInvalidHostNameAllowed |
(省略可)SSL 接続で無効なホスト名を許可するかどうかを示すブール値。デフォルト: true |
Pub/Sub to MongoDB テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to MongoDB template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \ --parameters \ inputSubscription=INPUT_SUBSCRIPTION,\ mongoDBUri=MONGODB_URI,\ database=DATABASE, collection=COLLECTION, deadletterTable=UNPROCESSED_TABLE
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDREGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
INPUT_SUBSCRIPTION
: Pub/Sub サブスクリプション(例:
)projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
: MongoDB サーバーのアドレス(例:192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: MongoDB データベースの名前(例:users
)COLLECTION
: MongoDB コレクションの名前(例:profiles
)UNPROCESSED_TABLE
: BigQuery テーブルの名前(例:your-project:your-dataset.your-table-name
)
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "INPUT_SUBSCRIPTION", "mongoDBUri": "MONGODB_URI", "database": "DATABASE", "collection": "COLLECTION", "deadletterTable": "UNPROCESSED_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDLOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
INPUT_SUBSCRIPTION
: Pub/Sub サブスクリプション(例:
)projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
: MongoDB サーバーのアドレス(例:192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: MongoDB データベースの名前(例:users
)COLLECTION
: MongoDB コレクションの名前(例:profiles
)UNPROCESSED_TABLE
: BigQuery テーブルの名前(例:your-project:your-dataset.your-table-name
)
Pub/Sub to Elasticsearch
Pub/Sub to Elasticsearch テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、ユーザー定義関数(UDF)を実行して、それらをドキュメントとして Elasticsearch に書き込むストリーミング パイプラインです。Dataflow テンプレートは、Elasticsearch のデータ ストリーム機能を使用して、複数のインデックスにまたがる時系列データを保存し、リクエストに対して単一の名前付きリソースを提供します。データ ストリームは、ログ、指標、トレースに適しています。また、継続的に生成され、Pub/Sub に保存されるデータにも適しています。
このパイプラインの要件
- 参照元の Pub/Sub サブスクリプションが存在し、メッセージが有効な JSON 形式でエンコードされている必要があります。
- GCP インスタンス上または Elasticsearch バージョン 7.0 以降の Elastic Cloud 上で一般公開されている到達可能な Elasticsearch ホスト。詳細については、Elastic のための Google Cloud 統合をご覧ください。
- エラー出力用の Pub/Sub トピック。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
inputSubscription |
使用する Pub/Sub サブスクリプション。名前は projects/<project-id>/subscriptions/<subscription-name> の形式にします。 |
connectionUrl |
https://hostname:[port] 形式の Elasticsearch URL。Elastic Cloud を使用する場合は CloudID を指定します。 |
apiKey |
認証に使用される Base64 エンコードの API キー。 |
errorOutputTopic |
失敗したレコードを公開するための Pub/Sub 出力トピック。projects/<project-id>/topics/<topic-name> の形式にします。 |
dataset |
(省略可)Pub/Sub 経由で送信されるログのタイプ。すぐに使えるダッシュボードが用意されています。既知のログタイプ値は、audit、vpcflow、firewall です。デフォルト: pubsub |
namespace |
(省略可)環境(開発、生産、QA)、チーム、戦略事業部門などの任意のグループ。デフォルト: default |
batchSize |
(省略可)バッチサイズ(ドキュメント数)。デフォルト: 1000 |
batchSizeBytes |
(省略可)バッチサイズ(バイト数)。デフォルト: 5242880 (5 MB)。 |
maxRetryAttempts |
(省略可)最大再試行回数。0 より大きくする必要があります。デフォルト: no retries |
maxRetryDuration |
(省略可)最大再試行時間(ミリ秒)は 0 より大きくする必要があります。デフォルト: no retries |
javascriptTextTransformGcsPath |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js |
javascriptTextTransformFunctionName |
(省略可)
使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
|
propertyAsIndex |
(省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _index メタデータを指定し、一括リクエストではドキュメントに含まれます(_index UDF よりも優先適用されます)。デフォルト: none |
propertyAsId |
(省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _id メタデータを指定し、一括リクエストではドキュメントに含まれます(_id UDF よりも優先適用されます)。デフォルト: none |
javaScriptIndexFnGcsPath |
(省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none |
javaScriptIndexFnName |
(省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none |
javaScriptIdFnGcsPath |
(省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none |
javaScriptIdFnName |
(省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none |
javaScriptTypeFnGcsPath |
(省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none |
javaScriptTypeFnName |
(省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none |
javaScriptIsDeleteFnGcsPath |
(省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none |
javaScriptIsDeleteFnName |
(省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の UDF JavaScript 関数名。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none |
usePartialUpdate |
(省略可)Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルト: false |
bulkInsertMethod |
(省略可)INDEX (インデックス、upserts を許可する)または CREATE (作成、duplicate _id でエラー)を Elasticsearch 一括リクエストで使用するかどうか。デフォルト: CREATE |
Pub/Sub to Elasticsearch テンプレートの実行
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、the Pub/Sub to Elasticsearch template を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
ERROR_OUTPUT_TOPIC
: エラー出力用の Pub/Sub トピック。SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名CONNECTION_URL
: Elasticsearch の URLDATASET
: ログタイプNAMESPACE
: データセットの名前空間APIKEY
: 認証用に Base64 でエンコードされた API キー
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン エンドポイント。例:us-central1
VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。- バージョン名(例:
2021-09-20-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
ERROR_OUTPUT_TOPIC
: エラー出力用の Pub/Sub トピック。SUBSCRIPTION_NAME
: Pub/Sub サブスクリプション名CONNECTION_URL
: Elasticsearch の URLDATASET
: ログタイプNAMESPACE
: データセットの名前空間APIKEY
: 認証用に Base64 でエンコードされた API キー