このテンプレートは、MongoDB からドキュメントを読み取り、BigQuery に書き込むバッチ パイプラインを作成します。
MongoDB 変更ストリーム データをキャプチャする場合は、MongoDB to BigQuery(CDC)テンプレートを使用できます。
パイプラインの要件
- ターゲット BigQuery データセットが存在すること。
- ソース MongoDB インスタンスに Dataflow ワーカーマシンからアクセスできること。
出力形式
出力レコードの形式は、userOption
パラメータの値によって異なります。userOption
が NONE
の場合、出力のスキーマは次のとおりです。source_data
フィールドには、JSON 形式のドキュメントが含まれます。
[ {"name":"id","type":"STRING"}, {"name":"source_data","type":"STRING"}, {"name":"timestamp","type":"TIMESTAMP"} ]
userOption
が FLATTEN
の場合、パイプラインはドキュメントをフラット化し、最上位のフィールドをテーブルの列として書き込みます。たとえば、MongoDB コレクションのドキュメントに次のフィールドがあるとします。
"_id"
(string
)"title"
(string
)"genre"
(string
)
FLATTEN
を使用すると、出力のスキーマは次のようになります。timestamp
フィールドはテンプレートによって追加されます。
[ {"name":"_id","type":"STRING"}, {"name":"title","type":"STRING"}, {"name":"genre","type":"STRING"}, {"name":"timestamp","type":"TIMESTAMP"} ]
テンプレートのパラメータ
必須パラメータ
- mongoDbUri: MongoDB 接続 URI。形式は
mongodb+srv://:@.
。 - database: コレクションを読み取る MongoDB 内のデータベース(例: my-db)。
- collection: MongoDB データベース内のコレクションの名前(例: my-collection)。
- userOption:
FLATTEN
、JSON
、NONE
。FLATTEN
は、ドキュメントを単一レベルにフラット化します。JSON
は、ドキュメントを BigQuery JSON 形式で保存します。NONE
は、ドキュメント全体を JSON 形式の文字列として保存します。デフォルトは NONE です。 - outputTableSpec: 書き込み先の BigQuery テーブル。たとえば、
bigquery-project:dataset.output_table
のようにします。
オプション パラメータ
- KMSEncryptionKey: MongoDB URI 接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、MongoDB URI 接続文字列はすべて暗号化されて渡されます(例: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key)。
- filter: JSON 形式の Bson フィルタ(例: { "val": { $gt: 0, $lt: 9 }})。
- useStorageWriteApi:
true
の場合、パイプラインでは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値はfalse
です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。 - useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを
true
に設定します。exactly-once セマンティクスを使用するには、パラメータをfalse
に設定します。このパラメータは、useStorageWriteApi
がtrue
の場合にのみ適用されます。デフォルト値はfalse
です。 - bigQuerySchemaPath: BigQuery JSON スキーマの Cloud Storage パス(例: gs://your-bucket/your-schema.json)。
- javascriptDocumentTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する
.js
ファイルの Cloud Storage URI(例: gs://your-bucket/your-transforms/*.js)。 - javascriptDocumentTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください(例: transform)。
ユーザー定義関数
必要であれば、JavaScript でユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。
UDF を使用するには、JavaScript ファイルを Cloud Storage にアップロードし、次のテンプレート パラメータを設定します。
パラメータ | 説明 |
---|---|
javascriptDocumentTransformGcsPath |
JavaScript ファイルの Cloud Storage の場所。 |
javascriptDocumentTransformFunctionName |
JavaScript 関数の名前。 |
詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。
関数の仕様
UDF の仕様は次のとおりです。
userOption
が NONE
の場合、JSON オブジェクトにはドキュメント ID を含む _id
という名前のプロパティが含まれている必要があります。テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the MongoDB to BigQuery template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
OUTPUT_TABLE_SPEC
: ターゲット BigQuery テーブル名。MONGO_DB_URI
: MongoDB URI。DATABASE
: MongoDB データベース。COLLECTION
: MongoDB コレクション。USER_OPTION
: FLATTEN または NONE。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
OUTPUT_TABLE_SPEC
: ターゲット BigQuery テーブル名。MONGO_DB_URI
: MongoDB URI。DATABASE
: MongoDB データベース。COLLECTION
: MongoDB コレクション。USER_OPTION
: FLATTEN または NONE。
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。