概览
Datastream 中的数据层次结构如下:
- 数据流,由数据源和目的地组成。
- 对象,是数据流的一部分,例如来自特定数据库的表。
- 事件,由特定对象(例如数据库插入)生成的单个更改。
数据流、对象和事件具有关联的数据和元数据。这些数据和元数据可用于不同目的。
关于事件
每个事件包含 3 种类型的数据:
- 事件数据:表示来自数据流来源的对象中对数据本身的更改。每个事件都包含已更改的整行。
- 通用元数据:此元数据显示在 Datastream 生成的每个用于操作(例如移除目标位置的重复数据)的事件中。
- 特定于来源的元数据:此元数据显示在特定数据流来源生成的每个事件上。此元数据因来源而异。
事件数据
事件数据是来自数据流来源的给定对象中的每次更改的载荷。
事件采用 Avro 或 JSON 格式。
使用 Avro 格式时,对于每一列,事件将包含列索引和值。通过列索引,可以从 Avro 标头中的架构中检索列名称和统一类型。
使用 JSON 格式时,对于每一列,事件将包含列名称和值。
事件元数据可用于收集事件来源的相关信息,还可用于移除目标位置的重复数据并按下游使用者对事件排序。
下表列出并描述了通用件元数据和特定于来源的事件元数据对应的字段和数据类型。
通用元数据
此元数据在所有类型的数据流中是一致的。
字段 | Avro 类型 | JSON 类型 | 说明 |
---|---|---|---|
stream_name |
字符串 | 字符串 | 创建时定义的唯一数据流名称。 |
read_method |
字符串 | 字符串 | 指示是否使用变更数据捕获 (CDC) 方法从来源读取数据作为历史回填的一部分,或者作为在 CDC 复制期间回滚事务时创建的补充任务的一部分。 可能的值包括:
|
object |
字符串 | 字符串 | 用于对不同类型的事件进行分组的名称,通常是来源中表或对象的名称。 |
schema_key |
字符串 | 字符串 | 事件统一架构的唯一标识符。 |
uuid |
字符串 | 字符串 | Datastream 生成的事件的唯一标识符。 |
read_timestamp |
timestamp-millis | 字符串 | Datastream 读取记录时的时间戳 (UTC)(Epoch 时间戳,以毫秒为单位)。 |
source_timestamp |
timestamp-millis | 字符串 | 记录在来源中发生更改时的时间戳 (UTC)(Epoch 时间戳,以毫秒为单位)。 |
sort_keys |
{"type": "array", "items": ["string", "long"]} |
数组 | 可用于按事件发生顺序对事件进行排序的值数组。 |
特定于来源的元数据
此元数据与来自源数据库的 CDC 和回填事件相关联。要查看此元数据,请从以下下拉菜单中选择一个来源。
来源 | 字段 | Avro 类型 | JSON 类型 | 说明 |
---|---|---|---|---|
MySQL | log_file |
字符串 | 字符串 | Datastream 在 CDC 复制期间从中拉取事件的日志文件。 |
MySQL | log_position |
长整型 | 长整型 | MySQL 二进制日志中的日志位置(偏移量)。 |
MySQL | primary_keys |
字符串数组 | 字符串数组 | 包含构成表主键的(一个或多个)列名称的列表。如果表没有主键,则此字段为空。 |
MySQL | is_deleted |
布尔值 | 布尔值 |
|
MySQL | database |
字符串 | 字符串 | 与事件关联的数据库。 |
MySQL | table |
字符串 | 字符串 | 与事件关联的表。 |
MySQL | change_type |
字符串 | 字符串 | 事件表示的更改类型( |
Oracle | log_file |
字符串 | 字符串 | Datastream 在 CDC 复制期间从中拉取事件的日志文件。 |
Oracle | scn |
长整型 | 长整型 | Oracle 事务日志中的日志位置(偏移量)。 |
Oracle | row_id |
字符串 | 字符串 | Oracle 的 row_id。 |
Oracle | is_deleted |
布尔值 | 布尔值 |
|
Oracle | database |
字符串 | 字符串 | 与事件关联的数据库。 |
Oracle | schema |
字符串 | 字符串 | 与事件中的表关联的架构。 |
Oracle | table |
字符串 | 字符串 | 与事件关联的表。 |
Oracle | change_type |
字符串 | 字符串 | 事件表示的更改类型( |
Oracle | tx_id |
字符串 | 字符串 | 事件所属的交易 ID。 |
Oracle | rs_id |
字符串 | 字符串 | 记录集 ID。rs_id 和 ssn 相结合可唯一标识 V$LOGMNR_CONTENTS 中的一行。rs_id 可唯一标识生成该行的重做记录。 |
Oracle | ssn |
长整型 | 长整型 | SQL 序列号。此编号与 rs_id 搭配使用,可唯一标识 V$LOGMNR_CONTENTS 中的行。 |
PostgreSQL | schema |
字符串 | 字符串 | 与事件中的表关联的架构。 |
PostgreSQL | table |
字符串 | 字符串 | 与事件关联的表。 |
PostgreSQL | is_deleted |
布尔值 | 布尔值 |
|
PostgreSQL | change_type |
字符串 | 字符串 | 事件代表的更改类型(INSERT 、UPDATE 、DELETE )。
|
PostgreSQL | tx_id |
字符串 | 字符串 | 事件所属的交易 ID。 |
PostgreSQL | lsn |
字符串 | 字符串 | 当前条目的日志序列号。 |
PostgreSQL | primary_keys |
字符串数组 | 字符串数组 | 包含构成表主键的(一个或多个)列名称的列表。如果表没有主键,则此字段为空。 |
SQL Server | table |
字符串 | 字符串 | 与事件关联的表。 |
SQL Server | database |
长整型 | 长整型 | 与事件关联的数据库。 |
SQL Server | schema |
字符串数组 | 字符串数组 | 与事件中的表关联的架构。 |
SQL Server | is_deleted |
布尔值 | 布尔值 |
|
SQL Server | lsn |
字符串 | 字符串 | 事件的日志序列号。 |
SQL Server | tx_id |
字符串 | 字符串 | 事件所属的交易 ID。 |
SQL Server | physical_location |
整数数组 | 整数数组 | 日志记录的实际位置,由三个整数描述:记录的文件 ID、页面 ID 和槽 ID。 |
SQL Server | replication_index |
字符串数组 | 字符串数组 | 索引的列名称列表,可唯一标识表中的行。 |
SQL Server | change_type |
字符串 | 字符串 | 事件表示的更改类型(“INSERT”“UPDATE”“DELETE”)。 |
事件流示例
此流说明了由 3 个连续操作(INSERT
、UPDATE
和 DELETE
)在源数据库的 SAMPLE
表中的一行上生成的事件。
时间 | THIS_IS_MY_PK (int) | FIELD1 (nchar nullable) | FIELD2 (nchar non-null)> |
---|---|---|---|
0 | 1231535353 | foo | TLV |
1 | 1231535353 | NULL | TLV |
INSERT (T0)
消息载荷由整个新行组成。
{
"stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
"read_method": "oracle-cdc-logminer",
"object": "SAMPLE.TBL",
"uuid": "d7989206-380f-0e81-8056-240501101100",
"read_timestamp": "2019-11-07T07:37:16.808Z",
"source_timestamp": "2019-11-07T02:15:39",
"source_metadata": {
"log_file": ""
"scn": 15869116216871,
"row_id": "AAAPwRAALAAMzMBABD",
"is_deleted": false,
"database": "DB1",
"schema": "ROOT",
"table": "SAMPLE"
"change_type": "INSERT",
"tx_id":
"rs_id": "0x0073c9.000a4e4c.01d0",
"ssn": 67,
},
"payload": {
"THIS_IS_MY_PK": "1231535353",
"FIELD1": "foo",
"FIELD2": "TLV",
}
}
UPDATE (T1)
消息载荷由整个新行组成。不包含先前的值。
{
"stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
"read_method": "oracle-cdc-logminer",
"object": "SAMPLE.TBL",
"uuid": "e6067366-1efc-0a10-a084-0d8701101101",
"read_timestamp": "2019-11-07T07:37:18.808Z",
"source_timestamp": "2019-11-07T02:17:39",
"source_metadata": {
"log_file":
"scn": 15869150473224,
"row_id": "AAAGYPAATAAPIC5AAB",
"is_deleted": false,
"database":
"schema": "ROOT",
"table": "SAMPLE"
"change_type": "UPDATE",
"tx_id":
"rs_id": "0x006cf4.00056b26.0010",
"ssn": 0,
},
"payload": {
"THIS_IS_MY_PK": "1231535353",
"FIELD1": null,
"FIELD2": "TLV",
}
}
DELETE (T2)
消息载荷由整个新行组成。
{
"stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
"read_method": "oracle-cdc-logminer",
"object": "SAMPLE.TBL",
"uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
"read_timestamp": "2019-11-07T07:37:20.808Z",
"source_timestamp": "2019-11-07T02:19:39",
"source_metadata": {
"log_file":
"scn": 158691504732555,
"row_id": "AAAGYPAATAAPIC5AAC",
"is_deleted": true,
"database":
"schema": "ROOT",
"table": "SAMPLE"
"change_type": "DELETE",
"tx_id":
"rs_id": "0x006cf4.00056b26.0011",
"ssn": 0,
},
"payload": {
"THIS_IS_MY_PK": "1231535353",
"FIELD1": null,
"FIELD2": "TLV",
}
}
排序和一致性
本部分介绍 Datastream 如何处理排序和一致性。
排序
Datastream 不保证有序,但每个事件都包含完整的数据行以及数据写入来源的时间戳。在 BigQuery 中,系统会自动按正确的顺序合并无序事件。BigQuery 使用事件元数据和内部更改序列号 (CSN) 以正确的顺序将事件应用于表。在 Cloud Storage 中,同一时间的事件可以跨多个文件。
按照设计,在为了初始化数据流时创建的数据的初始回填而对事件进行回填时,会无序生成事件。
您可以按来源推断出顺序。
来源 | 说明 |
---|---|
MySQL | 属于初始回填的事件的 属于正在进行的复制的事件将 该顺序可由 |
Oracle | 属于初始回填的事件的 属于正在进行的复制的事件将 该顺序可由 |
PostgreSQL | 属于初始回填的事件的 属于正在进行的复制的事件将 顺序可以通过 |
SQL Server |
属于初始回填的事件的 属于正在进行的复制的事件将 顺序可以通过 |
一致性
Datastream 保证来自源数据库的数据将至少传送至目标位置一次。不会漏掉任何事件,但数据流中可能存在重复事件。重复事件的时段应大约几分钟,事件元数据中事件的通用唯一标识符 (UUID) 可用于检测重复项。
当数据库日志文件包含未提交的事务时,如果回滚了任何事务,则数据库会在日志文件中将其反映为“反向”数据操纵语言 (DML) 操作。例如,回滚 INSERT
操作具有相应的 DELETE
操作。Datastream 从日志文件中读取这些操作。
关于数据流
每个数据流都具有描述该数据流及其数据来源的元数据。这些元数据包括数据流名称、来源和目标连接配置文件等信息。
如需查看数据流对象的完整定义,请查看 API 参考文档。
数据流状态
数据流可能处于以下任一状态:
Not started
Starting
Running
Draining
Paused
Failed
Failed permanently
您可以使用日志来查找其他状态信息,例如表回填、处理的行数等。您还可以使用 FetchStreamErrors
API 来检索错误。
可通过发现 API 使用的对象元数据
发现 API 会返回一些对象,它们表示由连接配置文件表示的数据源或目标位置中定义的对象结构。对于对象本身及其拉取的数据的每个字段,每个对象都具有元数据。此元数据可通过发现 API 获取。