事件和数据流

概览

Datastream 中的数据层次结构如下:

  • 数据流,由数据源和目标位置组成。
  • 对象,是数据流的一部分,例如来自特定数据库的表。
  • 事件,由特定对象(例如数据库插入)生成的单个更改。

数据流、对象和事件具有关联的数据和元数据。这些数据和元数据可用于不同目的。

关于事件

每个事件包含 3 种类型的数据:

  • 事件数据:表示来自数据流来源的对象中对数据本身的更改。每个事件都包含已更改的整行。
  • 通用元数据:此元数据显示在 Datastream 生成的每个用于操作(例如移除目标位置的重复数据)的事件中。
  • 特定于来源的元数据:此元数据显示在特定数据流来源生成的每个事件上。此元数据因来源而异。

事件数据

事件数据是来自数据流来源的给定对象中的每次更改的载荷。

事件采用 Avro 或 JSON 格式。如果使用 Avro 格式,则对于每一列,事件将包含列索引和值。通过列索引,可以从 Avro 标头中的架构中检索列名称和统一类型。

使用 JSON 格式时,对于每一列,事件将包含列名称和值。

事件元数据可用于收集事件来源的相关信息,还可用于移除目标位置的重复数据并按下游使用者对事件排序。

下表列出并描述了通用件元数据和特定于来源的事件元数据对应的字段和数据类型。

通用元数据

此元数据在所有类型的数据流中是一致的。

字段 Avro 类型 JSON 类型 说明
stream_name 字符串 字符串 创建时定义的唯一数据流名称。
read_method 字符串 字符串

指示是否使用变更数据捕获 (CDC) 方法从来源读取数据作为历史回填的一部分,或者作为在 CDC 复制期间回滚事务时创建的补充任务的一部分。

可能的值包括:

  • oracle-cdc-logminer
  • oracle-backfill
  • oracle-supplementation
  • mysql-cdc-binlog
  • mysql-backfill-incremental
  • mysql-backfill-fulldump
  • postgres-cdc-wal
  • postgresql-backfill
object 字符串 字符串 用于对不同类型的事件进行分组的名称,通常是来源中表或对象的名称。
schema_key 字符串 字符串 事件统一架构的唯一标识符。
uuid 字符串 字符串 Datastream 生成的事件的唯一标识符。
read_timestamp timestamp-millis 字符串 Datastream 读取记录时的时间戳 (UTC)(Epoch 时间戳,以毫秒为单位)。
source_timestamp timestamp-millis 字符串 记录在来源中发生更改时的时间戳 (UTC)(Epoch 时间戳,以毫秒为单位)。
sort_keys {"type": "array", "items": ["string", "long"]} 数组 可用于按事件发生顺序对事件进行排序的值数组。

特定于来源的元数据

此元数据与来自源数据库的 CDC 和回填事件相关联。要查看此元数据,请从以下下拉菜单中选择一个来源。

来源 字段 Avro 类型 JSON 类型 说明
MySQL log_file 字符串 字符串 Datastream 在 CDC 复制期间从中拉取事件的日志文件。
MySQL log_position 长整型 长整型 MySQL 二进制日志中的日志位置(偏移量)。
MySQL primary_keys 字符串数组 字符串数组 包含构成表主键的(一个或多个)列名称的列表。如果表没有主键,则此字段为空。
MySQL is_deleted boolean boolean
  • true 值表示已在来源中删除该行。
  • false 值表示未删除该行。
MySQL database 字符串 字符串 与事件关联的数据库。
MySQL table 字符串 字符串 与事件关联的表。
MySQL change_type 字符串 字符串

