This template creates a streaming pipeline that works with MongoDB change streams. To use this template, publish the change stream data to Pub/Sub. The pipeline reads the JSON records from Pub/Sub and writes them to BigQuery. The records written to BigQuery have the same format as those written by the MongoDB to BigQuery batch template.
Pipeline requirements
- The target BigQuery dataset must exist.
- The source MongoDB instance must be accessible from the Dataflow worker machines.
- You must create a Pub/Sub topic to read the change stream. While the pipeline is running, listen for change data capture (CDC) events on the MongoDB change stream and publish them to Pub/Sub as JSON records. For more information about publishing messages to Pub/Sub, see Publish messages to topics.
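The publishing step described in the last requirement could be sketched as follows. This is a minimal illustration, not an official publisher: it assumes the Node.js `mongodb` and `@google-cloud/pubsub` packages are installed, and it assumes that each Pub/Sub message should carry the changed document (`fullDocument`) as a JSON string. The function names `changeEventToMessage` and `publishChangeStream` are hypothetical, and how BSON-specific types should be encoded is not covered here.

```javascript
// Hypothetical helper: turn one MongoDB change event into a Pub/Sub payload.
// Assumption: the pipeline should receive the changed document itself as JSON.
// Events without a fullDocument (for example, deletes) are skipped.
function changeEventToMessage(event) {
  if (!event || !event.fullDocument) {
    return null;
  }
  return Buffer.from(JSON.stringify(event.fullDocument), 'utf8');
}

// Hypothetical wiring: watch a collection and publish each event to a topic.
// The packages are loaded lazily so the helper above can be used on its own.
async function publishChangeStream(mongoDbUri, database, collection, topicName) {
  const { MongoClient } = require('mongodb');
  const { PubSub } = require('@google-cloud/pubsub');

  const client = new MongoClient(mongoDbUri);
  await client.connect();
  const topic = new PubSub().topic(topicName);

  // 'updateLookup' asks MongoDB to include the full document on updates.
  const stream = client
    .db(database)
    .collection(collection)
    .watch([], { fullDocument: 'updateLookup' });

  stream.on('change', (event) => {
    const data = changeEventToMessage(event);
    if (data) {
      topic.publishMessage({ data }).catch(console.error);
    }
  });
}
```

A process running `publishChangeStream` would need to stay alive for as long as the Dataflow job runs, because the pipeline only sees events that reach the topic.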
Template parameters
Parameter | Description |
---|---|
mongoDbUri | MongoDB connection URI in the format mongodb+srv://:@. |
database | Database in MongoDB to read the collection from. For example: my-db. |
collection | Name of the collection inside the MongoDB database. For example: my-collection. |
outputTableSpec | BigQuery table to write to. For example, bigquery-project:dataset.output_table. |
userOption | FLATTEN or NONE. FLATTEN flattens the documents to the first level. NONE stores the whole document as a JSON string. |
inputTopic | The Pub/Sub input topic to read from, in the format projects/<project>/topics/<topic>. |
javascriptDocumentTransformGcsPath | (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js. |
javascriptDocumentTransformFunctionName | (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples. |
useStorageWriteApi | (Optional) If true, the pipeline uses the BigQuery Storage Write API. The default value is false. For more information, see Using the Storage Write API. |
useStorageWriteApiAtLeastOnce | (Optional) When using the Storage Write API, specifies the write semantics. To use at-least-once semantics, set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false. |
numStorageWriteApiStreams | (Optional) When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. |
storageWriteApiTriggeringFrequencySec | (Optional) When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. |
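The dependencies among the four Storage Write API parameters can be easy to miss. The sketch below encodes the rules stated in the table as a pre-flight check you might run before launching a job. The function `checkStorageWriteParams` is hypothetical and is not part of the template or any Google library; it assumes parameter values arrive as strings, as they do on the command line.

```javascript
// Hypothetical pre-flight check for the Storage Write API parameters.
// Returns a list of problems; an empty list means the combination is allowed
// by the rules in the parameter table.
function checkStorageWriteParams(params) {
  const isTrue = (value) => String(value).toLowerCase() === 'true';
  const isUnset = (value) => value === undefined || value === null || value === '';

  const useApi = isTrue(params.useStorageWriteApi); // default: false
  const atLeastOnce = isTrue(params.useStorageWriteApiAtLeastOnce); // default: false
  const problems = [];

  if (!useApi && atLeastOnce) {
    problems.push('useStorageWriteApiAtLeastOnce applies only when useStorageWriteApi is true');
  }

  // Exactly-once mode requires both the stream count and the triggering frequency.
  if (useApi && !atLeastOnce) {
    const required = ['numStorageWriteApiStreams', 'storageWriteApiTriggeringFrequencySec'];
    for (const name of required) {
      if (isUnset(params[name])) {
        problems.push(name + ' must be set when using exactly-once semantics');
      }
    }
  }
  return problems;
}
```

For example, `checkStorageWriteParams({ useStorageWriteApi: 'true' })` reports both missing parameters, while adding `useStorageWriteApiAtLeastOnce: 'true'` clears them.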
User-defined function
Optionally, you can extend this template by writing a user-defined function (UDF) in JavaScript. The template calls the UDF for each input element. Element payloads are serialized as JSON strings.
To use a UDF, upload the JavaScript file to Cloud Storage and set the following template parameters:
Parameter | Description |
---|---|
javascriptDocumentTransformGcsPath | The Cloud Storage location of the JavaScript file. |
javascriptDocumentTransformFunctionName | The name of the JavaScript function. |
For more information, see Create user-defined functions for Dataflow templates.
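As an illustration of the shape such a function might take, the sketch below parses the incoming JSON string, adjusts the element, and returns a JSON string. It assumes the UDF receives and returns JSON strings, as described above. The fields `name` and `ingest_source` are hypothetical and only serve the example.

```javascript
/**
 * Hypothetical UDF: trims a string field and tags each element.
 * @param {string} inJson the element payload, serialized as a JSON string
 * @return {string} the transformed element, serialized as a JSON string
 */
function myTransform(inJson) {
  var doc = JSON.parse(inJson);

  // Example cleanup on an assumed field; documents without it pass through.
  if (typeof doc.name === 'string') {
    doc.name = doc.name.trim();
  }

  // Example enrichment with an assumed field name.
  doc.ingest_source = 'mongodb-cdc';

  return JSON.stringify(doc);
}
```

With this file uploaded to Cloud Storage, javascriptDocumentTransformFunctionName would be set to myTransform.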
Function specification
The UDF has the following specification:
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the MongoDB to BigQuery (CDC) template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION,\
inputTopic=INPUT_TOPIC
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example, us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- OUTPUT_TABLE_SPEC: your target BigQuery table name.
- MONGO_DB_URI: your MongoDB URI.
- DATABASE: your MongoDB database.
- COLLECTION: your MongoDB collection.
- USER_OPTION: FLATTEN or NONE.
- INPUT_TOPIC: your Pub/Sub input topic.
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "outputTableSpec": "OUTPUT_TABLE_SPEC",
      "mongoDbUri": "MONGO_DB_URI",
      "database": "DATABASE",
      "collection": "COLLECTION",
      "userOption": "USER_OPTION",
      "inputTopic": "INPUT_TOPIC"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC"
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example, us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- OUTPUT_TABLE_SPEC: your target BigQuery table name.
- MONGO_DB_URI: your MongoDB URI.
- DATABASE: your MongoDB database.
- COLLECTION: your MongoDB collection.
- USER_OPTION: FLATTEN or NONE.
- INPUT_TOPIC: your Pub/Sub input topic.
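When the request is assembled programmatically, building the URL and body from the placeholders avoids hand-editing JSON. The sketch below mirrors the request shown above. The function `buildLaunchRequest` is hypothetical, and the code only constructs the request: sending it, and obtaining an access token for the Authorization header, is left to the caller.

```javascript
// Hypothetical helper: build the flexTemplates:launch URL and JSON body
// for the MongoDB to BigQuery (CDC) template from the placeholder values.
function buildLaunchRequest(projectId, location, version, jobName, parameters) {
  const url =
    'https://dataflow.googleapis.com/v1b3/projects/' + projectId +
    '/locations/' + location + '/flexTemplates:launch';

  const body = {
    launch_parameter: {
      jobName: jobName,
      parameters: parameters,
      containerSpecGcsPath:
        'gs://dataflow-templates-' + location + '/' + version +
        '/flex/MongoDB_to_BigQuery_CDC',
    },
  };

  // JSON.stringify guarantees valid JSON (no trailing commas, proper quoting).
  return { method: 'POST', url: url, body: JSON.stringify(body) };
}

// Example with illustrative values only.
const request = buildLaunchRequest('my-project', 'us-central1', 'latest', 'mongodb-cdc-job', {
  outputTableSpec: 'bigquery-project:dataset.output_table',
  mongoDbUri: 'MONGO_DB_URI',
  database: 'my-db',
  collection: 'my-collection',
  userOption: 'FLATTEN',
  inputTopic: 'projects/my-project/topics/my-topic',
});
```

The resulting `request.url` and `request.body` can be passed to any HTTP client together with a bearer token that has the required Dataflow scopes.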