Cloud Spanner change streams to Pub/Sub template

The Cloud Spanner change streams to Pub/Sub template is a streaming pipeline that reads Cloud Spanner data change records and writes them to Pub/Sub topics by using Dataflow Runner V2.

To output your data to a new Pub/Sub topic, you must first create the topic. If you create the topic in the Google Cloud console, Pub/Sub can also create a default subscription and attach it to the new topic. If you try to output data to a Pub/Sub topic that doesn't exist, the Dataflow pipeline throws an exception and gets stuck as it continuously tries to connect.

If the necessary Pub/Sub topic already exists, you can output data to that topic.
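If you create the output topic from the command line instead of the console, the following sketch creates it and, optionally, a subscription. It assumes the gcloud CLI is installed and authenticated; the helper name `create_output_topic` and any topic or subscription names you pass are hypothetical. Unlike the console's topic dialog, `gcloud pubsub topics create` doesn't attach a default subscription, so the sketch creates one explicitly when you pass a second argument.

```shell
# Hypothetical helper: create the output topic, and optionally a subscription.
# Usage: create_output_topic TOPIC [SUBSCRIPTION]
create_output_topic() {
  topic="$1"
  sub="${2:-}"
  # Create the topic that the pipeline publishes change records to.
  gcloud pubsub topics create "$topic" || return 1
  # gcloud does not add a default subscription, so create one if requested.
  if [ -n "$sub" ]; then
    gcloud pubsub subscriptions create "$sub" --topic="$topic" || return 1
  fi
}
```

For example, `create_output_topic my-topic my-sub` creates both resources in the current gcloud project.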

For more information, see About change streams, Build change streams connections with Dataflow, and Change streams best practices.

Pipeline requirements

  • The Cloud Spanner instance must exist before you run the pipeline.
  • The Cloud Spanner database must exist before you run the pipeline.
  • The Cloud Spanner metadata instance must exist before you run the pipeline.
  • The Cloud Spanner metadata database must exist before you run the pipeline.
  • The Cloud Spanner change stream must exist before you run the pipeline.
  • The Pub/Sub topic must exist before you run the pipeline.
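Because every resource above must exist before launch, it can help to check them first. The following is a sketch, assuming the gcloud CLI is installed and authenticated and that the source database uses the GoogleSQL dialect (it queries `INFORMATION_SCHEMA.CHANGE_STREAMS`). The helper name `check_prereqs` is hypothetical, and it only runs read-only describe and query commands.

```shell
# Hypothetical pre-flight check: prints each missing resource and returns
# non-zero if any is missing.
# Usage: check_prereqs INSTANCE DATABASE META_INSTANCE META_DATABASE CHANGE_STREAM TOPIC
check_prereqs() {
  inst="$1"; db="$2"; meta_inst="$3"; meta_db="$4"; stream="$5"; topic="$6"
  missing=0

  # Instance and database that contain the change stream.
  gcloud spanner instances describe "$inst" >/dev/null 2>&1 \
    || { echo "missing instance: $inst"; missing=1; }
  gcloud spanner databases describe "$db" --instance="$inst" >/dev/null 2>&1 \
    || { echo "missing database: $db"; missing=1; }

  # Instance and database that hold the connector's metadata table.
  gcloud spanner instances describe "$meta_inst" >/dev/null 2>&1 \
    || { echo "missing metadata instance: $meta_inst"; missing=1; }
  gcloud spanner databases describe "$meta_db" --instance="$meta_inst" >/dev/null 2>&1 \
    || { echo "missing metadata database: $meta_db"; missing=1; }

  # Change stream: list stream names and look for an exact, whitespace-trimmed match.
  gcloud spanner databases execute-sql "$db" --instance="$inst" \
      --sql="SELECT CHANGE_STREAM_NAME FROM INFORMATION_SCHEMA.CHANGE_STREAMS" 2>/dev/null \
    | awk -v s="$stream" '{ $1 = $1 } $0 == s { found = 1 } END { exit !found }' \
    || { echo "missing change stream: $stream"; missing=1; }

  # Output topic.
  gcloud pubsub topics describe "$topic" >/dev/null 2>&1 \
    || { echo "missing topic: $topic"; missing=1; }

  return "$missing"
}
```

If only the change stream is missing, one way to create it is `gcloud spanner databases ddl update DATABASE --instance=INSTANCE --ddl='CREATE CHANGE STREAM STREAM_NAME FOR ALL'`, which watches every table in the database; see About change streams for narrower scopes.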

Template parameters

  • spannerInstanceId: The Cloud Spanner instance to read change streams from.
  • spannerDatabase: The Cloud Spanner database to read change streams from.
  • spannerMetadataInstanceId: The Cloud Spanner instance to use for the change streams connector metadata table.
  • spannerMetadataDatabase: The Cloud Spanner database to use for the change streams connector metadata table.
  • spannerChangeStreamName: The name of the Cloud Spanner change stream to read from.
  • pubsubTopic: The Pub/Sub topic for change streams output.
  • spannerProjectId: (Optional) The project to read change streams from. The change streams connector metadata table is also created in this project. Defaults to the project where the Dataflow pipeline is running.
  • spannerMetadataTableName: (Optional) The name of the Cloud Spanner change streams connector metadata table to use. If you don't provide it, the pipeline creates the metadata table automatically. Provide this parameter only when you update an existing pipeline; in that case it's required.
  • rpcPriority: (Optional) The request priority for Cloud Spanner calls. Must be one of HIGH, MEDIUM, or LOW. Defaults to HIGH.
  • startTimestamp: (Optional) The starting DateTime, inclusive, to use for reading change streams, for example, 2021-10-12T07:20:50.52Z. Defaults to the time the pipeline starts, that is, the current time.
  • endTimestamp: (Optional) The ending DateTime, inclusive, to use for reading change streams, for example, 2021-10-12T07:20:50.52Z. Defaults to an infinite time in the future.
  • outputFileFormat: (Optional) The output format. Output is wrapped in PubsubMessages and sent to a Pub/Sub topic. Allowed values are JSON and AVRO. Defaults to JSON.
  • pubsubAPI: (Optional) The Pub/Sub API used to implement the pipeline. Allowed values are pubsubio and native_client. For a small number of queries per second (QPS), native_client has lower latency; for a large number of QPS, pubsubio provides better and more stable performance. Defaults to pubsubio.
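The allowed values above are easy to mistype, so a small helper can assemble and check the comma-separated `--parameters` value before launch. This is a sketch under stated assumptions: the function name `build_params` and the variables `RPC_PRIORITY`, `OUTPUT_FORMAT`, `PUBSUB_API`, `START_TS`, and `END_TS` are hypothetical, and the timestamp check accepts only the UTC `Z` form used in the examples above, which may be stricter than the template itself.

```shell
# Hypothetical helper: prints the value for gcloud's --parameters flag.
# The six required values are positional arguments; optional values are read
# from hypothetical shell variables and checked against the allowed values.
# Usage: build_params INSTANCE DATABASE META_INSTANCE META_DATABASE STREAM TOPIC
build_params() {
  if [ "$#" -ne 6 ]; then
    echo "expected 6 required values, got $#" >&2
    return 2
  fi
  params="spannerInstanceId=$1,spannerDatabase=$2"
  params="$params,spannerMetadataInstanceId=$3,spannerMetadataDatabase=$4"
  params="$params,spannerChangeStreamName=$5,pubsubTopic=$6"

  # rpcPriority must be HIGH, MEDIUM, or LOW.
  if [ -n "${RPC_PRIORITY:-}" ]; then
    case "$RPC_PRIORITY" in
      HIGH|MEDIUM|LOW) params="$params,rpcPriority=$RPC_PRIORITY" ;;
      *) echo "invalid rpcPriority: $RPC_PRIORITY" >&2; return 1 ;;
    esac
  fi

  # outputFileFormat must be JSON or AVRO.
  if [ -n "${OUTPUT_FORMAT:-}" ]; then
    case "$OUTPUT_FORMAT" in
      JSON|AVRO) params="$params,outputFileFormat=$OUTPUT_FORMAT" ;;
      *) echo "invalid outputFileFormat: $OUTPUT_FORMAT" >&2; return 1 ;;
    esac
  fi

  # pubsubAPI must be pubsubio or native_client.
  if [ -n "${PUBSUB_API:-}" ]; then
    case "$PUBSUB_API" in
      pubsubio|native_client) params="$params,pubsubAPI=$PUBSUB_API" ;;
      *) echo "invalid pubsubAPI: $PUBSUB_API" >&2; return 1 ;;
    esac
  fi

  # Timestamps: accept only the UTC form, such as 2021-10-12T07:20:50.52Z.
  for pair in "startTimestamp=${START_TS:-}" "endTimestamp=${END_TS:-}"; do
    name=${pair%%=*}
    value=${pair#*=}
    if [ -n "$value" ]; then
      if ! printf '%s\n' "$value" |
          grep -Eq '^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?Z$'; then
        echo "invalid $name: $value" >&2
        return 1
      fi
      params="$params,$name=$value"
    fi
  done

  printf '%s\n' "$params"
}
```

For example, with `RPC_PRIORITY=LOW` set in the shell, passing `--parameters="$(build_params my-instance my-db my-meta-instance my-meta-db my_stream my-topic)"` to the gcloud command adds `rpcPriority=LOW` after the required values.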

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Spanner change streams to Pub/Sub template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example, us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner instance ID
  • SPANNER_DATABASE: Cloud Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
  • SPANNER_CHANGE_STREAM: Cloud Spanner change stream
  • PUBSUB_TOPIC: the Pub/Sub topic for change streams output
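To pass optional parameters from the table on the command line, append them to the comma-separated `--parameters` value. The following sketch wraps the command above in a hypothetical function, `launch_change_stream_job`, that reads the required values from shell variables named after the placeholders and appends any `key=value` arguments as optional parameters. It assumes no value contains a comma, because the `--parameters` list is comma-separated.

```shell
# Hypothetical wrapper around `gcloud dataflow flex-template run`.
# Required values come from shell variables named after the placeholders;
# each argument is an extra key=value template parameter, e.g. rpcPriority=LOW.
launch_change_stream_job() {
  # Refuse to launch with an empty required value.
  for var in JOB_NAME REGION_NAME SPANNER_INSTANCE_ID SPANNER_DATABASE \
      SPANNER_METADATA_INSTANCE_ID SPANNER_METADATA_DATABASE \
      SPANNER_CHANGE_STREAM PUBSUB_TOPIC; do
    eval "val=\${$var:-}"
    if [ -z "$val" ]; then
      echo "$var is not set" >&2
      return 1
    fi
  done

  params="spannerInstanceId=$SPANNER_INSTANCE_ID"
  params="$params,spannerDatabase=$SPANNER_DATABASE"
  params="$params,spannerMetadataInstanceId=$SPANNER_METADATA_INSTANCE_ID"
  params="$params,spannerMetadataDatabase=$SPANNER_METADATA_DATABASE"
  params="$params,spannerChangeStreamName=$SPANNER_CHANGE_STREAM"
  params="$params,pubsubTopic=$PUBSUB_TOPIC"
  # Append optional parameters in the order given.
  for extra in "$@"; do
    params="$params,$extra"
  done

  # VERSION defaults to latest when unset or empty.
  gcloud dataflow flex-template run "$JOB_NAME" \
    --template-file-gcs-location="gs://dataflow-templates/${VERSION:-latest}/flex/Spanner_Change_Streams_to_PubSub" \
    --region="$REGION_NAME" \
    --parameters="$params"
}
```

For example, after setting the variables, `launch_change_stream_job rpcPriority=LOW outputFileFormat=AVRO` runs the job with low Spanner request priority and publishes Avro instead of the default JSON.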

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub"
    }
  }
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example, us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner instance ID
  • SPANNER_DATABASE: Cloud Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
  • SPANNER_CHANGE_STREAM: Cloud Spanner change stream
  • PUBSUB_TOPIC: the Pub/Sub topic for change streams output
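The same request can be sent from a shell with curl. In this sketch, the hypothetical `request_body` function prints the JSON body shown above, filled in from shell variables named after the placeholders (it assumes the values contain no double quotes or backslashes, which would need JSON escaping), and the hypothetical `launch_via_rest` function posts it with an access token from `gcloud auth print-access-token`.

```shell
# Hypothetical helper: print the launch request body from placeholder-named variables.
request_body() {
  cat <<EOF
{
  "launch_parameter": {
    "jobName": "$JOB_NAME",
    "parameters": {
      "spannerInstanceId": "$SPANNER_INSTANCE_ID",
      "spannerDatabase": "$SPANNER_DATABASE",
      "spannerMetadataInstanceId": "$SPANNER_METADATA_INSTANCE_ID",
      "spannerMetadataDatabase": "$SPANNER_METADATA_DATABASE",
      "spannerChangeStreamName": "$SPANNER_CHANGE_STREAM",
      "pubsubTopic": "$PUBSUB_TOPIC"
    },
    "containerSpecGcsPath": "gs://dataflow-templates/${VERSION:-latest}/flex/Spanner_Change_Streams_to_PubSub"
  }
}
EOF
}

# Hypothetical helper: POST the body (read from stdin) as the active gcloud account.
launch_via_rest() {
  request_body | curl -sS -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    --data-binary @- \
    "https://dataflow.googleapis.com/v1b3/projects/$PROJECT_ID/locations/$LOCATION/flexTemplates:launch"
}
```

Generating the body from variables keeps the JSON and the placeholder list in one place, and reading it from stdin (`--data-binary @-`) avoids a temporary file.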