管理数据流

在本页中,您将学习如何使用 Datastream API 执行以下操作:

  • 创建数据流
  • 获取数据流和数据流对象的相关信息
  • 通过启动、暂停、恢复和修改数据流,以及启动和停止数据流对象的回填来更新数据流
  • 恢复永久失败的直播
  • 为 Oracle Streams 启用大型对象流式传输
  • 删除数据流

您可以通过两种方式使用 Datastream API。您可以进行 REST API 调用,也可以使用 Google Cloud CLI (CLI)。

如需查看有关如何使用 Google Cloud CLI 管理 Datastream 数据流的概要信息,请参阅 gcloud CLI Datastream 数据流

创建数据流

在本部分中,您将学习如何创建用于将数据从来源转移到目标位置的数据流。以下示例并非详尽无遗,而是重点介绍 Datastream 的特定功能。如需解决您的特定用例,请将这些示例与 Datastream API 参考文档搭配使用。

本部分介绍以下使用情形:

示例 1:将特定对象流式传输到 BigQuery

在此示例中,您将学习如何:

  • 从 MySQL 流式传输到 BigQuery
  • 在数据流中包含一组对象
  • 将数据流的写入模式定义为仅附加
  • 回填数据流中包含的所有对象

以下请求会从 schema1 拉取所有表,并从 schema2 拉取两个特定表:tableAtableC。这些事件会写入 BigQuery 中的数据集。

该请求不包含 customerManagedEncryptionKey 参数,因此系统会使用 Google Cloud 内部密钥管理系统来加密数据,而不是使用 CMEK。

与执行历史回填(或快照)相关联的 backfillAll 参数设置为一个空字典 ({}),这意味着 Datastream 会回填数据流中包含的所有表的历史数据。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 2:从具有 PostgreSQL 源的数据流中排除特定对象

在此示例中,您将学习如何:

  • 将数据从 PostgreSQL 流式传输到 BigQuery
  • 从数据流中排除对象
  • 从回填中排除对象

以下代码提供了一个请求,它创建一个数据流用于将数据从源 PostgreSQL 数据库转移到 BigQuery。 从源 PostgreSQL 数据库创建数据流时,您需要在请求中指定两个额外的 PostgreSQL 特有字段:

  • replicationSlot:复制槽是配置 PostgreSQL 数据库以进行复制的前提条件。您需要为每个流创建一个复制槽。
  • publication:发布是一组您要复制其更改的表。在开始数据流之前,数据库中必须存在发布内容名称。发布内容至少必须包含数据流的 includeObjects 列表中指定的表。

与执行历史回填(或快照)相关联的 backfillAll 参数设置为排除一个表。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 3:为数据流指定仅追加写入模式

在流式传输到 BigQuery 时,您可以定义写入模式:mergeappendOnly。如需了解详情,请参阅配置写入模式

如果您未在创建流的请求中指定写入模式,则系统会使用默认的 merge 模式。

以下请求展示了如何在创建从 MySQL 到 BigQuery 的数据流时定义 appendOnly 模式。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 4:将数据流式传输到 Cloud Storage 目标位置

在此示例中,您将学习如何:

  • 从 Oracle 流式传输到 Cloud Storage
  • 定义要包含在数据流中的一组对象
  • 定义用于加密静态数据的 CMEK

以下请求展示了如何创建将事件写入 Cloud Storage 中存储桶的数据流。

在此示例请求中,事件以 JSON 输出格式写入,每 100 MB 或每 30 秒创建一个新文件(替换默认值 50 MB 和 60 秒)。

对于 JSON 格式,您可以执行以下操作:

  • 在路径中添加 Unified Types 架构文件。因此,Datastream 会将两个文件写入 Cloud Storage:JSON 数据文件和 Avro 架构文件。架构文件与数据文件同名,扩展名为 .schema

  • 启用 gzip 压缩可使 Datastream 压缩写入 Cloud Storage 的文件。

通过使用 backfillNone 参数,请求指定仅将正在进行的更改流式传输到目标位置,而不进行回填。

该请求指定了客户管理的加密密钥参数,可让您控制用于对 Google Cloud 项目中的静态数据进行加密的密钥。该参数是指 Datastream 用于加密从源流向目标的数据的 CMEK。它还指定了 CMEK 的密钥环。

如需详细了解密钥环,请参阅 Cloud KMS 资源。如需详细了解如何使用加密密钥保护数据,请参阅 Cloud Key Management Service (KMS)

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

如需详细了解如何使用 gcloud 创建数据流,请参阅 Google Cloud SDK 文档

示例 5:将数据流式传输到 BigLake 托管式表

