Manage streams

Overview

In this section, you learn how to use the Datastream API to:

  • Validate and create streams
  • Get information about streams and objects of streams
  • Update streams by starting, pausing, resuming, and modifying them, as well as by initiating and stopping backfill for objects of the streams
  • Delete streams

There are two ways to use the Datastream API: you can make REST API calls, or you can use the Google Cloud CLI (gcloud CLI).

To see high-level information about using gcloud to manage Datastream streams, click here.

Validate a stream

Before creating a stream, you can validate it. This way, you can ensure that the stream will run successfully, and that all validation checks pass.

Validating a stream checks:

  • Whether the source is configured properly to allow Datastream to stream data from it.
  • Whether the stream can connect to both the source and the destination.
  • The end-to-end configuration of the stream.

The following code shows a request to validate a stream that's used to transfer data from a source Oracle database to a destination bucket in Cloud Storage.

REST

POST "https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]&validate_only=true"
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": {},
      "fileRotationMb": MBytes,
      "fileRotationInterval": seconds
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

The validate_only=true query parameter indicates that you're only validating the stream; you're not creating it. Also, for this request, put the entire URL in quotation marks. This ensures that your shell passes the &validate_only=true parameter to Datastream instead of interpreting the ampersand.
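To avoid shell-quoting mistakes, you can also build the request URL programmatically before sending it. The following Python sketch shows how the validate_only query parameter is attached; the helper and its arguments are illustrative, not part of the Datastream API:

```python
def build_stream_url(project, location, stream_id, validate_only=False):
    """Build the streams endpoint URL for a create or validate request."""
    url = (
        f"https://datastream.googleapis.com/v1/projects/{project}"
        f"/locations/{location}/streams?streamId={stream_id}"
    )
    if validate_only:
        # validate_only=true runs the validation checks without creating the stream.
        url += "&validate_only=true"
    return url

print(build_stream_url("my-project", "us-central1", "my-stream", validate_only=True))
```

Dropping validate_only=True yields the URL used later in this page to actually create the stream.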

After you make this request, you'll see the validation checks that Datastream runs for your source and destination, along with whether the checks pass or fail. For any validation check that doesn't pass, information appears as to why it failed and what to do to rectify the problem.

For example, suppose you have a customer-managed encryption key (CMEK) that you want Datastream to use to encrypt data that's streamed from the source to the destination.

As part of validating the stream, Datastream will verify that the key exists, and that Datastream has permissions to use the key.

If either of these conditions isn't met, then the following error message is returned when you validate the stream:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

To resolve this issue, verify that the key that you provided exists, and that the Datastream service account has the cloudkms.cryptoKeys.get permission for the key.

After making the appropriate corrections, make the request again to ensure that all validation checks pass. For the example above, the CMEK_VALIDATE_PERMISSIONS check will no longer return an error message, but will have a status of PASSED.

gcloud

For more information on using gcloud to validate a stream, click here.

Create a stream

Create a stream from a source Oracle database

The following code shows a request to create a stream that's used to transfer data from a source Oracle database to a destination bucket in Cloud Storage.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": {},
      "fileRotationMb": MBytes,
      "fileRotationInterval": seconds
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

For example, here's a request to pull all tables from schema1 and two specific tables from schema3: tableA and tableC.

The events are written to a bucket in Cloud Storage in Avro format, and a new file will be created every 100 MB or 30 seconds (overriding the default values of 50 MB and 60 seconds).

All data stored within Google Cloud is encrypted at rest using the same hardened key management systems that we use for our own encrypted data. These key-management systems provide strict key access controls and auditing, and encrypt user data at rest using AES-256 encryption standards. No setup, configuration, or management is required. Google Cloud's default encryption at rest is the best choice for users who don't have specific requirements related to compliance or locality of cryptographic material.

If you need more control over the keys used to encrypt data at rest within a Google Cloud project, then Datastream offers the ability to protect your data using encryption keys managed by you within Cloud Key Management Service (KMS). These encryption keys are called customer-managed encryption keys (CMEK). When you protect data in Datastream with CMEK, the CMEK is within your control.

The customerManagedEncryptionKey parameter specifies a CMEK that Datastream can use to encrypt data that's streamed from the source to the destination. The CMEK is represented by the [customer-managed-encryption-key] placeholder.

The [ring] placeholder represents the key ring for your CMEK. A key ring organizes keys in a specific Google Cloud location and allows you to manage access control on groups of keys. A key ring's name doesn't need to be unique across a Google Cloud project, but must be unique within a given location. For more information about key rings, see Cloud KMS resources.

As part of creating the stream, Datastream will verify that the CMEK exists, and that Datastream has permissions to use the key. For more information about these checks, see Validate a stream.

If you prefer to use Google Cloud's internal key management system instead of a CMEK to encrypt your data, then don't include the customerManagedEncryptionKey parameter and value in your API request.

The backfillAll parameter controls historical backfill. When you set this parameter to an empty object ({}), Datastream backfills:

  • Historical data, in addition to ongoing changes to the data, from the source database into the destination.
  • Schemas and tables, from the source into the destination.

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "projects/myProjectId1/locations/us-central1/
        connectionProfiles/myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}
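An allowlist like the one in this example (every table in schema1, plus tableA and tableC in schema3) can be generated from a simple mapping when you script stream creation. This helper is an illustrative sketch, not part of the Datastream API:

```python
def oracle_allowlist(schemas):
    """Build the allowlist section of an oracleSourceConfig.

    schemas maps a schema name to a list of table names; an empty
    list means "include every table in that schema".
    """
    return {
        "oracleSchemas": [
            {
                "schemaName": name,
                "oracleTables": [{"tableName": t} for t in tables],
            }
            for name, tables in schemas.items()
        ]
    }

# Matches the request body shown above.
allowlist = oracle_allowlist({"schema1": [], "schema3": ["tableA", "tableC"]})
```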

gcloud

For more information on using gcloud to create a stream, click here.

Create a stream from a source PostgreSQL database

The following code shows a request to create a stream that's used to transfer data from a source PostgreSQL database to a destination bucket in Cloud Storage.

When creating a stream from a source PostgreSQL database, you need to specify two additional PostgreSQL-specific fields in your request:

  • replicationSlot: A replication slot is a prerequisite for configuring a PostgreSQL database for replication. You need to create a replication slot for each stream.

  • publication: A publication is a group of tables whose changes you want to replicate. The publication must exist in the database before you start a stream. At a minimum, the publication has to include the tables specified in the stream's includeObjects list.
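Because a stream can't start when its publication doesn't cover the requested tables, it can help to compare the two lists before creating the stream. The following is a minimal sketch; the function and the schema.table name format are illustrative assumptions, not Datastream API behavior:

```python
def missing_from_publication(publication_tables, include_objects):
    """Return the schema.table names that the stream wants to replicate
    but that aren't part of the publication."""
    published = set(publication_tables)
    return [t for t in include_objects if t not in published]

# The publication covers tableA, so nothing is missing here.
print(missing_from_publication(["public.tableA", "public.tableB"],
                               ["public.tableA"]))
```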

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "postgresqlSourceConfig": {
      "replicationSlot": "[replicationSlotName]",
      "publication": "[publicationName]",
      "includeObjects": {
        "postgresqlSchemas": [
          {
            "schema": "[schemaName]",
            "postgresqlTables": [
              { "table": "[tableName]" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "avroFileFormat": {},
      "fileRotationMb": MBytes,
      "fileRotationInterval": seconds
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

For example:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": [
          {
            "schema": "public",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/gcsCp",
    "gcsDestinationConfig": {
      "avroFileFormat": {},
      "fileRotationMb": 31,
      "fileRotationInterval": 100
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

gcloud

For more information on using gcloud to create a stream, click here.

Get information about a stream

The following code shows a request to retrieve information about a stream. This information includes:

  • The stream's name that's recognized by Datastream
  • A user-friendly name for the stream (the display name)
  • Date-and-time stamps of when the stream was created and updated
  • Information about the source and destination connection profiles associated with the stream
  • The stream's state

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

For example:

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

The response appears as follows:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING",
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}
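When you script against the API, the fields of this response can be checked directly. The following helper is an illustrative sketch that condenses the response into the fields you most often act on:

```python
def stream_summary(get_stream_response):
    """Condense a GET streams response into a few commonly used fields."""
    return {
        "name": get_stream_response["name"],
        "state": get_stream_response.get("state", "STATE_UNSPECIFIED"),
        "created": get_stream_response.get("createTime"),
    }

summary = stream_summary({
    "name": "myOracleCdcStream",
    "createTime": "2019-12-15T15:01:23.045123456Z",
    "state": "RUNNING",
})
```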

gcloud

For more information on using gcloud to retrieve information about your stream, click here.

List streams

The following code shows a request to retrieve information about all of your streams.

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams

gcloud

For more information on using gcloud to retrieve information about all of your streams, click here.

List objects of a stream

The following code shows a request to retrieve information about all objects of a stream.

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects

For example:

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myMySQLCdcStream/objects

gcloud

For more information on using gcloud to retrieve information about all objects of your stream, click here.

The list of objects that's returned may look similar to the following:

REST

{
  "streamObjects": [
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}
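A response like the one above can be reduced to a per-object backfill summary. Note that an object with no backfillJob field (like contractors.hours above) has never had backfill run; the NOT_STARTED label below is this sketch's own convention, not an API value:

```python
def summarize_backfill(list_objects_response):
    """Map each object's display name to its backfill state.

    Objects without a backfillJob field are labeled NOT_STARTED,
    a convention of this sketch rather than a Datastream state.
    """
    summary = {}
    for obj in list_objects_response.get("streamObjects", []):
        job = obj.get("backfillJob")
        summary[obj["displayName"]] = job["state"] if job else "NOT_STARTED"
    return summary
```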

Update a stream

Start a stream

The following code shows a request to start a stream.

When you use the updateMask parameter in the request, only the fields that you specify need to be included in the body of the request.

For this example, the field specified is the state field, which represents the stream's status (or state). By starting the stream, you're changing its state from CREATED to RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

For example:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}
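The start, pause, and resume requests in this section all share the same shape and differ only in the target state, so a single helper can build any of them. This is an illustrative sketch; the helper and its argument names aren't part of the Datastream API:

```python
# Target states used by the state-change PATCH requests in this section.
VALID_TARGET_STATES = {"RUNNING", "PAUSED"}

def state_patch_request(project, location, stream_id, target_state):
    """Build the URL and body for a PATCH that changes a stream's state."""
    if target_state not in VALID_TARGET_STATES:
        raise ValueError(f"unsupported target state: {target_state}")
    url = (
        f"https://datastream.googleapis.com/v1/projects/{project}"
        f"/locations/{location}/streams/{stream_id}?updateMask=state"
    )
    return url, {"state": target_state}
```

For example, state_patch_request("myProjectId1", "us-central1", "myOracleCdcStream", "PAUSED") produces the pause request shown later on this page.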

gcloud

For more information on using gcloud to start your stream, click here.

Pause a stream

The following code shows a request to pause a running stream.

For this example, the field specified for the updateMask parameter is the state field. By pausing the stream, you're changing its state from RUNNING to PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "PAUSED"
}

For example:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "PAUSED"
}

gcloud

For more information on using gcloud to pause your stream, click here.

Resume a stream

The following code shows a request to resume a paused stream.

For this example, the field specified for the updateMask parameter is the state field. By resuming the stream, you're changing its state from PAUSED to RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

For example:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

gcloud

For more information on using gcloud to resume your stream, click here.

Modify a stream

The following code shows a request to update the file rotation configuration of a stream to rotate the file every 75 MB or 45 seconds.

For this example, the fields specified for the updateMask parameter are the fileRotationMb and fileRotationInterval fields, represented by the destinationConfig.gcsDestinationConfig.fileRotationMb and destinationConfig.gcsDestinationConfig.fileRotationInterval paths, respectively.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

For example:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}
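Writing the updateMask by hand is error-prone when the body is deeply nested. One way to derive it from the request body itself is sketched below; note that real protobuf field masks have additional rules (for example, around map keys) that this illustration ignores:

```python
def leaf_paths(body, prefix=""):
    """Yield dotted paths to every leaf field in a nested request body."""
    for key, value in body.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            yield from leaf_paths(value, path)
        else:
            yield path

def update_mask(body):
    """Join the leaf paths into the comma-separated updateMask format."""
    return ",".join(leaf_paths(body))

body = {"destinationConfig": {"gcsDestinationConfig": {
    "fileRotationMb": 75, "fileRotationInterval": 45}}}
print(update_mask(body))
```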

The following code shows a request to include a Unified Types schema file in the path of files that Datastream writes to Cloud Storage. As a result, Datastream writes two files: a JSON data file and an Avro schema file.

For this example, the field specified is the jsonFileFormat field, represented by the destinationConfig.gcsDestinationConfig.jsonFileFormat path.

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

For example:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

The following code shows a request for Datastream to replicate existing data, in addition to ongoing changes to the data, from the source database into the destination.

The oracleExcludedObjects section of the code identifies the tables and schemas that are excluded from being backfilled into the destination.

For this example, all tables and schemas will be backfilled, except for tableA in schema3.

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}  

For example:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
} 

gcloud

For more information on using gcloud to modify your stream, click here.

Initiate backfill for an object of a stream

A stream in Datastream can backfill historical data, as well as stream ongoing changes into a destination. Ongoing changes will always be streamed from a source into a destination. However, you can specify whether you want historical data to be streamed.

If you want historical data to be streamed from the source into the destination, then use the backfillAll parameter.

Datastream also lets you stream historical data only for specific database tables. To do this, use the backfillAll parameter, and exclude the tables for which you don't want historical data.

If you want only ongoing changes to be streamed into the destination, then use the backfillNone parameter. If you then want Datastream to stream a snapshot of all existing data from the source to the destination, you must initiate backfill manually for the objects that contain this data.

Another reason for initiating backfill for an object is if data is out of sync between the source and the destination. For example, a user can delete data in the destination inadvertently, and the data is now lost. In this case, initiating backfill for the object serves as a "reset mechanism" because all data is streamed into the destination in one shot. As a result, the data is synced between the source and the destination.

Before you can initiate backfill for an object of a stream, you must retrieve information about the object.

Each object has an [object-id], which uniquely identifies the object. You use the [object-id] to initiate backfill for the object.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:startBackfillJob

gcloud

For more information on using gcloud to initiate backfill for an object of your stream, click here.

Stop backfill for an object of a stream

After initiating backfill for an object of a stream, you can stop backfill for the object. For example, if a user modifies a database schema, then the schema or data may be corrupted. You don't want this schema or data to be streamed into the destination, and so you stop backfill for the object.

You can also stop backfill for an object for load balancing purposes. Datastream can run multiple backfills in parallel. This may put an additional load on the source. If the load is significant, stop backfill for each object, and then initiate backfill for the objects, one by one.

Before you can stop backfill for an object of a stream, you must make a request to retrieve information about all objects of a stream. Each object that's returned has an [object-id], which uniquely identifies the object. You use the [object-id] to stop backfill for the object.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:stopBackfillJob

gcloud

For more information on using gcloud to stop backfill for an object of your stream, click here.

Change the maximum number of concurrent CDC tasks

The following code shows how to set the maximum number of concurrent change data capture (CDC) tasks for a MySQL stream to 7.

For this example, the field specified for the updateMask parameter is the maxConcurrentCdcTasks field. By setting its value to 7, you're changing the maximum number of concurrent CDC tasks from the previous value to 7. You can use values from 0 to 50 (inclusive). If you don't define the value, or if you define it as 0, the stream uses the system default. Currently, the system default value is 5.
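The fallback behavior described above can be expressed as a small function. This is a sketch of the documented rules, not Datastream code, and the constant below reflects the current default stated in this section:

```python
SYSTEM_DEFAULT_CDC_TASKS = 5  # current system default, per this page

def effective_max_concurrent_cdc_tasks(value=None):
    """Resolve the setting Datastream would apply for maxConcurrentCdcTasks.

    Leaving the field unset, or setting it to 0, falls back to the
    system default; other values must be between 0 and 50 inclusive.
    """
    if value is None or value == 0:
        return SYSTEM_DEFAULT_CDC_TASKS
    if not 0 <= value <= 50:
        raise ValueError("maxConcurrentCdcTasks must be between 0 and 50")
    return value
```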

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": 7
    }
  }
}

For example:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myMySQLCdcStream?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": 7
    }
  }
}

gcloud

For more information on using gcloud, click here.

Delete a stream

The following code shows a request to delete a stream.

REST

DELETE https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

For example:

DELETE https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

gcloud

For more information on using gcloud to delete your stream, click here.