Datastream to BigQuery (Stream) template

The Datastream to BigQuery template is a streaming pipeline that reads Datastream data and replicates it into BigQuery. The template reads data from Cloud Storage using Pub/Sub notifications and replicates it into a time-partitioned BigQuery staging table. Following replication, the template executes a MERGE in BigQuery to upsert all change data capture (CDC) changes into a replica of the source table.

The template handles creating and updating the BigQuery tables managed by the replication. When data definition language (DDL) is required, a callback to Datastream extracts the source table schema and translates it into BigQuery data types. Supported operations include the following:

  • New tables are created as data is inserted.
  • New columns are added to BigQuery tables with null initial values.
  • Dropped columns are ignored in BigQuery and future values are null.
  • Renamed columns are added to BigQuery as new columns.
  • Type changes are not propagated to BigQuery.

It's recommended to run this pipeline using at-least-once streaming mode, because the template performs de-duplication when it merges data from a temporary BigQuery table to the main BigQuery table. This step in the pipeline means there is no additional benefit to using exactly-once streaming mode.
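
The reason duplicates are harmless can be illustrated with a small conceptual sketch. It is not the template's implementation (the real merge is a BigQuery MERGE statement); the field names `id`, `ts`, `deleted`, and `row` and the function `mergeChanges` are hypothetical and chosen only for illustration. The sketch keeps the newest change per primary key, so replaying the same staged changes a second time yields the same replica.

```javascript
// Conceptual sketch only: an in-memory stand-in for a keyed merge.
// Each staged change is assumed to look like:
//   { id: <primary key>, ts: <change timestamp>, deleted: <bool>, row: <object> }
function mergeChanges(replica, stagedChanges) {
  // Keep only the most recent change for each primary key.
  var latest = {};
  stagedChanges.forEach(function (change) {
    var seen = latest[change.id];
    if (!seen || change.ts >= seen.ts) {
      latest[change.id] = change;
    }
  });

  // Copy the replica so the input is not mutated.
  var result = {};
  Object.keys(replica).forEach(function (key) {
    result[key] = replica[key];
  });

  // Apply the surviving change per key: delete or upsert.
  Object.keys(latest).forEach(function (key) {
    var change = latest[key];
    if (change.deleted) {
      delete result[key];
    } else {
      result[key] = change.row;
    }
  });
  return result;
}

var replica = { a: { id: 'a', qty: 1 } };
var staged = [
  { id: 'a', ts: 1, deleted: false, row: { id: 'a', qty: 2 } },
  { id: 'a', ts: 2, deleted: false, row: { id: 'a', qty: 3 } },
  { id: 'b', ts: 1, deleted: false, row: { id: 'b', qty: 9 } }
];

// Delivering every change twice produces the same replica as delivering it once.
var once = mergeChanges(replica, staged);
var twice = mergeChanges(replica, staged.concat(staged));
console.log(JSON.stringify(once) === JSON.stringify(twice));
```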

Pipeline requirements

  • A Datastream stream that is ready to replicate data or is already replicating data.
  • Cloud Storage Pub/Sub notifications are enabled for the Datastream data.
  • BigQuery destination datasets are created and the Compute Engine Service Account has been granted admin access to them.
  • A primary key is necessary in the source table for the destination replica table to be created.
  • A MySQL or Oracle source database. PostgreSQL databases are not supported.

Template parameters

Parameter Description
inputFilePattern The file location for Datastream files in Cloud Storage to replicate. This file location is typically the root path for the stream.
gcsPubSubSubscription The Pub/Sub subscription with Datastream file notifications. For example, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat The format of the output file produced by Datastream. For example, avro or json. Default: avro.
outputStagingDatasetTemplate The name of an existing dataset to contain staging tables. You can include the template {_metadata_dataset} as a placeholder that is replaced with the name of your source dataset/schema (e.g. {_metadata_dataset}_log).
outputDatasetTemplate The name of an existing dataset to contain replica tables. You can include the template {_metadata_dataset} as a placeholder that is replaced with the name of your source dataset/schema (e.g. {_metadata_dataset}).
deadLetterQueueDirectory The file path for storing unprocessed messages along with the reason they failed to be processed. The default is a directory under the Dataflow job's temp location. The default value is sufficient under most conditions.
outputStagingTableNameTemplate Optional: The template for the name of staging tables. Default: {_metadata_table}_log. If you are replicating multiple schemas, the suggested value is {_metadata_schema}_{_metadata_table}_log.
outputTableNameTemplate Optional: The template for the name of replica tables. Default: {_metadata_table}. If you are replicating multiple schemas, the suggested value is {_metadata_schema}_{_metadata_table}.
outputProjectId Optional: Project for BigQuery datasets to output data into. The default for this parameter is the project where the Dataflow pipeline is running.
streamName Optional: The name or template for the stream to poll for schema information. Default: {_metadata_stream}.
mergeFrequencyMinutes Optional: The number of minutes between merges for a given table. Default: 5.
dlqRetryMinutes Optional: The number of minutes between dead letter queue (DLQ) retries. Default: 10.
javascriptTextTransformGcsPath Optional: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName Optional: The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
useStorageWriteApi Optional: If true, the pipeline uses the BigQuery Storage Write API. The default value is false. For more information, see Using the Storage Write API.
useStorageWriteApiAtLeastOnce Optional: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics, set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
numStorageWriteApiStreams Optional: When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
storageWriteApiTriggeringFrequencySec Optional: When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
applyMerge Optional: Specifies whether the template executes a MERGE statement in BigQuery after replicating data to the staging table. Default: true.
fileReadConcurrency Optional: The number of Datastream files to read concurrently. Default: 10.
mergeConcurrency Optional: The number of concurrent BigQuery MERGE statements. Default: 30.
partitionRetentionDays Optional: The number of days to use for partition retention when running BigQuery MERGE statements. Default: 1.
rfcStartDateTime Optional: The starting time for reading files from Cloud Storage, as an RFC 3339 date-time value. Default: 1970-01-01T00:00:00.00Z.
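
To make the dataset and table name templates concrete, the following sketch shows how placeholders such as {_metadata_schema} and {_metadata_table} could resolve to names. The helper `expandTemplate` and the sample metadata values are hypothetical; this illustrates the substitution idea only and is not the template's own code.

```javascript
// Hypothetical helper: replaces {_metadata_*} placeholders with values
// taken from a metadata object. Unknown placeholders are left unchanged.
function expandTemplate(template, metadata) {
  return template.replace(/\{(_metadata_[a-z]+)\}/g, function (match, key) {
    return Object.prototype.hasOwnProperty.call(metadata, key)
      ? String(metadata[key])
      : match;
  });
}

// Sample metadata for one source table (values are made up).
var metadata = {
  _metadata_dataset: 'sales',
  _metadata_schema: 'sales',
  _metadata_table: 'orders'
};

// Staging and replica names when replicating multiple schemas.
console.log(expandTemplate('{_metadata_schema}_{_metadata_table}_log', metadata)); // sales_orders_log
console.log(expandTemplate('{_metadata_schema}_{_metadata_table}', metadata));     // sales_orders
```

With the default table name templates, the same source table would map to orders_log for staging and orders for the replica.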

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: the CDC data, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
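
As an illustration of this specification, here is a minimal UDF sketch. The function name myTransform matches the example in the parameter table; the `email` column is a hypothetical field used only for demonstration. The function parses the JSON string, masks the local part of the email address, and returns a JSON string with the same fields, so the output still matches the destination table schema. It uses ES5-style syntax on the assumption that the UDF runtime may not support newer language features.

```javascript
/**
 * Sketch of a UDF that masks a hypothetical `email` column.
 * @param {string} inJson one CDC record, serialized as a JSON string
 * @return {string} the transformed record, serialized as a JSON string
 */
function myTransform(inJson) {
  var row = JSON.parse(inJson);

  // `email` is an assumed column name; records without it pass through unchanged.
  if (typeof row.email === 'string') {
    row.email = row.email.replace(/^[^@]+/, '***');
  }

  return JSON.stringify(row);
}
```

To use a function like this, you would upload the file to Cloud Storage and set javascriptTextTransformGcsPath to its URI and javascriptTextTransformFunctionName to myTransform.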

Run the template

    Console

    1. Go to the Dataflow Create job from template page.
    2. In the Job name field, enter a unique job name.
    3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

      For a list of regions where you can run a Dataflow job, see Dataflow locations.

    4. From the Dataflow template drop-down menu, select the Datastream to BigQuery template.
    5. In the provided parameter fields, enter your parameter values.
    6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
    7. Click Run job.

    gcloud

    In your shell or terminal, run the template:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE_log,\
    outputTableNameTemplate=BIGQUERY_TABLE

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice
    • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
    • VERSION: the version of the template that you want to use, for example, latest
    • GCS_FILE_PATH: the Cloud Storage path to Datastream data. For example: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: your BigQuery dataset name.
    • BIGQUERY_TABLE: your BigQuery table name template for replica tables. For example, {_metadata_schema}_{_metadata_table}. The staging table name template appends _log to this value.

    API

    To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE_log",
              "outputTableNameTemplate": "BIGQUERY_TABLE"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery"
       }
    }

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice
    • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
    • VERSION: the version of the template that you want to use, for example, latest
    • GCS_FILE_PATH: the Cloud Storage path to Datastream data. For example: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: your BigQuery dataset name.
    • BIGQUERY_TABLE: your BigQuery table name template for replica tables. For example, {_metadata_schema}_{_metadata_table}. The staging table name template appends _log to this value.
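
If you assemble the request programmatically, the following sketch builds the URL and JSON body shown above from a set of options. The helper `buildLaunchRequest` and its option names are hypothetical; the endpoint, field names, and template path are taken from the request above. Sending the request also requires an OAuth 2.0 access token, which this sketch assumes you obtain separately, for example with gcloud auth print-access-token.

```javascript
// Hypothetical helper: builds the launch URL and request body.
// It does not send the request or handle authentication.
function buildLaunchRequest(options) {
  var url = 'https://dataflow.googleapis.com/v1b3/projects/' +
    encodeURIComponent(options.projectId) +
    '/locations/' + encodeURIComponent(options.location) +
    '/flexTemplates:launch';

  var body = {
    launch_parameter: {
      jobName: options.jobName,
      parameters: options.parameters,
      containerSpecGcsPath: 'gs://dataflow-templates-' + options.location +
        '/' + options.version + '/flex/Cloud_Datastream_to_BigQuery'
    }
  };

  return { url: url, body: body };
}

// Example usage with placeholder values; replace them as described above.
var request = buildLaunchRequest({
  projectId: 'PROJECT_ID',
  location: 'LOCATION',
  version: 'VERSION',
  jobName: 'JOB_NAME',
  parameters: {
    inputFilePattern: 'GCS_FILE_PATH',
    gcsPubSubSubscription: 'GCS_SUBSCRIPTION_NAME',
    outputStagingDatasetTemplate: 'BIGQUERY_DATASET',
    outputDatasetTemplate: 'BIGQUERY_DATASET'
  }
});

// The body can then be sent with any HTTP client, for example:
// fetch(request.url, {
//   method: 'POST',
//   headers: { Authorization: 'Bearer ' + accessToken, 'Content-Type': 'application/json' },
//   body: JSON.stringify(request.body)
// });
console.log(request.url);
```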

    What's next