Spanner Change Streams to Source Database template

Streaming pipeline. Reads data from Spanner change streams and writes it to a source database.

Template parameters

Parameter Description
changeStreamName: The name of the Spanner change stream that the pipeline reads from.
instanceId: The name of the Spanner instance where the change stream is present.
databaseId: The name of the Spanner database that the change stream monitors.
spannerProjectId: The name of the Spanner project.
metadataInstance: The instance that stores the metadata the connector uses to control the consumption of the change stream API data.
metadataDatabase: The database that stores the metadata the connector uses to control the consumption of the change stream API data.
sourceShardsFilePath: Path to a Cloud Storage file containing connection profile information for the source shards.
startTimestamp: Optional: The starting timestamp for reading changes. Defaults to empty.
endTimestamp: Optional: The end timestamp for reading changes. If no timestamp is provided, the pipeline reads indefinitely. Defaults to empty.
shadowTablePrefix: Optional: The prefix used to name shadow tables. Default: shadow_.
sessionFilePath: Optional: Path to a session file in Cloud Storage that contains mapping information from HarbourBridge.
filtrationMode: Optional: The filtration mode, which specifies how to drop certain records based on a criterion. Supported modes: none (filter nothing) and forward_migration (filter records written by the forward migration pipeline). Defaults to forward_migration.
shardingCustomJarPath: Optional: Location of a custom JAR file in Cloud Storage that contains the customization logic for fetching the shard ID. If you set this parameter, also set the shardingCustomClassName parameter. Defaults to empty.
shardingCustomClassName: Optional: The fully qualified name of the class that implements the custom shard ID logic. Required if shardingCustomJarPath is specified. Defaults to empty.
shardingCustomParameters: Optional: A string containing any custom parameters to pass to the custom sharding class. Defaults to empty.
sourceDbTimezoneOffset: Optional: The timezone offset from UTC for the source database, for example +10:00. Defaults to +00:00.
dlqGcsPubSubSubscription: Optional: The Pub/Sub subscription used in a Cloud Storage notification policy for the DLQ retry directory when running in regular mode. The name must be in the format projects/<project-id>/subscriptions/<subscription-name>. When set, deadLetterQueueDirectory and dlqRetryMinutes are ignored.
skipDirectoryName: Optional: The directory to which records skipped during reverse replication are written. The default directory name is skip.
maxShardConnections: Optional: The maximum number of connections that a given shard can accept. Defaults to 10000.
deadLetterQueueDirectory: Optional: The path used to store the error queue output. The default path is a directory under the Dataflow job's temp location.
dlqMaxRetryCount: Optional: The maximum number of times that temporary errors can be retried through the dead-letter queue. Defaults to 500.
runMode: Optional: The run mode type. Supported values: regular and retryDLQ. Default: regular. Specify retryDLQ to retry only severe dead-letter queue records.
dlqRetryMinutes: Optional: The number of minutes between dead-letter queue retries. Defaults to 10.
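
The file referenced by sourceShardsFilePath is a JSON array containing one connection profile per source shard. The exact schema is defined by the template; the sketch below is illustrative only, and field names such as logicalShardId, host, user, secretManagerUri, port, and dbName are assumptions to check against the template's reference documentation:

[
  {
    "logicalShardId": "shard1",
    "host": "10.128.0.5",
    "user": "replication-user",
    "secretManagerUri": "projects/PROJECT_ID/secrets/shard1-password/versions/latest",
    "port": "3306",
    "dbName": "db1"
  }
]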

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Spanner Change Streams to Source Database template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud CLI

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • CHANGE_STREAM_NAME: the name of the change stream to read from
  • INSTANCE_ID: the Cloud Spanner instance ID
  • DATABASE_ID: the Cloud Spanner database ID
  • SPANNER_PROJECT_ID: the Cloud Spanner project ID
  • METADATA_INSTANCE: the Cloud Spanner instance used to store metadata when reading from change streams
  • METADATA_DATABASE: the Cloud Spanner database used to store metadata when reading from change streams
  • SOURCE_SHARDS_FILE_PATH: the path to the Cloud Storage file containing the source shard details
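
The command above passes only the required parameters. Any of the optional parameters from the table earlier on this page can be appended to the same comma-separated --parameters list. For example, a hypothetical invocation that retries severe dead-letter queue records every 10 minutes would end with:

    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\
       runMode=retryDLQ,\
       dlqRetryMinutes=10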

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • CHANGE_STREAM_NAME: the name of the change stream to read from
  • INSTANCE_ID: the Cloud Spanner instance ID
  • DATABASE_ID: the Cloud Spanner database ID
  • SPANNER_PROJECT_ID: the Cloud Spanner project ID
  • METADATA_INSTANCE: the Cloud Spanner instance used to store metadata when reading from change streams
  • METADATA_DATABASE: the Cloud Spanner database used to store metadata when reading from change streams
  • SOURCE_SHARDS_FILE_PATH: the path to the Cloud Storage file containing the source shard details
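
One way to send this request is with curl, writing the JSON body shown above to a local file (request.json is an assumed filename) and authenticating with an access token from the gcloud CLI:

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d @request.json \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"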