このテンプレートは、MongoDB 変更ストリームと連携するストリーミング パイプラインを作成します。このテンプレートを使用するには、変更ストリーム データを Pub/Sub に公開します。パイプラインは Pub/Sub から JSON レコードを読み取り、BigQuery に書き込みます。BigQuery に書き込まれるレコードは、MongoDB to BigQuery バッチ テンプレートと同じ形式になります。
パイプラインの要件
- ターゲット BigQuery データセットが存在すること。
- ソース MongoDB インスタンスに Dataflow ワーカーマシンからアクセスできること。
- 変更ストリームを読み取るには、Pub/Sub トピックを作成する必要があります。パイプラインの実行中に、MongoDB 変更ストリームで変更データ キャプチャ(CDC)イベントをリッスンし、それらを JSON レコードとして Pub/Sub に公開します。Pub/Sub へのメッセージのパブリッシュの詳細については、メッセージをトピックに公開するをご覧ください。
- このテンプレートでは MongoDB 変更ストリームを使用します。BigQuery の変更データ キャプチャはサポートされていません。
テンプレートのパラメータ
必須パラメータ
- mongoDbUri: MongoDB 接続 URI。形式は
mongodb+srv://:@.
。 - database: コレクションを読み取る MongoDB 内のデータベース。例:
my-db
- collection: MongoDB データベース内のコレクションの名前。例:
my-collection
- userOption:
FLATTEN
、JSON
、NONE
。FLATTEN
は、ドキュメントを単一レベルにフラット化します。JSON
は、ドキュメントを BigQuery JSON 形式で保存します。NONE
は、ドキュメント全体を JSON 形式の STRING として保存します。デフォルトは NONE です。 - inputTopic: 読み取る Pub/Sub 入力トピック。
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
の形式で指定します。 - outputTableSpec: 書き込み先の BigQuery テーブル。例:
bigquery-project:dataset.output_table
オプション パラメータ
- useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを
true
に設定します。exactly-once セマンティクスを使用するには、パラメータをfalse
に設定します。このパラメータは、useStorageWriteApi
がtrue
の場合にのみ適用されます。デフォルト値はfalse
です。 - KMSEncryptionKey: MongoDB URI 接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、MongoDB URI 接続文字列はすべて暗号化されて渡されます例:
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
- filter: JSON 形式の Bson フィルタ。例:
{ "val": { $gt: 0, $lt: 9 }}
- useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は
false
です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。 - numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。
useStorageWriteApi
がtrue
であり、useStorageWriteApiAtLeastOnce
がfalse
の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。 - storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。
useStorageWriteApi
がtrue
であり、useStorageWriteApiAtLeastOnce
がfalse
の場合に、このパラメータを設定する必要があります。 - bigQuerySchemaPath: BigQuery JSON スキーマの Cloud Storage パス。例:
gs://your-bucket/your-schema.json
- javascriptDocumentTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する
.js
ファイルの Cloud Storage URI例:gs://your-bucket/your-transforms/*.js
- javascriptDocumentTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください例:transform
。
ユーザー定義関数
必要であれば、JavaScript でユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。
UDF を使用するには、JavaScript ファイルを Cloud Storage にアップロードし、次のテンプレート パラメータを設定します。
パラメータ | 説明 |
---|---|
javascriptDocumentTransformGcsPath |
JavaScript ファイルの Cloud Storage の場所。 |
javascriptDocumentTransformFunctionName |
JavaScript 関数の名前。 |
詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。
関数の仕様
UDF の仕様は次のとおりです。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the MongoDB (CDC) to BigQuery template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION,\ inputTopic=INPUT_TOPIC
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
OUTPUT_TABLE_SPEC
: ターゲット BigQuery テーブル名。MONGO_DB_URI
: MongoDB URI。DATABASE
: MongoDB データベース。COLLECTION
: MongoDB コレクション。USER_OPTION
: FLATTEN、JSON、NONE。INPUT_TOPIC
: Pub/Sub 入力トピック。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION", "inputTopic": "INPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
OUTPUT_TABLE_SPEC
: ターゲット BigQuery テーブル名。MONGO_DB_URI
: MongoDB URI。DATABASE
: MongoDB データベース。COLLECTION
: MongoDB コレクション。USER_OPTION
: FLATTEN、JSON、NONE。INPUT_TOPIC
: Pub/Sub 入力トピック。
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。