在此示例中,您将学习如何配置数据流,以在 append-only 模式下将数据从 MySQL 数据库复制到 BigLake Iceberg 表。在创建请求之前,请确保您已完成以下步骤:

  • 拥有一个 Cloud Storage 存储桶,用于存储数据
  • 创建 Cloud 资源连接
  • 向 Cloud 资源连接授予对 Cloud Storage 存储桶的访问权限

然后,您可以使用以下请求来创建信息流:

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream
{
  "displayName": "MySQL to BigLake stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          {
            "database": "my-mysql-database"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id",
    "bigqueryDestinationConfig": {
      "blmtConfig": {
        "bucket": "my-gcs-bucket-name",
        "rootPath": "my/folder",
        "connectionName": "my-project-id.us-central1.my-bigquery-connection-name",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG"
        },
      "singleTargetDataset": {
        "datasetId": "my-project-id:my-bigquery-dataset-id"
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

datastream streams create mysqlBigLakeStream --location=us-central1
--display-name=mysql-to-bl-stream --source=source --mysql-source-config=mysql_source_config.json
--destination=destination --bigquery-destination-config=bl_config.json
--backfill-none

mysql_source_config.json 源配置文件的内容:

{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{"database":"my-mysql-database"}]}}

bl_config.json 配置文件的内容:

{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }

Terraform

resource "google_datastream_stream" "stream" {
  stream_id    = "mysqlBlStream"
  location     = "us-central1"
  display_name = "MySQL to BigLake stream"

  source_config {
    source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp"
    mysql_source_config {
      include_objects {
        mysql_databases {
          database = "my-mysql-database"
        }
      }
    }
  }

  destination_config {
    destination_connection_profile = "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id"
    bigquery_destination_config {
      single_target_dataset {
        dataset_id = "my-project-id:my-bigquery-dataset-id"
      }
      blmt_config {
        bucket          = "my-gcs-bucket-name"
        table_format    = "ICEBERG"
        file_format     = "PARQUET"
        connection_name = "my-project-id.us-central1.my-bigquery-connection-name"
        root_path       = "my/folder"
      }
      append_only {}
    }
  }

  backfill_none {}
}
    

验证数据流的定义

在创建数据流之前,您可以验证其定义。这样,您就可以确保所有验证检查都通过,并且数据流在创建后能够成功运行。

验证数据流检查:

  • 来源是否已正确配置,允许 Datastream 从其中流式传输数据。
  • 数据流是否可以连接到来源和目标位置。
  • 数据流的端到端配置。

如需验证某个信息流,请在请求正文之前的网址中添加 &validate_only=true

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

发出此请求后,您将看到 Datastream 针对来源和目标位置运行的验证检查,以及检查是否通过。对于未通过的验证检查,系统会显示失败原因以及纠正方法。

例如,假设您有一个客户管理的加密密钥 (CMEK),您希望 Datastream 使用它来加密从源流式传输到目标的数据。在验证数据流时,Datastream 将验证密钥是否存在,以及 Datastream 是否有权使用该密钥。如果未满足其中任何一个条件,则当您验证数据流时,将返回以下错误消息:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

如需解决此问题,请验证您提供的密钥是否存在,以及 Datastream 服务账号是否拥有该密钥的 cloudkms.cryptoKeys.get 权限。

进行适当的更正后,请再次发出请求,以确保通过所有验证检查。 对于上述示例,CMEK_VALIDATE_PERMISSIONS 检查将不再返回错误消息,但状态为 PASSED

获取数据流相关信息

以下代码显示了一个检索数据流相关信息的请求。此类信息包括:

  • 数据流的名称(唯一标识符)
  • 数据流的易记名称(显示名称)
  • 数据流的创建时间和上次更新时间的时间戳
  • 与数据流关联的来源和目标连接配置文件的相关信息
  • 数据流的状态

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

系统会显示如下响应:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

如需详细了解如何使用 gcloud 检索有关您的直播的信息,请参阅 Google Cloud SDK 文档

列出数据流

以下代码显示了一个请求,用于检索指定项目和位置中的所有数据流的列表。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

如需详细了解如何使用 gcloud 检索有关所有直播的信息,请参阅 Google Cloud SDK 文档

列出数据流的对象

以下代码显示了一个检索数据流的所有对象相关信息的请求。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

如需详细了解如何使用 gcloud 检索有关您的信息流中所有对象的信息,请参阅 Google Cloud SDK 文档

返回的对象列表可能如下所示:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

如需详细了解如何使用 gcloud 列出流的对象,请参阅 Google Cloud SDK 文档

启动数据流

以下代码显示了一个启动数据流的请求。

通过在请求中使用 updateMask 参数,使只有您指定的字段必须包含在请求正文中。如需开始直播,请将 state 字段中的值从 CREATED 更改为 RUNNING

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

如需详细了解如何使用 gcloud 启动直播,请参阅 Google Cloud SDK 文档

暂停数据流

以下代码显示了一个暂停正在运行的数据流的请求。

在本例中,为 updateMask 参数指定的字段是 state 字段。暂停数据流后,其状态会从 RUNNING 更改为 PAUSED

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

如需详细了解如何使用 gcloud 暂停直播,请参阅 Google Cloud SDK 文档

恢复数据流

以下代码显示了一个恢复已暂停的数据流的请求。

在本例中,为 updateMask 参数指定的字段是 state 字段。恢复数据流后,其状态会从 PAUSED 更改为 RUNNING

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

如需详细了解如何使用 gcloud 恢复数据流,请参阅 Google Cloud SDK 文档

恢复数据流

您可以使用 RunStream 方法恢复永久失败的流。每种源数据库类型都有自己的流恢复操作定义。如需了解详情,请参阅恢复流

恢复 MySQL 或 Oracle 源的数据流

以下代码示例展示了如何从各种日志文件位置恢复 MySQL 或 Oracle 源的数据流:

REST

从当前位置恢复数据流。这是默认选项:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

从下一个可用位置恢复数据流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

从最近的位置恢复数据流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

从特定位置恢复数据流(基于 MySQL 二进制日志的复制):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

替换以下内容:

  • NAME_OF_THE_LOG_FILE:要从中恢复数据流的日志文件的名称
  • POSITION:日志文件中您要从中恢复流的位置。如果您未提供该值,Datastream 会从文件开头恢复数据流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

从特定位置恢复数据流(基于 MySQL GTID 的复制):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

GTID_SET 替换为一个或多个单个 GTID 或 GTID 范围,您希望从这些 GTID 恢复数据流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3"
      }
    }
  }
}