事件表示的更改类型(INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle log_file 字符串 字符串 Datastream 在 CDC 复制期间从中拉取事件的日志文件。
Oracle scn 长整型 长整型 Oracle 事务日志中的日志位置(偏移量)。
Oracle row_id 字符串 字符串 Oracle 的 row_id
Oracle is_deleted boolean boolean
  • true 值表示已在来源中删除该行。
  • false 值表示未删除该行。
Oracle database 字符串 字符串 与事件关联的数据库。
Oracle schema 字符串 字符串 与事件中的表关联的架构。
Oracle table 字符串 字符串 与事件关联的表。
Oracle change_type 字符串 字符串

事件表示的更改类型(INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle tx_id 字符串 字符串 事件所属的交易 ID。
Oracle rs_id 字符串 字符串 记录集 ID。rs_idssn 相结合可唯一标识 V$LOGMNR_CONTENTS 中的一行。rs_id 可唯一标识生成该行的重做记录。
Oracle ssn 长整型 长整型 SQL 序列号。此编号与 rs_id 搭配使用,可唯一标识 V$LOGMNR_CONTENTS 中的行。
PostgreSQL schema 字符串 字符串 与事件中的表关联的架构。
PostgreSQL table 字符串 字符串 与事件关联的表。
PostgreSQL is_deleted boolean boolean
  • true 值表示已在来源中删除该行。
  • false 值表示未删除该行。
PostgreSQL change_type 字符串 字符串 事件所表示的更改类型(INSERTUPDATEDELETE)。
PostgreSQL tx_id 字符串 字符串 事件所属的交易 ID。
PostgreSQL lsn 字符串 字符串 当前条目的日志序列号。
PostgreSQL primary_keys 字符串数组 字符串数组 包含构成表主键的(一个或多个)列名称的列表。如果表没有主键,则此字段为空。
SQL Server table 字符串 字符串 与事件关联的表。
SQL Server database 长整型 长整型 与事件关联的数据库。
SQL Server schema 字符串数组 字符串数组 与事件中的表关联的架构。
SQL Server is_deleted boolean boolean
  • 值为 true 时,表示已在来源中删除该行。
  • false 值表示未删除相应行。
SQL Server lsn 字符串 字符串 事件的日志序列号。
SQL Server tx_id 字符串 字符串 事件所属的交易 ID。
SQL Server physical_location 整数数组 整数数组 日志记录的实际位置,由三个整数表示:文件 ID、页面 ID 和记录的位置 ID。
SQL Server replication_index 字符串数组 字符串数组 索引可以唯一标识表中的行的列名称列表。
SQL Server change_type 字符串 字符串

事件所表示的更改类型(`INSERT`、UPDATE、`DELETE`)。

事件流示例

此流说明了由 3 个连续操作(INSERTUPDATEDELETE)在源数据库的 SAMPLE 表中的一行上生成的事件。

时间 THIS_IS_MY_PK (int) FIELD1 (nchar nullable) FIELD2 (nchar non-null)>
0 1231535353 foo TLV
1 1231535353 NULL TLV

INSERT (T0)

消息载荷由整个新行组成。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "d7989206-380f-0e81-8056-240501101100",
  "read_timestamp": "2019-11-07T07:37:16.808Z",
  "source_timestamp": "2019-11-07T02:15:39",
  "source_metadata": {
    "log_file": ""
    "scn": 15869116216871,
    "row_id": "AAAPwRAALAAMzMBABD",
    "is_deleted": false,
    "database": "DB1",
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "INSERT",
    "tx_id":
    "rs_id": "0x0073c9.000a4e4c.01d0",
    "ssn": 67,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": "foo",
    "FIELD2": "TLV",
  }
}

UPDATE (T1)

消息载荷由整个新行组成。不包含先前的值。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "e6067366-1efc-0a10-a084-0d8701101101",
  "read_timestamp": "2019-11-07T07:37:18.808Z",
  "source_timestamp": "2019-11-07T02:17:39",
  "source_metadata": {
    "log_file":
    "scn": 15869150473224,
    "row_id": "AAAGYPAATAAPIC5AAB",
    "is_deleted": false,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "UPDATE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0010",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

DELETE (T2)

消息载荷由整个新行组成。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
  "read_timestamp": "2019-11-07T07:37:20.808Z",
  "source_timestamp": "2019-11-07T02:19:39",
  "source_metadata": {
    "log_file":
    "scn": 158691504732555,
    "row_id": "AAAGYPAATAAPIC5AAC",
    "is_deleted": true,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "DELETE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0011",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

排序和一致性

本部分介绍 Datastream 如何处理排序和一致性。

排序

Datastream 不保证排序,但每个事件都包含完整的数据行以及数据写入来源时的时间戳。在 BigQuery 中,无序事件会自动按正确序列合并。BigQuery 使用事件元数据和内部更改序列号 (CSN),以正确的顺序将事件应用于表。在 Cloud Storage 中,同一时间的事件可能会跨越多个文件。

按照设计,在为了初始化数据流时创建的数据的初始回填而对事件进行回填时,会无序生成事件。

可以根据来源逐一推断排序。

来源 说明
MySQL

属于初始回填的事件的 read_method 字段以 mysql-backfill 开头。在回填中接收事件的顺序不受影响,因为可以按任意顺序使用它们。

属于正在进行的复制的事件将 read_method 字段设置为 mysql-cdc-binlog

该顺序可由 log_file 字段以及相对于日志文件偏移的 log_position 字段的组合推断得出。此组合提供一个唯一的递增编号,用于标识数据库中的操作顺序。

Oracle

属于初始回填的事件的 read_method 字段以 oracle-backfill 开头。在回填中接收事件的顺序不受影响,因为可以按任意顺序使用它们。

属于正在进行的复制的事件将 read_method 字段设置为 oracle-cdc-logminer

该顺序可由 rs_id(记录集 ID)字段和 ssn(SQL 序列号)字段的组合推断得出。此组合提供一个唯一的递增编号,用于标识数据库中的操作顺序。

PostgreSQL

属于初始回填的事件的 read_method 字段以 postgresql-backfill 开头。在回填中接收事件的顺序不受影响,因为可以按任意顺序使用它们。

属于正在进行的复制的事件将 read_method 字段设置为 postgres-cdc-wal

顺序可以通过 source_timestamp 字段和 lsn(日志序列号)字段的组合推断出来。此组合提供一个唯一的递增编号,用于标识数据库中的操作顺序。

SQL Server

属于初始回填的事件的 read_method 字段以 sqlserver-backfill 开头。在回填中接收事件的顺序不受影响,因为可以按任意顺序使用它们。

对于属于持续复制的一部分的事件,系统会将 read_method 字段设置为 sqlserver-cdc

顺序可以通过 source_timestamp 字段和 lsn(日志序列号)字段的组合推断出来。此组合提供一个唯一的递增编号,用于标识数据库中的操作顺序。

一致性

Datastream 保证来自源数据库的数据将至少传送至目标位置一次。不会漏掉任何事件,但数据流中可能存在重复事件。重复事件的时段应大约几分钟,事件元数据中事件的通用唯一标识符 (UUID) 可用于检测重复项。

当数据库日志文件包含未提交的事务时,如果回滚了任何事务,则数据库会在日志文件中将其反映为“反向”数据操纵语言 (DML) 操作。例如,回滚 INSERT 操作具有相应的 DELETE 操作。Datastream 从日志文件中读取这些操作。

关于数据流

每个数据流都具有描述该数据流及其数据来源的元数据。这些元数据包括数据流名称、来源和目标连接配置文件等信息。

如需查看数据流对象的完整定义,请查看 API 参考文档。

数据流状态

数据流可能处于以下任一状态:

  • Not started
  • Starting
  • Running
  • Draining
  • Paused
  • Failed
  • Failed permanently

您可以使用日志来查找其他状态信息,例如表回填、处理的行数等。您还可以使用 FetchStreamErrors API 来检索错误。

可通过发现 API 使用的对象元数据

发现 API 会返回一些对象,它们表示由连接配置文件表示的数据源或目标位置中定义的对象结构。对于对象本身及其拉取的数据的每个字段,每个对象都具有元数据。此元数据可通过发现 API 获取。