Manage streams

Overview

In this section, you learn how to use the Datastream API to:

  • Validate and create streams
  • Get information about streams
  • Retrieve errors from streams
  • Update streams by starting, pausing, resuming, and modifying them
  • Delete streams

You can use the Datastream API in two ways: by making REST API calls, or by using the gcloud command-line tool (CLI).

To see high-level information about using gcloud to manage Datastream streams, click here.

Validate a stream

Before you create a stream, you can validate it to confirm that all validation checks pass and that the stream is likely to run successfully.

Validating a stream checks:

  • Whether the source is configured properly to allow Datastream to stream data from it.
  • Whether the stream can connect to both the source and the destination.
  • The end-to-end configuration of the stream.

The following code shows a request to validate a stream that's used to transfer data from a source Oracle database to a destination bucket in Cloud Storage.

REST

POST "https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]&validate_only=true"
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}",
      "fileRotationMb": [MBytes],
      "fileRotationInterval": [seconds]
    }
  },
  "backfillAll": {}
}

The &validate_only=true value indicates that you're only validating the stream; you're not creating it. Also, for this request, put the entire URL in quotation marks so that a command-line shell doesn't interpret the & character and Datastream receives the validate_only=true value.

After you make this request, you'll see the validation checks that Datastream runs for your source and destination, along with whether each check passes or fails. For any validation check that doesn't pass, Datastream reports why it failed and how to fix the problem.

After making the appropriate corrections, make the request again to ensure that all validation checks pass.
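
As a rough illustration, the validation URL can be assembled in Python with the standard library, which avoids placing the & character by hand. The helper name build_validate_url and the sample IDs are assumptions made for this sketch; only the URL shape and the validate_only=true parameter come from the request above. The sketch builds the URL and doesn't send it, and authentication is left out.

```python
from urllib.parse import urlencode

# Base endpoint taken from the REST request shown above.
BASE = "https://datastream.googleapis.com/v1alpha1"


def build_validate_url(project_id: str, location: str, stream_id: str) -> str:
    """Return the URL for a validate-only stream request (hypothetical helper)."""
    path = f"{BASE}/projects/{project_id}/locations/{location}/streams"
    # urlencode joins the parameters with "&" and escapes special characters.
    query = urlencode({"streamId": stream_id, "validate_only": "true"})
    return f"{path}?{query}"


if __name__ == "__main__":
    print(build_validate_url("myProjectId1", "us-central1", "myOracleCdcStream"))
```

Dropping the validate_only entry from the dictionary would give the URL for the create request instead.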

Create a stream

The following code shows a request to create a stream that's used to transfer data from a source Oracle database to a destination bucket in Cloud Storage.

REST

POST https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}",
      "fileRotationMb": [MBytes],
      "fileRotationInterval": [seconds]
    }
  },
  "backfillAll": {}
}

For example, here's a request to pull all tables from schema1 and two specific tables from schema3: tableA and tableC.

The events are written to a bucket in Cloud Storage in Avro format, and a new file will be created every 100 MB or 30 seconds (overriding the default values of 50 MB and 60 seconds).

The backfillAll parameter is associated with historical backfill. When you set this parameter to an empty dictionary ({}), Datastream backfills:

  • Historical data, in addition to ongoing changes to the data, from the source database into the destination.
  • Schemas and tables, from the source into the destination.

POST https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "backfillAll": {}
}
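
The allowlist in the example above is repetitive to write by hand, so a small Python sketch of generating it may help. The function name build_allowlist and its input shape (a mapping from schema name to table names) are assumptions for illustration. Following the example, an empty table list is used for a schema whose tables are all selected.

```python
import json
from typing import Dict, List


def build_allowlist(schemas: Dict[str, List[str]]) -> dict:
    """Turn {schema: [tables]} into the allowlist shape used in the request body.

    An empty table list is emitted as "oracleTables": [], which the example
    above uses for a schema whose tables are all pulled.
    """
    return {
        "oracleSchemas": [
            {
                "schemaName": schema,
                "oracleTables": [{"tableName": table} for table in tables],
            }
            for schema, tables in schemas.items()
        ]
    }


if __name__ == "__main__":
    allowlist = build_allowlist({"schema1": [], "schema3": ["tableA", "tableC"]})
    print(json.dumps({"oracleSourceConfig": {"allowlist": allowlist}}, indent=2))
```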

Get information about a stream

The following code shows a request to retrieve information about a stream. This information includes:

  • The stream's name that's recognized by Datastream
  • A user-friendly name for the stream (the display name)
  • Date-and-time stamps of when the stream was created and updated
  • Information about the source and destination connection profiles associated with the stream
  • The stream's state

REST

GET https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]

For example:

GET https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

The response appears as follows:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "state": "RUNNING",
  "backfillAll": {}
}
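
A script that polls a stream usually needs only a few of these fields. The following Python sketch parses a response of the shape shown above and picks out the name, state, and file rotation settings. The function name summarize_stream and the trimmed sample payload are assumptions for illustration, and the sketch assumes that the fields it reads are present.

```python
import json


def summarize_stream(response_text: str) -> dict:
    """Extract a few fields from a stream resource (hypothetical helper)."""
    stream = json.loads(response_text)
    gcs = stream["destinationConfig"]["gcsDestinationConfig"]
    return {
        "name": stream["name"],
        "state": stream["state"],
        "fileRotationMb": gcs["fileRotationMb"],
        "fileRotationInterval": gcs["fileRotationInterval"],
    }


if __name__ == "__main__":
    # Trimmed-down sample with the same field names as the response above.
    sample = json.dumps({
        "name": "myOracleCdcStream",
        "state": "RUNNING",
        "destinationConfig": {
            "gcsDestinationConfig": {"fileRotationMb": 100, "fileRotationInterval": 30}
        },
    })
    print(summarize_stream(sample))
```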

gcloud

For more information on using gcloud to retrieve information about your stream, click here.

List streams

The following code shows a request to retrieve information about all of your streams.

REST

GET https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams

gcloud

For more information on using gcloud to retrieve information about all of your streams, click here.

Retrieve errors from a stream

The following code shows a request to retrieve the list of errors for a stream that isn't working properly.

REST

GET https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]:fetchStreamErrors

For example:

GET https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream:fetchStreamErrors

Update a stream

Start a stream

The following code shows a request to start a stream.

Because the request uses the updateMask parameter, the body of the request needs to include only the fields that you specify in the mask.

For this example, the field specified is the state field, which represents the stream's status (or state). By starting the stream, you're changing its state from CREATED to RUNNING.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

For example:

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

Pause a stream

The following code shows a request to pause a running stream.

For this example, the field specified for the updateMask parameter is the state field. By pausing the stream, you're changing its state from RUNNING to PAUSED.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "PAUSED"
}

For example:

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "PAUSED"
}

Resume a stream

The following code shows a request to resume a paused stream.

For this example, the field specified for the updateMask parameter is the state field. By resuming the stream, you're changing its state from PAUSED to RUNNING.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

For example:

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}
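
The start, pause, and resume requests differ only in the target state, so they can share one helper. The Python sketch below builds the PATCH request with the standard library and doesn't send it. The function name build_state_patch and the client-side transition table are assumptions for illustration: the table covers only the three transitions described above, and the API itself may accept others. Authentication headers are omitted.

```python
import json
import urllib.request

BASE = "https://datastream.googleapis.com/v1alpha1"

# Transitions described above: start, pause, and resume.
# This is a client-side convenience check, not a statement of what the API enforces.
ALLOWED = {("CREATED", "RUNNING"), ("RUNNING", "PAUSED"), ("PAUSED", "RUNNING")}


def build_state_patch(stream_path: str, current: str, target: str) -> urllib.request.Request:
    """Build (but do not send) a PATCH request that changes only the state field."""
    if (current, target) not in ALLOWED:
        raise ValueError(f"transition {current} -> {target} is not covered by this sketch")
    url = f"{BASE}/{stream_path}?updateMask=state"
    body = json.dumps({"state": target}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    path = "projects/myProjectId1/locations/us-central1/streams/myOracleCdcStream"
    request = build_state_patch(path, "RUNNING", "PAUSED")
    print(request.get_method(), request.full_url)
    print(request.data.decode("utf-8"))
```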

Modify a stream

The following code shows a request to update the file rotation configuration of a stream to rotate the file every 75 MB or 45 seconds.

For this example, the updateMask parameter specifies the fileRotationMb and fileRotationInterval fields by their full paths: destinationConfig.gcsDestinationConfig.fileRotationMb and destinationConfig.gcsDestinationConfig.fileRotationInterval, respectively.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

For example:

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}
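
Because the updateMask value here mirrors the nesting of the request body, it can be derived from the body instead of typed by hand. The Python sketch below walks a nested dictionary and joins the leaf paths with commas. The function names field_paths and update_mask are assumptions for illustration. The helper always descends to leaf fields, so it wouldn't produce a parent-level mask such as one that names a whole nested object.

```python
from typing import List


def field_paths(body: dict, prefix: str = "") -> List[str]:
    """Return the dotted path of every leaf field in a nested dictionary."""
    paths: List[str] = []
    for key, value in body.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Non-empty nested object: keep descending.
            paths.extend(field_paths(value, path))
        else:
            # Scalar (or empty object): treat as a leaf.
            paths.append(path)
    return paths


def update_mask(body: dict) -> str:
    """Join the leaf paths into a comma-separated updateMask value."""
    return ",".join(field_paths(body))


if __name__ == "__main__":
    body = {
        "destinationConfig": {
            "gcsDestinationConfig": {"fileRotationMb": 75, "fileRotationInterval": 45}
        }
    }
    print(update_mask(body))
```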

The following code shows a request to include a Unified Types schema file in the path of files that Datastream writes to Cloud Storage. As a result, Datastream writes two files: a JSON data file and an Avro schema file.

For this example, the updateMask parameter specifies the jsonFileFormat field by its full path, destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

For example:

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

The following code shows a request for Datastream to replicate existing data, in addition to ongoing changes to the data, from the source database into the destination.

The oracleExcludedObjects section of the code lists the schemas and tables that are excluded from the backfill into the destination.

For this example, all tables and schemas will be backfilled, except for tableA in schema3.

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}  

For example:

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
} 
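
When the exclusions come from a flat list, the nested body above can be generated by grouping table names under their schemas. The Python sketch below does that. The function name build_backfill_all and the "schema.table" input format are assumptions for illustration; only the output shape is taken from the request above.

```python
import json
from collections import defaultdict
from typing import List


def build_backfill_all(excluded: List[str]) -> dict:
    """Build the backfillAll body from "schema.table" strings (hypothetical format).

    Each entry must contain a dot; an entry without one raises ValueError.
    """
    by_schema = defaultdict(list)
    for qualified in excluded:
        schema, table = qualified.split(".", 1)
        by_schema[schema].append({"tableName": table})
    if not by_schema:
        # No exclusions: the empty dictionary form shown in the create request.
        return {"backfillAll": {}}
    return {
        "backfillAll": {
            "oracleExcludedObjects": {
                "oracleSchemas": [
                    {"schemaName": schema, "oracleTables": tables}
                    for schema, tables in by_schema.items()
                ]
            }
        }
    }


if __name__ == "__main__":
    print(json.dumps(build_backfill_all(["schema3.tableA"]), indent=2))
```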

Delete a stream

The following code shows a request to delete a stream.

REST

DELETE https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]

For example:

DELETE https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

gcloud

For more information on using gcloud to delete your stream, click here.