从特定位置恢复数据流 (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn 替换为重做日志文件中的系统变更编号 (SCN),您希望从该编号开始恢复数据流。此字段为必填字段。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

如需详细了解可用的恢复选项,请参阅恢复数据流

gcloud

不支持使用 gcloud 恢复数据流。

恢复 PostgreSQL 源的流

以下代码示例展示了针对 PostgreSQL 源恢复数据流的请求。在恢复期间,数据流会开始从为该数据流配置的复制槽中的第一个日志序列号 (LSN) 开始读取。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

如果您想更改复制槽,请先使用新的复制槽名称更新数据流:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

不支持使用 gcloud 恢复数据流。

恢复 SQL Server 源的流

以下代码示例展示了针对 SQL Server 源恢复数据流的请求示例。

REST

从第一个可用位置恢复数据流:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

从首选的日志序列号恢复数据流:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

不支持使用 gcloud 恢复数据流。

从特定位置开始或恢复播放视频

对于 MySQL 和 Oracle 源,您可以从特定位置开始数据流或恢复已暂停的数据流。如果您想使用外部工具执行回填,或者从您指定的位置开始 CDC,这可能会很有用。对于 MySQL 源,您需要指明 binlog 位置或 GTID 集;对于 Oracle 源,您需要指明重做日志文件中的系统变更编号 (SCN)。

以下代码展示了如何从特定位置开始或恢复已创建的数据流。

从特定 binlog 位置(MySQL)开始或恢复数据流:

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

替换以下内容:

  • NAME_OF_THE_LOG_FILE:您要从中开始流式传输的日志文件的名称。
  • POSITION:您希望开始流式传输的日志文件中的位置。如果您不提供此值,Datastream 会从文件开头开始读取。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

不支持使用 gcloud 从特定位置开始或恢复播放视频流。如需了解如何使用 gcloud 启动或恢复直播,请参阅 Cloud SDK 文档

从特定 GTID 集开始或恢复流(MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

GTID_SET 替换为一个或多个单个 GTID 或 GTID 范围,您希望从这些 GTID 开始或恢复数据流。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7"
      }
    }
  }
}

gcloud

不支持使用 gcloud 从特定位置开始或恢复播放视频流。如需了解如何使用 gcloud 启动或恢复直播,请参阅 Cloud SDK 文档

从重做日志文件 (Oracle) 中的特定系统变更编号开始或恢复数据流:

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn 替换为重做日志文件中的系统变更编号 (SCN),您希望从该编号开始数据流。此字段为必填字段。

例如:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

不支持使用 gcloud 从特定位置开始或恢复播放视频流。如需了解如何使用 gcloud 启动直播,请参阅 Cloud SDK 文档

修改数据流

以下代码显示了一个请求,它更新数据流的文件轮替配置,以便每 75 MB 或每 45 秒轮替一次文件。

在本例中,为 updateMask 参数指定的字段包括 fileRotationMbfileRotationInterval 字段,分别由 destinationConfig.gcsDestinationConfig.fileRotationMbdestinationConfig.gcsDestinationConfig.fileRotationInterval 标志表示。

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

