BigQuery Storage Write API 最佳做法

本文档介绍了使用 BigQuery Storage Write API 的最佳做法。在阅读本文档之前,请先阅读 BigQuery Storage Write API 概览

限制流的创建速率

在创建流之前,请考虑是否可以使用默认流。对于流式传输场景,相比应用创建的流,默认流具有较少的配额限制并且可以更好地进行扩缩。如果您使用应用创建的流,请确保先充分利用每个现有流上的最大吞吐量,然后再创建更多流。例如,您可以使用异步写入

对于应用创建的流,请避免高频率调用 CreateWriteStream。通常,如果每秒超过 40 至 50 次调用,API 调用的延时会显著增加(超过 25 秒)。确保您的应用可以接受冷启动,逐步增加流数量,并限制 CreateWriteStream 调用的速率。您还可以设置较长的截止时间,等待调用完成,以免调用失败并显示 DeadlineExceeded 错误。对于 CreateWriteStream 调用的最大速率,还有一个长期配额可供使用。 创建数据流是一个资源密集型过程,因此降低数据流的创建速率并充分利用现有数据流是避免超出此限制的最佳方法。

连接池管理

AppendRows 方法会创建与流的双向连接。您可以在默认流上打开多个连接,但只能在应用创建的流上打开单个活跃连接。

使用默认数据流时,您可以使用 Storage Write API 多路复用写入具有共享连接的多个目标表。多路复用池连接,用于提高吞吐量和资源利用率。如果您的工作流具有超过 20 个并发连接,我们建议您使用多路复用。Java 和 Go 支持多路复用。如需详细了解 Java 实现,请参阅使用多路复用。如需详细了解 Go 实现,请参阅连接共享(多路复用)。 如果您使用具有“至少一次”语义的 Beam 连接器,则可以通过 UseStorageApiConnectionPool 启用多路复用。Dataproc Spark 连接器会默认启用多路复用功能。

为获得最佳性能,请使用一个连接进行尽可能多的数据写入。请勿将单个连接仅用于单次写入,也不要针对许多小型写入打开和关闭流。

每个项目可以同时打开的并发连接数量有一个配额。超出该限额时,对 AppendRows 的调用会失败。 但是,并发连接的配额可以增加,并且通常不应成为扩缩的限制因素。

每次调用 AppendRows 时,系统都会创建一个新的数据写入者对象。因此,使用应用创建的数据流时,连接数对应于已创建的数据流的数量。通常,单个连接支持至少 1 MBps 的吞吐量。上限则取决于多个因素,例如网络带宽、数据架构和服务器负载,并且可能超过 10 MBps。

每个项目的总吞吐量也有一个配额。这表示流经 Storage Write API 服务的所有连接的每秒字节数。如果您的项目超出此配额,您可以申请更高的配额限制。通常,这涉及以相等的比率提高随附的配额(如并发连接配额)。

管理流偏移量以实现“正好一次”语义

Storage Write API 仅允许写入到流的当前末尾处,该位置会随着您不断附加数据而移动。流中的当前位置会被指定为相对于流起始位置的偏移量。

写入应用创建的流时,您可以指定流偏移量以实现“正好一次”写入语义。

指定偏移量时,写入操作具有幂等性,因此可以避免因网络错误或服务器无响应而导致的重试。处理与偏移量相关的以下错误:

  • ALREADY_EXISTS (StorageErrorCode.OFFSET_ALREADY_EXISTS):该行已写入。您可以放心地忽略此错误。
  • OUT_OF_RANGE (StorageErrorCode.OFFSET_OUT_OF_RANGE):先前的写入操作失败。从上一次成功写入重试。

请注意,如果设置的偏移值不正确,也可能会发生这些错误,因此您必须谨慎管理偏移量。

在使用流偏移量之前,请考虑您是否需要“正好一次”语义。例如,如果您的上游数据流水线仅保证“至少一次”写入,或者您可以在注入数据后轻松检测重复项,那么您可能不需要“正好一次”写入。在这种情况下,我们建议您使用默认流,因而避免跟踪行偏移量。

请勿阻止 AppendRows 调用

AppendRows 方法是异步执行的。您可以发送一系列写入,而不会单独阻止每个写入的响应。双向连接上的响应消息到达的顺序与请求加入队列的顺序相同。如需实现最大吞吐量,请勿阻止 AppendRows 调用,以等待响应。

处理架构更新

对于数据流式传输场景,表架构通常在流式传输流水线之外进行管理。架构通常会随着时间的推移而演变,例如,通过添加新的可以为 null 的字段。稳健的流水线必须能够处理带外架构更新。

Storage Write API 支持表架构,如下所示:

  • 第一个写入请求会包含架构。
  • 每行数据会作为二进制协议缓冲区进行发送。BigQuery 会将数据映射到架构。
  • 您可以省略可以为 null 的字段,但不能包含当前架构中不存在的任何字段。如果您发送包含额外字段的行,Storage Write API 会返回 StorageError 并显示 StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELD

如果要在载荷中发送新字段,您应首先更新 BigQuery 中的表架构。Storage Write API 会在片刻(大约几分钟)之后检测到架构更改。当 Storage Write API 检测到架构更改时,AppendRowsResponse 响应消息将包含描述新架构的 TableSchema 对象。

如需使用更新后的架构发送数据,您必须先关闭现有连接,然后使用新架构打开新连接。

Java 客户端。Java 客户端库通过 JsonStreamWriter 类为架构更新提供了一些额外的功能。架构更新后,JsonStreamWriter 会自动使用更新后的架构重新连接。您无需明确关闭和重新打开连接。如需以编程方式检查架构更改,请在 append 方法完成后调用 AppendRowsResponse.hasUpdatedSchema

您还可以将 JsonStreamWriter 配置为忽略输入数据中的未知字段。如需设置此行为,请调用 setIgnoreUnknownFields。此行为与使用旧版 tabledata.insertAll API 时的 ignoreUnknownValues 选项类似。但是这可能会导致意外的数据丢失,因为未知字段会被静默删除。