Spanner change streams to Cloud Storage template

The Spanner change streams to Cloud Storage template is a streaming pipeline that streams Spanner data change records and writes them into a Cloud Storage bucket using Dataflow Runner v2.

The pipeline groups Spanner change stream records into windows based on their timestamps, with each window representing a time duration whose length you can configure with this template. All records with timestamps belonging to a window are guaranteed to be in that window; there are no late arrivals. You can also define the number of output shards; the pipeline creates one Cloud Storage output file per window per shard. Within an output file, records are unordered. Output files can be written in either JSON or Avro format, depending on your configuration.
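To make the window-and-shard behavior concrete: with the default 5-minute window and 20 shards, one hour of change records spans 12 windows and produces up to 240 output files. The following Apache Beam Java snippet is a minimal sketch of the same pattern (read data change records, serialize them, group them into fixed windows, write one file per window per shard). It is an illustration only, not the template's source code; the bucket path, window duration, shard count, and class name are placeholder values.

import com.google.cloud.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class ChangeStreamToGcsSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // Read data change records from the change stream, starting now.
        .apply("ReadChangeStream", SpannerIO.readChangeStream()
            .withSpannerConfig(SpannerConfig.create()
                .withInstanceId("SPANNER_INSTANCE_ID")
                .withDatabaseId("SPANNER_DATABASE"))
            .withChangeStreamName("SPANNER_CHANGE_STREAM")
            .withMetadataInstance("SPANNER_METADATA_INSTANCE_ID")
            .withMetadataDatabase("SPANNER_METADATA_DATABASE")
            .withInclusiveStartAt(Timestamp.now()))
        // Serialize each record to a string (toString() stands in for real
        // JSON or Avro serialization).
        .apply("Serialize", MapElements.into(TypeDescriptors.strings())
            .via((DataChangeRecord record) -> record.toString()))
        // Group records into fixed, non-overlapping windows (windowDuration).
        .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
        // Write one file per window per shard (numShards).
        .apply("Write", TextIO.write()
            .to("gs://BUCKET/ROOT_PATH/output")
            .withSuffix(".json")
            .withWindowedWrites()
            .withNumShards(20));

    pipeline.run();
  }
}

In the template itself, the equivalent choices are controlled by the windowDuration, numShards, outputFileFormat, and gcsOutputDirectory parameters described later on this page.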

You can minimize network latency and network transport costs by running the Dataflow job from the same region as your Spanner instance or Cloud Storage bucket. If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. For more information, see Dataflow regions.

Learn more about change streams, how to build change streams Dataflow pipelines, and best practices.

Pipeline requirements

  • The Spanner instance must exist prior to running the pipeline.
  • The Spanner database must exist prior to running the pipeline.
  • The Spanner metadata instance must exist prior to running the pipeline.
  • The Spanner metadata database must exist prior to running the pipeline.
  • The Spanner change stream must exist prior to running the pipeline.
  • The Cloud Storage output bucket must exist prior to running the pipeline.
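The pipeline doesn't create any of these resources for you. If you still need to create the change stream, for example, one way is to apply a DDL statement with the gcloud CLI. The stream name EverythingStream and the FOR ALL clause below are illustrative; adjust them to watch only the tables and columns you need:

gcloud spanner databases ddl update SPANNER_DATABASE \
    --instance=SPANNER_INSTANCE_ID \
    --ddl='CREATE CHANGE STREAM EverythingStream FOR ALL'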

Template parameters

  • spannerInstanceId: The Spanner instance ID to read change streams data from.
  • spannerDatabase: The Spanner database to read change streams data from.
  • spannerDatabaseRole: (Optional) The Spanner database role to use when running the template. This parameter is required only when the IAM principal running the template is a fine-grained access control user. The database role must have the SELECT privilege on the change stream and the EXECUTE privilege on the change stream's read function. For more information, see Fine-grained access control for change streams and the example after this list.
  • spannerMetadataInstanceId: The Spanner instance ID to use for the change streams connector metadata table.
  • spannerMetadataDatabase: The Spanner database to use for the change streams connector metadata table.
  • spannerChangeStreamName: The name of the Spanner change stream to read from.
  • gcsOutputDirectory: The file location for change streams output in Cloud Storage, in the format gs://${BUCKET}/${ROOT_PATH}/.
  • outputFilenamePrefix: (Optional) The filename prefix of the files to write to. Defaults to output.
  • spannerProjectId: (Optional) The project to read change streams from. This is also the project where the change streams connector metadata table is created. Defaults to the project where the Dataflow pipeline is running.
  • startTimestamp: (Optional) The starting DateTime, inclusive, to use for reading change streams, for example 2021-10-12T07:20:50.52Z. Defaults to the timestamp when the pipeline starts, that is, the current time.
  • endTimestamp: (Optional) The ending DateTime, inclusive, to use for reading change streams, for example 2021-10-12T07:20:50.52Z. Defaults to an infinite time in the future.
  • outputFileFormat: (Optional) The format of the output Cloud Storage files. Allowed values are TEXT and AVRO. Defaults to AVRO.
  • windowDuration: (Optional) The window duration is the interval in which data is written to the output directory. Configure the duration based on the pipeline's throughput; for example, a higher throughput might require smaller window sizes so that the data fits into memory. Defaults to 5m, with a minimum of 1s. Allowed formats are [int]s (seconds, for example 5s), [int]m (minutes, for example 12m), and [int]h (hours, for example 2h).
  • rpcPriority: (Optional) The request priority for Spanner calls. Must be one of HIGH, MEDIUM, or LOW. Defaults to HIGH.
  • numShards: (Optional) The maximum number of output shards produced when writing. Defaults to 20. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing the output Cloud Storage files.
  • spannerMetadataTableName: (Optional) The Spanner change streams connector metadata table name to use. If not provided, a metadata table is created automatically during pipeline execution. This parameter must be provided when updating an existing pipeline; don't provide it otherwise.
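If you use spannerDatabaseRole, that role needs the privileges listed above. For example, assuming a change stream named my_change_stream and a database role named my_role (both hypothetical names), the following GoogleSQL DDL statements grant SELECT on the change stream and EXECUTE on its read function:

GRANT SELECT ON CHANGE STREAM my_change_stream TO ROLE my_role;
GRANT EXECUTE ON TABLE FUNCTION READ_my_change_stream TO ROLE my_role;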

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Spanner change streams to Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use latest for the most recent version of the template, or a specific version name from the dated folders in gs://dataflow-templates-REGION_NAME/ to pin a version.

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: the Spanner instance ID
  • SPANNER_DATABASE: the Spanner database
  • SPANNER_METADATA_INSTANCE_ID: the Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: the Spanner metadata database
  • SPANNER_CHANGE_STREAM: the Spanner change stream
  • GCS_OUTPUT_DIRECTORY: the file location for change streams output in Cloud Storage
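The command above sets only the required parameters. You can append optional parameters from the Template parameters section to the --parameters list; for example, to also set the output format, window duration, shard count, and filename prefix (the optional values shown here are illustrative):

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY,\
outputFileFormat=TEXT,\
windowDuration=1m,\
numShards=5,\
outputFilenamePrefix=spanner-changes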

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use latest for the most recent version of the template, or a specific version name from the dated folders in gs://dataflow-templates-LOCATION/ to pin a version.

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: the Spanner instance ID
  • SPANNER_DATABASE: the Spanner database
  • SPANNER_METADATA_INSTANCE_ID: the Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: the Spanner metadata database
  • SPANNER_CHANGE_STREAM: the Spanner change stream
  • GCS_OUTPUT_DIRECTORY: the file location for change streams output in Cloud Storage
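One way to send this request is with curl, assuming the request body shown above is saved in a local file named request.json and that you are authenticated with the gcloud CLI:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"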

What's next