This template creates a streaming pipeline that works with MongoDB change streams. To use this template, publish the change stream data to Pub/Sub. The pipeline reads the JSON records from Pub/Sub and writes them to BigQuery. The records written to BigQuery have the same format as those written by the MongoDB to BigQuery batch template.
Pipeline requirements
- The target BigQuery dataset must exist.
- The source MongoDB instance must be accessible from the Dataflow worker machines.
- You must create a Pub/Sub topic to read the change stream. While the pipeline is running, listen for change data capture (CDC) events on the MongoDB change stream and publish them to Pub/Sub as JSON records. For more information about publishing messages to Pub/Sub, see Publish messages to topics.
- This template uses MongoDB change streams. It doesn't support BigQuery change data capture.
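The publishing step described in the requirements can be sketched as follows. This is a minimal illustration, not the template's own code. It assumes each Pub/Sub message carries the changed document as JSON, which is an assumption to verify against your pipeline. The helper names changeEventToMessage and forwardChanges are hypothetical, and the MongoDB and Pub/Sub clients are passed in rather than imported, so the sketch has no dependencies.

```javascript
// Sketch only: forwards MongoDB change events to a publish callback.
// Assumption: each message body is the changed document serialized as JSON.

// Convert one change event into a Pub/Sub-style message, or return null when
// the event carries no full document (for example, a delete event).
function changeEventToMessage(event) {
  if (!event || !event.fullDocument) {
    return null;
  }
  return { data: Buffer.from(JSON.stringify(event.fullDocument), 'utf8') };
}

// changeStream: any async iterable of change events. With the MongoDB Node.js
//   driver, collection.watch([], { fullDocument: 'updateLookup' }) is one.
// publish: an async function that sends one message. With @google-cloud/pubsub,
//   (message) => topic.publishMessage(message) has this shape.
async function forwardChanges(changeStream, publish) {
  let published = 0;
  for await (const event of changeStream) {
    const message = changeEventToMessage(event);
    if (message === null) {
      continue; // Nothing to forward for this event.
    }
    await publish(message);
    published += 1;
  }
  return published;
}
```

Injecting the stream and the publish function keeps the forwarding logic testable without a live MongoDB instance or Pub/Sub topic.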
Template parameters
Required parameters
- mongoDbUri : The MongoDB connection URI in the format mongodb+srv://:@.
- database : Database in MongoDB to read the collection from. (Example: my-db).
- collection : Name of the collection inside MongoDB database. (Example: my-collection).
- userOption : FLATTEN, JSON, or NONE. FLATTEN flattens the documents to a single level. JSON stores the document in BigQuery JSON format. NONE stores the whole document as a JSON-formatted STRING. Defaults to: NONE.
- inputTopic : The Pub/Sub input topic to read from, in the format projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
- outputTableSpec : The BigQuery table to write to. For example, bigquery-project:dataset.output_table.
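The formats listed for the required parameters can be checked before launching a job. The following is a minimal sketch: checkRequiredParameters is a hypothetical helper, and the patterns are loose approximations of the formats described above, not the validation that Dataflow performs.

```javascript
// Hypothetical pre-flight check for the required template parameters.
// The patterns only approximate the documented formats.
const USER_OPTIONS = ['FLATTEN', 'JSON', 'NONE'];
const TOPIC_PATTERN = /^projects\/[^/]+\/topics\/[^/]+$/;
const TABLE_SPEC_PATTERN = /^[^\s:]+:[^\s:.]+\.[^\s:.]+$/;

function checkRequiredParameters(params) {
  const problems = [];
  const required = ['mongoDbUri', 'database', 'collection', 'inputTopic', 'outputTableSpec'];
  for (const name of required) {
    if (!params[name]) {
      problems.push(`${name} is required`);
    }
  }
  // userOption falls back to NONE when it is omitted.
  const userOption = params.userOption === undefined ? 'NONE' : params.userOption;
  if (!USER_OPTIONS.includes(userOption)) {
    problems.push('userOption must be FLATTEN, JSON, or NONE');
  }
  if (params.inputTopic && !TOPIC_PATTERN.test(params.inputTopic)) {
    problems.push('inputTopic must look like projects/<PROJECT_ID>/topics/<TOPIC_NAME>');
  }
  if (params.outputTableSpec && !TABLE_SPEC_PATTERN.test(params.outputTableSpec)) {
    problems.push('outputTableSpec must look like bigquery-project:dataset.output_table');
  }
  return problems; // An empty array means no problems were found.
}
```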
Optional parameters
- useStorageWriteApiAtLeastOnce : When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
- KMSEncryptionKey : The Cloud KMS encryption key used to decrypt the MongoDB URI connection string. If a Cloud KMS key is passed in, the entire MongoDB URI connection string must be passed in encrypted. (Example: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key).
- filter : BSON filter in JSON format. (Example: { "val": { $gt: 0, $lt: 9 }}).
- useStorageWriteApi : If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
- numStorageWriteApiStreams : When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. Defaults to: 0.
- storageWriteApiTriggeringFrequencySec : When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
- bigQuerySchemaPath : The Cloud Storage path for the BigQuery JSON schema. (Example: gs://your-bucket/your-schema.json).
- javascriptDocumentTransformGcsPath : The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://your-bucket/your-transforms/*.js).
- javascriptDocumentTransformFunctionName : The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). (Example: transform).
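The dependencies among the Storage Write API parameters can be expressed as a small check. This is a sketch under stated assumptions: checkStorageWriteApiParameters is a hypothetical helper, values are plain JavaScript booleans and numbers for readability, and a positive number is treated as "set".

```javascript
// Hypothetical check of the Storage Write API parameter rules described above.
// Assumption: "set" means a number greater than 0 (the stream count defaults to 0).
function checkStorageWriteApiParameters(params) {
  const useApi = params.useStorageWriteApi === true; // Default: false.
  const atLeastOnce = params.useStorageWriteApiAtLeastOnce === true; // Default: false.
  const problems = [];

  if (!useApi) {
    if (atLeastOnce) {
      problems.push('useStorageWriteApiAtLeastOnce applies only when useStorageWriteApi is true');
    }
    return problems;
  }

  if (!atLeastOnce) {
    // Exactly-once semantics: both tuning parameters must be provided.
    if (!(params.numStorageWriteApiStreams > 0)) {
      problems.push('numStorageWriteApiStreams must be set');
    }
    if (!(params.storageWriteApiTriggeringFrequencySec > 0)) {
      problems.push('storageWriteApiTriggeringFrequencySec must be set');
    }
  }
  return problems; // An empty array means the combination is consistent.
}
```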
User-defined function
Optionally, you can extend this template by writing a user-defined function (UDF) in JavaScript. The template calls the UDF for each input element. Element payloads are serialized as JSON strings.
To use a UDF, upload the JavaScript file to Cloud Storage and set the following template parameters:
Parameter | Description |
---|---|
javascriptDocumentTransformGcsPath | The Cloud Storage location of the JavaScript file. |
javascriptDocumentTransformFunctionName | The name of the JavaScript function. |
For more information, see Create user-defined functions for Dataflow templates.
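As a minimal sketch of such a UDF, assuming the function receives one element as a JSON string and is expected to return a JSON string, a transform might look like the following. The function name transform and the fields email and processed are hypothetical.

```javascript
// Sketch of a document-transform UDF.
// Assumptions: the input is one element serialized as a JSON string, and the
// return value is a JSON string. Field names are for illustration only.
function transform(inJson) {
  var doc = JSON.parse(inJson);

  // Example changes: drop a sensitive field and tag the record.
  delete doc.email;
  doc.processed = true;

  return JSON.stringify(doc);
}
```

With this file uploaded to Cloud Storage, javascriptDocumentTransformFunctionName would be transform and javascriptDocumentTransformGcsPath would be the URI of the uploaded file.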
Function specification
The UDF has the following specification:
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.
  For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the MongoDB (CDC) to BigQuery template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION,\
inputTopic=INPUT_TOPIC
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- OUTPUT_TABLE_SPEC: your target BigQuery table name.
- MONGO_DB_URI: your MongoDB URI.
- DATABASE: your MongoDB database.
- COLLECTION: your MongoDB collection.
- USER_OPTION: FLATTEN, JSON, or NONE.
- INPUT_TOPIC: your Pub/Sub input topic.
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "outputTableSpec": "OUTPUT_TABLE_SPEC",
      "mongoDbUri": "MONGO_DB_URI",
      "database": "DATABASE",
      "collection": "COLLECTION",
      "userOption": "USER_OPTION",
      "inputTopic": "INPUT_TOPIC"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC"
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- OUTPUT_TABLE_SPEC: your target BigQuery table name.
- MONGO_DB_URI: your MongoDB URI.
- DATABASE: your MongoDB database.
- COLLECTION: your MongoDB collection.
- USER_OPTION: FLATTEN, JSON, or NONE.
- INPUT_TOPIC: your Pub/Sub input topic.
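The launch request can also be assembled programmatically. The following is a minimal sketch: buildLaunchRequest and its option names are hypothetical, it only builds the URL and body, and it does not send the request or handle authorization.

```javascript
// Hypothetical helper that assembles the flexTemplates:launch request.
// Serializing an object with JSON.stringify guarantees a well-formed JSON body.
function buildLaunchRequest(opts) {
  const base = 'https://dataflow.googleapis.com/v1b3';
  const url =
    `${base}/projects/${opts.projectId}/locations/${opts.location}/flexTemplates:launch`;
  const body = {
    launch_parameter: {
      jobName: opts.jobName,
      // Template parameters are passed through unchanged.
      parameters: { ...opts.parameters },
      containerSpecGcsPath:
        `gs://dataflow-templates-${opts.location}/${opts.version}/flex/MongoDB_to_BigQuery_CDC`,
    },
  };
  return { method: 'POST', url: url, body: JSON.stringify(body) };
}
```

For example, calling buildLaunchRequest with location set to us-central1 and version set to latest yields a containerSpecGcsPath of gs://dataflow-templates-us-central1/latest/flex/MongoDB_to_BigQuery_CDC.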
What's next