MongoDB to BigQuery template (Stream)

This template creates a streaming pipeline that works with MongoDB change streams. To use this template, publish the change stream data to Pub/Sub. The pipeline reads the JSON records from Pub/Sub and writes them to BigQuery. The records written to BigQuery have the same format as those written by the MongoDB to BigQuery batch template.

Pipeline requirements

  • The target BigQuery dataset must exist.
  • The source MongoDB instance must be accessible from the Dataflow worker machines.
  • You must create a Pub/Sub topic to receive the change stream data. While the pipeline is running, listen for change data capture (CDC) events on the MongoDB change stream and publish them to Pub/Sub as JSON records. For more information about publishing messages to Pub/Sub, see Publish messages to topics.
  • This template uses MongoDB change streams. It doesn't support BigQuery change data capture.
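
The publishing requirement above can be sketched as a small forwarding loop. This is a minimal sketch, not the template's own code. It assumes that you publish the fullDocument field of each change event as the JSON record, which the requirements above do not specify. The function name forwardChanges is hypothetical. The stream and topic are passed in so that the loop does not depend on a specific client setup.

```javascript
// Sketch only: forwards change stream events to Pub/Sub as JSON records.
// `changeStream` is any async iterable of change events, for example the
// object returned by collection.watch() in the MongoDB Node.js driver.
// `topic` is any object with publishMessage({ data }), for example a Topic
// from the @google-cloud/pubsub package.
// Assumption: the record to publish is the event's fullDocument field.
async function forwardChanges(changeStream, topic) {
  let published = 0;
  for await (const change of changeStream) {
    // Events without a fullDocument (for example, deletes) are skipped here.
    if (!change.fullDocument) continue;
    const data = Buffer.from(JSON.stringify(change.fullDocument), 'utf8');
    await topic.publishMessage({ data });
    published += 1;
  }
  return published;
}

// Hypothetical wiring (requires the mongodb and @google-cloud/pubsub packages):
//   const changeStream = client.db('my-db').collection('my-collection')
//       .watch([], { fullDocument: 'updateLookup' });
//   const topic = new PubSub().topic('my-topic');
//   await forwardChanges(changeStream, topic);
```

Plain JSON.stringify does not preserve MongoDB-specific types such as dates or object IDs in their extended JSON form, so a real publisher might need a different serializer.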

Template parameters

Required parameters

  • mongoDbUri: The MongoDB connection URI in the format mongodb+srv://<USERNAME>:<PASSWORD>@<HOST>.
  • database: Database in MongoDB to read the collection from. For example, my-db.
  • collection: Name of the collection inside the MongoDB database. For example, my-collection.
  • userOption: FLATTEN, JSON, or NONE. FLATTEN flattens the documents to a single level. JSON stores the document in BigQuery JSON format. NONE stores the whole document as a JSON-formatted STRING. Defaults to: NONE.
  • inputTopic: The Pub/Sub input topic to read from, in the format of projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • outputTableSpec: The BigQuery table to write to. For example, bigquery-project:dataset.output_table.

Optional parameters

  • useStorageWriteApiAtLeastOnce: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
  • KMSEncryptionKey: The Cloud KMS encryption key used to decrypt the MongoDB URI connection string. If a Cloud KMS key is passed in, the MongoDB URI connection string must be passed in encrypted. For example, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • filter: BSON filter in JSON format. For example, { "val": { $gt: 0, $lt: 9 }}.
  • useStorageWriteApi: If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. Defaults to: 0.
  • storageWriteApiTriggeringFrequencySec: When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
  • bigQuerySchemaPath: The Cloud Storage path for the BigQuery JSON schema. For example, gs://your-bucket/your-schema.json.
  • javascriptDocumentTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
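
The Storage Write API parameters above depend on each other, and a pre-launch check can make those rules explicit. The following is a minimal sketch of such a check. The function names are hypothetical, and it assumes that parameter values are passed as strings, as they are in a launch request. It checks only whether a value is present, because the descriptions above do not state a minimum value.

```javascript
// Sketch only: checks the Storage Write API parameter rules described above.
// Returns a list of problems; an empty list means the combination is valid.
function isUnset(value) {
  return value === undefined || value === null || value === '';
}

function checkStorageWriteParams(params) {
  const problems = [];
  const useApi = String(params.useStorageWriteApi) === 'true';
  const atLeastOnce = String(params.useStorageWriteApiAtLeastOnce) === 'true';

  // useStorageWriteApiAtLeastOnce applies only when useStorageWriteApi is true.
  if (atLeastOnce && !useApi) {
    problems.push('useStorageWriteApiAtLeastOnce requires useStorageWriteApi=true');
  }

  // Exactly-once mode: both tuning parameters must be set.
  if (useApi && !atLeastOnce) {
    const required = ['numStorageWriteApiStreams', 'storageWriteApiTriggeringFrequencySec'];
    for (const name of required) {
      if (isUnset(params[name])) {
        problems.push(name + ' must be set for exactly-once writes');
      }
    }
  }
  return problems;
}
```

For example, calling checkStorageWriteParams({ useStorageWriteApi: 'true' }) reports both missing parameters, while adding useStorageWriteApiAtLeastOnce: 'true' reports none.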

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF) in JavaScript. The template calls the UDF for each input element. Element payloads are serialized as JSON strings.

To use a UDF, upload the JavaScript file to Cloud Storage and set the following template parameters:

Parameter                                 Description
javascriptDocumentTransformGcsPath        The Cloud Storage location of the JavaScript file.
javascriptDocumentTransformFunctionName   The name of the JavaScript function.

For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: a MongoDB document.
  • Output: an object serialized as a JSON string.
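
A UDF that follows this specification might look like the following minimal sketch. The function name transform and the fields internalNotes and processedByUdf are hypothetical. The sketch accepts either a JSON string or an already parsed object, because how the document reaches the function is an assumption here.

```javascript
/**
 * Sketch only: removes one field and adds another.
 * @param {string|Object} input one MongoDB document
 * @return {string} the transformed document, serialized as a JSON string
 */
function transform(input) {
  // Assumption: the input may arrive as a JSON string or as an object.
  var doc = typeof input === 'string' ? JSON.parse(input) : input;

  // Hypothetical change 1: drop a field that should not reach BigQuery.
  delete doc.internalNotes;

  // Hypothetical change 2: mark the record as processed by the UDF.
  doc.processedByUdf = true;

  return JSON.stringify(doc);
}
```

To use a function like this, you would set javascriptDocumentTransformFunctionName to transform and point javascriptDocumentTransformGcsPath at the uploaded file.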

Run the template

    Console

    1. Go to the Dataflow Create job from template page.
    2. In the Job name field, enter a unique job name.
    3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

      For a list of regions where you can run a Dataflow job, see Dataflow locations.

    4. From the Dataflow template drop-down menu, select the MongoDB (CDC) to BigQuery template.
    5. In the provided parameter fields, enter your parameter values.
    6. Click Run job.

    gcloud

    In your shell or terminal, run the template:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice
    • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
    • VERSION: the version of the template that you want to use, for example latest
    • OUTPUT_TABLE_SPEC: your target BigQuery table name.
    • MONGO_DB_URI: your MongoDB URI.
    • DATABASE: your MongoDB database.
    • COLLECTION: your MongoDB collection.
    • USER_OPTION: FLATTEN, JSON, or NONE.
    • INPUT_TOPIC: your Pub/Sub input topic.

    API

    To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "outputTableSpec": "OUTPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC"
       }
    }

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice
    • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
    • VERSION: the version of the template that you want to use, for example latest
    • OUTPUT_TABLE_SPEC: your target BigQuery table name.
    • MONGO_DB_URI: your MongoDB URI.
    • DATABASE: your MongoDB database.
    • COLLECTION: your MongoDB collection.
    • USER_OPTION: FLATTEN, JSON, or NONE.
    • INPUT_TOPIC: your Pub/Sub input topic.
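
The request above can also be assembled and sent from code. The following is a minimal sketch. It assumes Node.js 18 or later for the global fetch function and an OAuth access token that you obtain separately. The function names buildLaunchRequest and launch, and the shape of the opts object, are hypothetical.

```javascript
// Sketch only: builds the URL and JSON body for the flexTemplates:launch call.
function buildLaunchRequest(opts) {
  const url = 'https://dataflow.googleapis.com/v1b3/projects/' + opts.projectId +
      '/locations/' + opts.location + '/flexTemplates:launch';
  const body = {
    launch_parameter: {
      jobName: opts.jobName,
      // Template parameters such as outputTableSpec, mongoDbUri, and inputTopic.
      parameters: opts.parameters,
      containerSpecGcsPath: 'gs://dataflow-templates-' + opts.location + '/' +
          opts.version + '/flex/MongoDB_to_BigQuery_CDC',
    },
  };
  return { url: url, body: body };
}

// Sends the request. Obtaining accessToken is outside the scope of this sketch.
async function launch(opts, accessToken) {
  const request = buildLaunchRequest(opts);
  const response = await fetch(request.url, {
    method: 'POST',
    headers: {
      'Authorization': 'Bearer ' + accessToken,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(request.body),
  });
  if (!response.ok) {
    throw new Error('Launch failed with HTTP status ' + response.status);
  }
  return response.json();
}
```

Keeping request construction separate from sending makes the body easy to inspect or log before a job is launched.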

    What's next