Spanner change streams to Cloud Storage template

The Spanner change streams to Cloud Storage template is a streaming pipeline that reads Spanner data change records and writes them to a Cloud Storage bucket using Dataflow Runner v2.

The pipeline groups Spanner change stream records into windows based on their timestamp. Each window covers a time duration whose length you can configure with this template. All records with timestamps belonging to a window are guaranteed to be in that window; there can be no late arrivals. You can also define a number of output shards; the pipeline creates one Cloud Storage output file per window per shard. Within an output file, records are unordered. Output files can be written in either JSON or Avro format, depending on the user configuration (the outputFileFormat parameter, where TEXT produces JSON records).
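As a rough illustration of the windowing described above, the following sketch maps a record timestamp (in epoch seconds) to the start of its fixed window and builds one file name per window and shard. The function names and the file naming scheme are hypothetical and chosen only for illustration; they are not the template's actual naming.

```shell
#!/bin/sh
# Hypothetical sketch: assign a timestamp to a fixed-size window.

# window_start <epoch_seconds> <window_seconds>
# Prints the start of the window that contains the timestamp.
window_start() {
  echo $(( $1 - $1 % $2 ))
}

# window_file <epoch_seconds> <window_seconds> <shard> <num_shards>
# Prints an illustrative file name for the window and shard.
window_file() {
  start=$(window_start "$1" "$2")
  end=$(( start + $2 ))
  printf 'output-%s-%s-shard-%s-of-%s\n' "$start" "$end" "$3" "$4"
}

# 1634023250 is 2021-10-12T07:20:50Z; with 5-minute (300 s) windows the
# record falls into the window that starts at 07:20:00Z.
window_file 1634023250 300 0 20
```

With a five-minute window, every record stamped between 07:20:00 and 07:24:59 lands in the same window, which is why the window length directly controls how much data each output file holds.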

To minimize network latency and network transport costs, run the Dataflow job in the same region as your Spanner instance or Cloud Storage bucket. If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. For more information, see Dataflow regions.

Learn more about change streams, how to build change streams Dataflow pipelines, and best practices.

Pipeline requirements

  • The Spanner instance must exist prior to running the pipeline.
  • The Spanner database must exist prior to running the pipeline.
  • The Spanner metadata instance must exist prior to running the pipeline.
  • The Spanner metadata database must exist prior to running the pipeline.
  • The Spanner change stream must exist prior to running the pipeline.
  • The Cloud Storage output bucket must exist prior to running the pipeline.
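The requirements above can be checked before launching the job. The following is a minimal sketch, assuming the gcloud CLI is installed and authenticated; the check and preflight helpers are hypothetical names, and the check for the change stream itself is not covered here.

```shell
#!/bin/sh
# check <label> <command...>
# Runs the command quietly and reports whether it succeeded.
check() {
  label=$1
  shift
  if "$@" >/dev/null 2>&1; then
    echo "OK: $label"
  else
    echo "MISSING: $label"
    return 1
  fi
}

# preflight <instance> <database> <metadata_instance> <metadata_database> <bucket>
# Verifies that the Spanner resources and the output bucket exist.
preflight() {
  status=0
  check "Spanner instance $1" gcloud spanner instances describe "$1" || status=1
  check "Spanner database $2" gcloud spanner databases describe "$2" --instance="$1" || status=1
  check "Metadata instance $3" gcloud spanner instances describe "$3" || status=1
  check "Metadata database $4" gcloud spanner databases describe "$4" --instance="$3" || status=1
  check "Output bucket $5" gcloud storage buckets describe "gs://$5" || status=1
  return "$status"
}

# Example (not executed here):
# preflight SPANNER_INSTANCE_ID SPANNER_DATABASE \
#   SPANNER_METADATA_INSTANCE_ID SPANNER_METADATA_DATABASE BUCKET_NAME
```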

Template parameters

Required parameters

  • spannerInstanceId : The Spanner instance ID to read change streams data from.
  • spannerDatabase : The Spanner database to read change streams data from.
  • spannerMetadataInstanceId : The Spanner instance ID to use for the change streams connector metadata table.
  • spannerMetadataDatabase : The Spanner database to use for the change streams connector metadata table.
  • spannerChangeStreamName : The name of the Spanner change stream to read from.
  • gcsOutputDirectory : The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse the directory path for date and time formatters. (Example: gs://your-bucket/your-path/).

Optional parameters

  • spannerProjectId : The ID of the Google Cloud project that contains the Spanner database to read change streams from. This project is also where the change streams connector metadata table is created. The default for this parameter is the project where the Dataflow pipeline is running.
  • spannerDatabaseRole : The Spanner database role to use when running the template. This parameter is required only when the IAM principal who is running the template is a fine-grained access control user. The database role must have the SELECT privilege on the change stream and the EXECUTE privilege on the change stream's read function. For more information, see Fine-grained access control for change streams (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName : The Spanner change streams connector metadata table name to use. If not provided, a Spanner change streams metadata table is automatically created during pipeline execution. You must provide a value for this parameter when updating an existing pipeline. Otherwise, don't use this parameter.
  • startTimestamp : The starting DateTime, inclusive, to use for reading change streams. For example, 2021-10-12T07:20:50.52Z. Defaults to the timestamp when the pipeline starts, that is, the current time.
  • endTimestamp : The ending DateTime, inclusive, to use for reading change streams. For example, 2021-10-12T07:20:50.52Z. Defaults to an infinite time in the future.
  • spannerHost : The Cloud Spanner endpoint to call in the template. Only used for testing. (Example: https://spanner.googleapis.com). Defaults to: https://spanner.googleapis.com.
  • outputFileFormat : The format of the output Cloud Storage file. Allowed formats are TEXT and AVRO. Defaults to AVRO.
  • windowDuration : The interval at which data is written to the output directory. Configure the duration based on the pipeline's throughput. For example, a higher throughput might require smaller window sizes so that the data fits into memory. Defaults to 5m (five minutes), with a minimum of 1s (one second). Allowed formats are: [int]s (for seconds, example: 5s), [int]m (for minutes, example: 12m), [int]h (for hours, example: 2h).
  • rpcPriority : The request priority for Spanner calls. The value must be HIGH, MEDIUM, or LOW. Defaults to HIGH.
  • outputFilenamePrefix : The prefix to place on each windowed file. (Example: output-). Defaults to: output.
  • numShards : The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. Defaults to: 20.
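To make the windowDuration format concrete, the following sketch converts a value such as 5m or 2h to seconds and rejects values that do not match the [int]s, [int]m, or [int]h forms or that fall below the one-second minimum. The duration_to_seconds helper is a hypothetical name for illustration and is not part of the template.

```shell
#!/bin/sh
# duration_to_seconds <value>
# Converts "[int]s", "[int]m", or "[int]h" to a number of seconds.
duration_to_seconds() {
  value=$1
  number=${value%?}          # everything except the last character
  unit=${value#"$number"}    # the last character

  case $number in
    ''|*[!0-9]*)
      echo "invalid duration: $value" >&2
      return 1
      ;;
  esac

  # Strip leading zeros so the shell does not read the number as octal.
  while [ "${#number}" -gt 1 ] && [ "${number#0}" != "$number" ]; do
    number=${number#0}
  done

  case $unit in
    s) factor=1 ;;
    m) factor=60 ;;
    h) factor=3600 ;;
    *)
      echo "invalid unit in duration: $value" >&2
      return 1
      ;;
  esac

  seconds=$(( number * factor ))
  if [ "$seconds" -lt 1 ]; then
    echo "duration must be at least 1s: $value" >&2
    return 1
  fi
  echo "$seconds"
}

duration_to_seconds 5m    # the default window duration, in seconds
```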

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Spanner change streams to Google Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest to use the most recent version, or a dated version name to use a specific version
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner instance ID
  • SPANNER_DATABASE: Cloud Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
  • SPANNER_CHANGE_STREAM: Cloud Spanner change stream
  • GCS_OUTPUT_DIRECTORY: File location for change streams output
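Optional parameters go into the same comma-separated --parameters list as the required ones. The following sketch assembles that list and prints the resulting command instead of running it. The join_params helper is hypothetical, the optional values shown (TEXT, 1m, 5) are arbitrary examples, and the uppercase names are the same placeholders as above.

```shell
#!/bin/sh
# join_params key=value ...
# Joins its arguments with commas, as expected by --parameters.
# Values that themselves contain commas are not handled by this sketch.
join_params() {
  joined=""
  for pair in "$@"; do
    if [ -z "$joined" ]; then
      joined=$pair
    else
      joined="$joined,$pair"
    fi
  done
  printf '%s\n' "$joined"
}

PARAMS=$(join_params \
  "spannerInstanceId=SPANNER_INSTANCE_ID" \
  "spannerDatabase=SPANNER_DATABASE" \
  "spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID" \
  "spannerMetadataDatabase=SPANNER_METADATA_DATABASE" \
  "spannerChangeStreamName=SPANNER_CHANGE_STREAM" \
  "gcsOutputDirectory=GCS_OUTPUT_DIRECTORY" \
  "outputFileFormat=TEXT" \
  "windowDuration=1m" \
  "numShards=5")

# Dry run: prints the command. Remove the leading "echo" to launch the job.
echo gcloud dataflow flex-template run JOB_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
  --region REGION_NAME \
  --parameters "$PARAMS"
```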

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage"
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest to use the most recent version, or a dated version name to use a specific version
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner instance ID
  • SPANNER_DATABASE: Cloud Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
  • SPANNER_CHANGE_STREAM: Cloud Spanner change stream
  • GCS_OUTPUT_DIRECTORY: File location for change streams output
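One way to send the request above is with curl, using an access token from the gcloud CLI. This is a minimal sketch that assumes curl and an authenticated gcloud CLI are available, and that the JSON request body has been saved to a local file; launch_url, launch_job, and the file name request.json are hypothetical names used for illustration.

```shell
#!/bin/sh
# launch_url <project_id> <location>
# Builds the flexTemplates:launch endpoint for the project and region.
launch_url() {
  printf 'https://dataflow.googleapis.com/v1b3/projects/%s/locations/%s/flexTemplates:launch\n' "$1" "$2"
}

# launch_job <project_id> <location> <request_json_file>
# Sends the JSON request body in the given file as an HTTP POST request.
launch_job() {
  curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @"$3" \
    "$(launch_url "$1" "$2")"
}

# Example (not executed here):
# launch_job PROJECT_ID LOCATION request.json
```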

What's next