以下代码显示了一个请求,它在 Datastream 写入 Cloud Storage 的文件的路径中添加 Unified Types 架构文件。因此,Datastream 会写入两个文件:JSON 数据文件和 Avro 架构文件。

在本例中,指定的字段是 jsonFileFormat 字段,由 destinationConfig.gcsDestinationConfig.jsonFileFormat 标志表示。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

以下代码显示了一个请求,它请求 Datastream 将现有数据连同正在对数据进行的更改从源数据库中复制到目标位置。

代码的 oracleExcludedObjects 部分显示了无法回填到目标位置的表和架构。

在本例中,系统将回填所有表和架构,schema3 中的 tableA 除外。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

如需详细了解如何使用 gcloud 修改直播,请参阅 Google Cloud SDK 文档

启动数据流对象的回填

Datastream 中的数据流可以回填历史数据,并将正在进行的更改流式传输到目标位置。正在进行的更改将始终从来源流式传输到目标位置。不过,您可以指定是否要流式传输历史数据。

如果要将历史数据从来源流式传输到目标位置,请使用 backfillAll 参数。

借助 Datastream,您还可以仅流式传输特定数据库表的历史数据。为此,请使用 backfillAll 参数并排除不需要其历史数据的表。

如果您只希望将正在进行的更改流式传输到目标位置,请使用 backfillNone 参数。如果您希望 Dataflow 将所有现有数据的快照从来源流式传输到目标位置,则必须对包含此数据的对象手动启动回填。

为对象启动回填的另一个原因是来源与目标位置之间的数据不同步。例如,用户可以意外删除目标位置中的数据,而这些数据现在丢失。在这种情况下,为对象启动回填可用作“重置机制”,因为所有数据都一次性流式传输到目标位置。因此,数据会在来源和目标位置之间同步。

您必须先检索对象的相关信息,然后才能对数据流的对象启动回填。

每个对象都有一个 OBJECT_ID,用于唯一标识该对象。您可以使用 OBJECT_ID 为数据流启动回填。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

如需详细了解如何使用 gcloud 为数据流的对象启动回填,请参阅 Google Cloud SDK 文档

停止数据流对象的回填

启动数据流对象的回填后,您可停止该对象的回填。例如,如果用户修改数据库架构,则架构或数据可能会损坏。您不希望将此架构或数据流式传输到目标位置,因此需停止对象的回填。

您还可以停止对象的回填来实现负载均衡。Datastream 可以并行运行多个回填。这可能会给来源带来额外的负担。如果负载显著,请停止每个对象的回填,然后逐个启动对象的回填。

您必须先发出请求来检索数据流的所有对象的相关信息,然后才能停止数据流对象的回填。返回的每个对象都有一个 OBJECT_ID,用于唯一标识该对象。您可以使用 OBJECT_ID 来停止数据流的回填。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

如需详细了解如何使用 gcloud 停止流对象的回填,请参阅 Google Cloud SDK 文档

更改并发 CDC 任务数上限

以下代码展示了如何将 MySQL 流的最大并发变更数据捕获 (CDC) 任务数设置为 7。

在本例中,为 updateMask 参数指定的字段是 maxConcurrentCdcTasks 字段。通过将其值设置为 7,您可以将并发 CDC 任务的最大数量从之前的值更改为 7。您可以使用 0 到 50(含)之间的值。如果您未定义该值,或者将其定义为 0,则系统会为该流设置默认值 5 个任务。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

如需详细了解如何使用 gcloud,请参阅 Google Cloud SDK 文档

更改了并发回填任务数上限

以下代码展示了如何将 MySQL 流的最大并发回填任务数设置为 25。

在本例中,为 updateMask 参数指定的字段是 maxConcurrentBackfillTasks 字段。通过将此值设置为 25,您可以将并发回填任务数上限从之前的值更改为 25。您可以使用 0 到 50(含)之间的值。如果您未定义该值,或者将其定义为 0,则系统会为该流设置 16 个任务的默认值。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

如需详细了解如何使用 gcloud,请参阅 Google Cloud SDK 文档

为 Oracle 来源启用大型对象流式传输

您可以为具有 Oracle 源的流启用大型对象(例如二进制大型对象 [BLOB]、字符大型对象 [CLOB] 和国家字符大型对象 [NCLOB])的流式传输。借助 streamLargeObjects 标志,您可以在新数据流和现有数据流中包含大型对象。该标志是在数据流级别设置的,您无需指定大对象数据类型的列。

以下示例展示了如何创建可用于流式传输大型对象的流。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

如需详细了解如何使用 gcloud 更新数据流,请参阅 Google Cloud SDK 文档

删除数据流

以下代码显示了一个删除数据流的请求。

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

如需详细了解如何使用 gcloud 删除数据流,请参阅 Google Cloud SDK 文档

后续步骤