本文档介绍如何使用 Apache Beam BigQuery I/O 连接器将数据从 Dataflow 写入 BigQuery。
Apache Beam SDK 中提供 BigQuery I/O 连接器。我们建议使用最新的 SDK 版本。如需了解详情,请参阅 Apache Beam 2.x SDK。
我们还提供了针对 Python 的跨语言支持。
概览
BigQuery I/O 连接器支持使用以下方法将数据写入 BigQuery:
STORAGE_WRITE_API
。在此模式下,连接器使用 BigQuery Storage Write API 直接将数据写入 BigQuery 存储。Storage Write API 将流式提取和批量加载整合到一个高性能 API 中。此模式可保证“正好一次”语义。STORAGE_API_AT_LEAST_ONCE
。此模式也使用 Storage Write API,但提供“至少一次”语义。此模式可缩短大多数流水线的延迟时间。不过,可能会出现重复写入。FILE_LOADS
。在此模式下,连接器将输入数据写入 Cloud Storage 中的暂存文件。然后,连接器运行 BigQuery 加载作业以将数据加载到 BigQuery 中。 该模式是有界限PCollections
的默认模式,最常见于批处理流水线中。STREAMING_INSERTS
。在此模式中,连接器使用旧版流式插入 API。此模式是无界限PCollections
的默认模式,但不建议用于新项目。
选择写入方法时,请考虑以下几点:
- 对于流处理作业,请考虑使用
STORAGE_WRITE_API
或STORAGE_API_AT_LEAST_ONCE
,因为这些模式会直接写入 BigQuery 存储,而不使用中间暂存文件。 - 如果使用“至少一次”流处理模式运行流水线,请将写入模式设置为
STORAGE_API_AT_LEAST_ONCE
。此设置更高效,并且与“至少一次”流处理模式的语义相匹配。 - 文件加载和 Storage Write API 具有不同的配额和限制。
- 加载作业使用共享 BigQuery 槽池或预留槽。如需使用预留槽,请在预留分配类型为
PIPELINE
的项目中运行加载作业。如果您使用共享 BigQuery 槽池,加载作业是免费的。不过,BigQuery 不保证共享数据池的可用容量。如需了解详情,请参阅预留简介。
最大并行数量
对于流式处理流水线中的
FILE_LOADS
和STORAGE_WRITE_API
,连接器会将数据分片为多个文件或流。通常,我们建议调用withAutoSharding
以启用自动分片。对于批处理流水线中的
FILE_LOADS
,连接器会将数据写入分区文件,然后分区文件会并行加载到 BigQuery 中。对于批处理流水线中的
STORAGE_WRITE_API
,每个工作器都会创建一个或多个要写入 BigQuery 的流(由分片总数决定)。对于
STORAGE_API_AT_LEAST_ONCE
,有单个默认写入流。多个工作器附加到此写入流。
性能
下表显示了各种 BigQuery I/O 读取选项的性能指标。工作负载使用 Java 版 Apache Beam SDK 2.49.0 在一个 e2-standard2
工作器上运行。它们未使用 Runner v2。
1 亿条记录 | 1 KB | 1 列 | 吞吐量(字节) | 吞吐量(元素) |
---|---|---|
Storage Write | 55 MBps | 每秒 54,000 个元素 |
Avro Load | 78 MBps | 每秒 77,000 个元素 |
Json Load | 54 MBps | 每秒 53,000 个元素 |
这些指标基于简单的批处理流水线。它们旨在比较 I/O 连接器之间的性能,不一定代表实际流水线。Dataflow 流水线性能很复杂,它受到多个因素的影响,包括虚拟机类型、正在处理的数据量、外部来源和接收器的性能以及用户代码。指标基于运行 Java SDK,不代表其他语言 SDK 的性能特征。如需了解详情,请参阅 Beam IO 性能。
最佳做法
本部分介绍有关从 Dataflow 写入 BigQuery 的最佳实践。
一般注意事项
Storage Write API 具有配额限制。对于大多数流水线,连接器会处理这些限制。但是,某些场景可能会耗尽可用的 Storage Write API 流。例如,如果流水线使用自动分片和自动扩缩并具有大量目标位置,则可能会发生此问题,尤其是在具有多变工作负载的长时间运行的作业中。如果发生此问题,请考虑使用
STORAGE_WRITE_API_AT_LEAST_ONCE
来避免此问题。使用 Google Cloud 指标来监控 Storage Write API 配额用量。
使用文件加载时,Avro 的性能通常优于 JSON。如需使用 Avro,请调用
withAvroFormatFunction
。默认情况下,加载作业与 Dataflow 作业在同一项目中运行。如需指定其他项目,请调用
withLoadJobProjectId
。使用 Java SDK 时,请考虑创建一个表示 BigQuery 表架构的类。然后,在流水线中调用
useBeamSchema
,以便在 Apache BeamRow
和 BigQueryTableRow
类型之间自动转换。如需查看架构类的示例,请参阅ExampleModel.java
。如果要加载具有包含数千个字段的复杂架构的表,请考虑调用
withMaxBytesPerPartition
来为每个加载作业设置较小的大小上限。
流处理流水线
以下建议适用于流处理流水线。
对于流式处理流水线,我们建议使用 Storage Write API(
STORAGE_WRITE_API
或STORAGE_API_AT_LEAST_ONCE
)。流式处理流水线可以使用文件加载,但这种方法有以下缺点:
请尽可能考虑使用
STORAGE_WRITE_API_AT_LEAST_ONCE
。这可能会导致将重复的记录写入 BigQuery,但比STORAGE_WRITE_API
费用更低,且可伸缩性更强。一般而言,请避免使用
STREAMING_INSERTS
。流式插入比 Storage Write API 要贵,并且性能更低。数据分片可以提高流式处理流水线的性能。对于大多数流水线而言,自动分片是一个很好的起点。但是,您可以按如下方式调整分片:
- 对于
STORAGE_WRITE_API
,请调用withNumStorageWriteApiStreams
以设置写入流的数量。 - 对于
FILE_LOADS
,请调用withNumFileShards
以设置文件分片数量。
- 对于
如果您使用流式插入,我们建议您将
retryTransientErrors
设置为重试政策。
批处理流水线
以下建议适用于批处理流水线。
对于大多数大型批处理流水线,我们建议先尝试
FILE_LOADS
。批处理流水线可以使用STORAGE_WRITE_API
,但对于大规模情况(1,000 个或更多的 vCPU),或者如果并发流水线正在运行,可能会超出配额限制。Apache Beam 不会限制批量STORAGE_WRITE_API
作业的写入流数量上限,因此作业最终会达到 BigQuery Storage API 限制。使用
FILE_LOADS
时,您可能会耗尽共享 BigQuery 槽池或预留槽池。如果遇到此类故障,请尝试以下方法:- 减小作业的工作器数量上限或工作器大小。
- 购买更多预留槽。
- 考虑使用
STORAGE_WRITE_API
。
使用
STORAGE_WRITE_API
可能会使中小型流水线(少于 1,000 个 vCPU)受益。对于这些较小的作业,如果您需要死信队列或FILE_LOADS
共享槽池不足,请考虑使用STORAGE_WRITE_API
。如果您可以容忍重复数据,请考虑使用
STORAGE_WRITE_API_AT_LEAST_ONCE
。此模式可能会导致将重复记录写入 BigQuery,但费用可能会比STORAGE_WRITE_API
选项低。不同写入模式的执行方式可能会因流水线的特性而异。可通过实验找到最适合您的工作负载的写入模式。
处理行级错误
本部分介绍了如何处理可能在行级层发生的错误,例如输入数据格式错误或架构不匹配。
对于 Storage Write API,所有无法写入的行都会被放入单独的 PCollection
中。如需获取此集合,请对 WriteResult
对象调用 getFailedStorageApiInserts
。如需查看此方法的示例,请参阅将数据流式插入 BigQuery。
最好将错误发送到死信队列或表,以供日后进行处理。如需详细了解此模式,请参阅 BigQueryIO
死信模式。
对于 FILE_LOADS
,如果在加载数据时发生错误,则加载作业会失败,并且流水线会抛出运行时异常。您可以在 Dataflow 日志中查看错误或查看 BigQuery 作业历史记录。I/O 连接器不会返回个别失败行的相关信息。
如需详细了解如何排查错误,请参阅 BigQuery 连接器错误。
示例
以下示例展示了如何使用 Dataflow 写入 BigQuery。
写入现有表
以下示例会创建一个将 PCollection<MyData>
写入 BigQuery 的批处理流水线,其中 MyData
是自定义数据类型。
BigQueryIO.write()
方法会返回 BigQueryIO.Write<T>
类型,用于配置写入操作。如需了解详情,请参阅 Apache Beam 文档中的写入表格。此代码示例会将数据写入现有表 (CREATE_NEVER
) 并将新行附加到表 (WRITE_APPEND
)。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
写入新表或现有表
以下示例在目标表不存在时通过将创建处置方式设置为 CREATE_IF_NEEDED
来创建新表。使用此选项时,您必须提供表架构。如果创建新表,则连接器使用此架构。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
将数据流式传输到 BigQuery
以下示例展示了如何通过将写入模式设置为 STORAGE_WRITE_API
,使用“正好一次”语义来流式插入数据
并非所有流处理流水线都需要“正好一次”语义。例如,您可以从目标表中手动移除重复项。如果您的场景可以接受重复记录的可能性,请考虑将写入方法设置为 STORAGE_API_AT_LEAST_ONCE
来使用“至少一次”语义。此方法通常更高效,可缩短大多数流水线的延迟时间。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
后续步骤
- 如需详细了解 BigQuery I/O 连接器,请参阅 Apache Beam 文档。
- 了解如何使用 Storage Write API 将数据流式插入到 BigQuery(博文)。