Datastream to Spanner template

The Datastream to Spanner template is a streaming pipeline that reads Datastream events from a Cloud Storage bucket and writes them to a Spanner database. It is intended for data migration from Datastream sources to Spanner.

All tables required for migration must exist in the destination Spanner database before you run the template, so schema migration from the source database to the destination Spanner database must be completed before data migration begins. The destination tables can already contain data. This template does not propagate Datastream schema changes to the Spanner database.
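
For example, assuming a GoogleSQL-dialect database, you could create a destination table ahead of time with a DDL update. The table and column names below are illustrative placeholders only; your actual schema comes from your own schema migration:

gcloud spanner databases ddl update CLOUDSPANNER_DATABASE \
    --instance=CLOUDSPANNER_INSTANCE \
    --ddl="CREATE TABLE Singers (
             SingerId INT64 NOT NULL,
             FirstName STRING(1024),
             LastName STRING(1024)
           ) PRIMARY KEY (SingerId)"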

Data consistency is guaranteed only at the end of migration, when all data has been written to Spanner. To store ordering information for each record written to Spanner, this template creates an additional table (called a shadow table) for each table in the Spanner database. The shadow tables are used to ensure consistency at the end of migration; they are not deleted after migration and can be used for validation.
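
For example, to see which shadow tables the template created (assuming the default shadow_ prefix), you can query the database's information schema after migration; the instance and database names are placeholders:

gcloud spanner databases execute-sql CLOUDSPANNER_DATABASE \
    --instance=CLOUDSPANNER_INSTANCE \
    --sql="SELECT table_name FROM information_schema.tables WHERE STARTS_WITH(table_name, 'shadow_')"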

Any errors that occur during operation, such as schema mismatches, malformed JSON files, or errors resulting from executing transforms, are recorded in an error queue. The error queue is a Cloud Storage folder that stores the Datastream events that encountered errors, along with the reason for each error in text format. Errors can be transient or permanent and are stored in separate Cloud Storage folders within the error queue. Transient errors are retried automatically; permanent errors are not. For permanent errors, you can correct the change events and move them to the retriable folder while the template is running.
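
As a sketch, assuming the error queue under deadLetterQueueDirectory separates permanent and retriable errors into subdirectories (the severe and retry folder names below are placeholders; check the actual layout in your bucket), you could upload a corrected event file into the retriable folder with the Cloud Storage CLI:

gcloud storage cp corrected-event.json gs://BUCKET/dlq/retry/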

Pipeline requirements

  • A Datastream stream in Running or Not started state.
  • A Cloud Storage bucket where Datastream events are replicated.
  • A Spanner database with existing tables. These tables can be empty or contain data.

Template parameters

  • inputFilePattern: The file location for Datastream files in Cloud Storage to replicate. Typically, this is the root path for a stream.
  • streamName: The name or template for the stream to poll for schema information and source type.
  • instanceId: The Spanner instance where the changes are replicated.
  • databaseId: The Spanner database where the changes are replicated.
  • projectId: The Spanner project ID.
  • deadLetterQueueDirectory: (Optional) The file path to store the error queue output. The default is a directory under the Dataflow job's temp location.
  • inputFileFormat: (Optional) The format of the output files produced by Datastream. Allowed values: avro, json. Default: avro.
  • shadowTablePrefix: (Optional) The prefix used to name shadow tables. Default: shadow_.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Datastream to Spanner template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

      ◦ latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      ◦ a version name, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • GCS_FILE_PATH: the Cloud Storage path that is used to store Datastream events. For example: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: your Spanner instance.
  • CLOUDSPANNER_DATABASE: your Spanner database.
  • DLQ: the Cloud Storage path for the error queue directory.
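
For example, a fully substituted command might look like the following; the project, bucket, stream, and Spanner resource names are placeholders:

gcloud dataflow flex-template run datastream-to-spanner-job \
    --project=my-project \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=gs://my-bucket/datastream-output/,\
streamName=my-stream,\
instanceId=my-spanner-instance,\
databaseId=my-spanner-database,\
deadLetterQueueDirectory=gs://my-bucket/dlq/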

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

      ◦ latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      ◦ a version name, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • GCS_FILE_PATH: the Cloud Storage path that is used to store Datastream events. For example: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: your Spanner instance.
  • CLOUDSPANNER_DATABASE: your Spanner database.
  • DLQ: the Cloud Storage path for the error queue directory.
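
For example, you can send the request with curl, authenticating with an access token from the gcloud CLI; this assumes the JSON body shown above has been saved to a local file named request.json:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"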

What's next