本文档简要介绍了 BigQuery 订阅、其工作流和关联属性。
BigQuery 订阅是一种导出订阅,会在收到消息时将其写入现有 BigQuery 表。您无需配置单独的订阅者客户端。使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 可以创建、更新、列出、分离或删除 BigQuery 订阅。
如果没有 BigQuery 订阅类型,则您需要拉取或推送订阅,以及读取消息并将其写入 BigQuery 表的订阅程序(如 Dataflow)。如果将消息存储在 BigQuery 表中之前不需要进行额外处理,则不需要运行 Dataflow 作业的开销;您可以改用 BigQuery 订阅。
但是,对于一些需要先进行数据转换的 Pub/Sub 系统,我们仍建议使用 Dataflow 流水线,以便将数据存储在 BigQuery 表中。如需了解如何使用 Dataflow 通过转换将数据从 Pub/Sub 流式传输到 BigQuery,请参阅从 Pub/Sub 流式传输到 BigQuery。
默认情况下, 从 Dataflow 到 BigQuery 的 Pub/Sub 订阅模板会强制执行仅传送一次。这通常通过 Dataflow 流水线中的重复信息删除机制来实现。但是,BigQuery 订阅仅支持至少传送一次。如果精确的重复信息删除对您的使用场景至关重要,请考虑使用 BigQuery 中的下游进程处理潜在的重复项。
准备工作
在阅读本文档之前,请确保您熟悉以下内容:
Pub/Sub 的工作原理以及不同的 Pub/Sub 术语。
Pub/Sub 支持的不同订阅类型,以及为何可能需要使用 BigQuery 订阅。
BigQuery 的工作原理以及如何配置和管理 BigQuery 表。
BigQuery 订阅工作流
下图显示了 BigQuery 订阅与 BigQuery 之间的工作流。
以下是对参考图 1 的工作流的简要说明:
- Pub/Sub 使用 BigQuery Storage Write API 将数据发送到 BigQuery 表。
- 这些消息会批量发送到 BigQuery 表。
- 写入操作成功完成后,API 会返回 OK 响应。
- 如果写入操作发生任何失败,则 Pub/Sub 消息本身会得到否定确认。然后,该消息会被重新发送。如果消息失败的次数足够多,并且订阅上配置了死信主题,则该消息会移至死信主题。
BigQuery 订阅的属性
您为 BigQuery 订阅配置的属性决定了 Pub/Sub 将消息写入到的 BigQuery 表以及该表的架构类型。
如需了解详情,请参阅 BigQuery 属性。
架构兼容性
Pub/Sub 和 BigQuery 使用不同的方式来定义其架构。Pub/Sub 架构使用 Apache Avro 或 Protocol Buffer 格式定义,而 BigQuery 架构使用多种格式进行定义。下面列出了有关 Pub/Sub 主题与 BigQuery 表之间的架构兼容性的重要信息。
任何包含格式不正确字段的消息都不会写入 BigQuery。
在 BigQuery 架构中,
INT
、SMALLINT
、INTEGER
、BIGINT
、TINYINT
和BYTEINT
是INTEGER
的别名;DECIMAL
是NUMERIC
的别名;BIGDECIMAL
是BIGNUMERIC
的别名。如果主题架构中的类型为
string
且 BigQuery 表中的类型为JSON
、TIMESTAMP
、DATETIME
、DATE
、TIME
、NUMERIC
或BIGNUMERIC
,则 Pub/Sub 消息中此字段的任何值都必须遵循为 BigQuery 数据类型指定的格式。支持某些 Avro 逻辑类型,如下表所示。未列出的任何逻辑类型仅与其注解的等效 Avro 类型匹配,详见 Avro 规范。
以下是不同架构格式到 BigQuery 数据类型的一系列映射。
Avro 类型
Avro 类型 | BigQuery 数据类型 |
null |
Any NULLABLE |
boolean |
BOOLEAN |
int |
INTEGER 、NUMERIC 或 BIGNUMERIC |
long |
INTEGER 、NUMERIC 或 BIGNUMERIC |
float |
FLOAT64 、NUMERIC 或 BIGNUMERIC |
double |
FLOAT64 、NUMERIC 或 BIGNUMERIC |
bytes |
BYTES 、NUMERIC 或 BIGNUMERIC |
string |
STRING 、JSON 、TIMESTAMP 、DATETIME 、DATE 、TIME 、NUMERIC 或 BIGNUMERIC |
record |
RECORD/STRUCT |
array /Type |
REPEATED Type |
map ,值为 ValueType |
REPEATED STRUCT <key STRING, value
ValueType> |
union ,具有两种类型,一种是 null ,另一种是 Type |
NULLABLE Type |
其他 union |
无法映射 |
fixed |
BYTES 、NUMERIC 或 BIGNUMERIC |
enum |
INTEGER |
Avro 逻辑类型
Avro 逻辑类型 | BigQuery 数据类型 |
timestamp-micros |
TIMESTAMP |
date |
DATE |
time-micros |
TIME |
duration |
INTERVAL |
协议缓冲区类型
协议缓冲区类型 | BigQuery 数据类型 |
double |
FLOAT64 、NUMERIC 或 BIGNUMERIC |
float |
FLOAT64 、NUMERIC 或 BIGNUMERIC |
int32 |
INTEGER 、NUMERIC 、BIGNUMERIC 或 DATE |
int64 |
INTEGER 、NUMERIC 、BIGNUMERIC 、DATE 、DATETIME 或 TIMESTAMP |
uint32 |
INTEGER 、NUMERIC 、BIGNUMERIC 或 DATE |
uint64 |
NUMERIC 或 BIGNUMERIC |
sint32 |
INTEGER 、NUMERIC 或 BIGNUMERIC |
sint64 |
INTEGER 、NUMERIC 、BIGNUMERIC 、DATE 、DATETIME 或 TIMESTAMP |
fixed32 |
INTEGER 、NUMERIC 、BIGNUMERIC 或 DATE |
fixed64 |
NUMERIC 或 BIGNUMERIC |
sfixed32 |
INTEGER 、NUMERIC 、BIGNUMERIC 或 DATE |
sfixed64 |
INTEGER 、NUMERIC 、BIGNUMERIC 、DATE 、DATETIME 或 TIMESTAMP |
bool |
BOOLEAN |
string |
STRING 、JSON 、TIMESTAMP 、DATETIME 、DATE 、TIME 、NUMERIC 或 BIGNUMERIC |
bytes |
BYTES 、NUMERIC 或 BIGNUMERIC |
enum |
INTEGER |
message |
RECORD/STRUCT |
oneof |
无法映射 |
map<KeyType, ValueType> |
REPEATED RECORD<key KeyType, value
ValueType> |
enum |
INTEGER |
repeated/array of Type |
REPEATED Type |
日期和时间整数表示法
从整数映射到某个日期或时间类型时,数字必须代表正确的值。以下是 BigQuery 数据类型与代表这些数据类型的整数的对应关系。
BigQuery 数据类型 | 整数表示法 |
DATE |
自 1970 年 1 月 1 日 Unix 纪元以来的天数 |
DATETIME |
使用 CivilTimeEncoder 表示为民用时间的日期和时间(以微秒为单位) |
TIME |
使用 CivilTimeEncoder 表示为民用时间的时间(以微秒为单位) |
TIMESTAMP |
自 Unix 纪元 1970 年 1 月 1 日 00:00:00 UTC 以来的微秒数 |
BigQuery 变更数据捕获
当订阅属性中的 use_topic_schema
或 use_table_schema
设置为 true
时,BigQuery 订阅支持变更数据捕获 (CDC) 更新。如需将该功能与 use_topic_schema
搭配使用,请使用以下字段设置主题的架构:
_CHANGE_TYPE
(必需):设置为UPSERT
或DELETE
的string
字段。如果写入 BigQuery 表的 Pub/Sub 消息的
_CHANGE_TYPE
设置为UPSERT
,则 BigQuery 会使用同一键更新该行(如果存在),如果不存在,则插入新行。如果写入 BigQuery 表的 Pub/Sub 消息的
_CHANGE_TYPE
设置为DELETE
,则 BigQuery 会删除表中具有相同键的行(如果存在)。
_CHANGE_SEQUENCE_NUMBER
(可选):设置int64
(long
) 或int32
(int
) 字段,以确保按顺序处理对 BigQuery 表所做的更新和删除。同一行键的消息必须包含_CHANGE_SEQUENCE_NUMBER
的单调递增值。如果消息的序列号小于为某一行处理的最高序列号,则不会对 BigQuery 表中的该行产生任何影响。
如需将该功能与 use_table_schema
搭配使用,请在 JSON 消息中添加上述字段。
Pub/Sub 服务帐号权限
如需创建 BigQuery 订阅,Pub/Sub 服务帐号必须具有写入特定 BigQuery 表和读取表元数据的权限。如需了解详情,请参阅为 Pub/Sub 服务帐号分配 BigQuery 角色。
处理消息失败
当 Pub/Sub 消息无法写入 BigQuery 时,无法确认该消息。如需转发此类无法传送的消息,请在 BigQuery 订阅上配置死信主题。转发到死信主题的 Pub/Sub 消息包含一个属性 CloudPubSubDeadLetterSourceDeliveryErrorMessage
,该属性具有导致 Pub/Sub 消息无法写入 BigQuery 的原因。
配额和限制
每个区域的 BigQuery 订阅者吞吐量存在配额限制。如需了解详情,请参阅 Pub/Sub 配额和限制。
BigQuery 订阅使用 BigQuery Storage Write API 写入数据。如需了解 Storage Write API 的配额和限制,请参阅 BigQuery Storage Write API 请求。BigQuery 订阅只会消耗 Storage Write API 的吞吐量配额。您可以忽略此实例中的其他 Storage Write API 配额注意事项。
价格
如需了解 BigQuery 订阅的价格,请参阅 Pub/Sub 价格页面。
后续步骤
创建订阅,例如 BigQuery 订阅。
排查 BigQuery 订阅问题。
了解 BigQuery。
查看 Pub/Sub(包括 BigQuery 订阅)的价格。
使用
gcloud
CLI 命令创建或修改订阅。使用 REST API 创建或修改订阅。