本文档介绍了如何将数据从 BigQuery 读取到 Dataflow。
概览
对于大多数用例,请考虑使用托管式 I/O 从 BigQuery 读取数据。托管式 I/O 提供自动升级和一致的配置 API 等功能。从 BigQuery 读取数据时,托管式 I/O 会执行直接表读取,从而提供最佳读取性能。
如果您需要更高级的性能调优,不妨考虑使用 BigQueryIO
连接器。BigQueryIO
连接器支持直接读取表,也支持从 BigQuery 导出作业读取数据。它还可更精细地控制表记录的反序列化。如需了解详情,请参阅本文档中的使用 BigQueryIO
连接器。
列投影和过滤
为了减少流水线从 BigQuery 读取的数据量,您可以使用以下技巧:
- 列投影用于指定要从表中读取的列的子集。如果您的表包含大量列,但您只需要读取其中的一部分,请使用列投影。
- 行过滤指定要应用于表的谓词。BigQuery 读取操作仅返回与过滤条件匹配的行,这可以减少流水线注入的总数据量。
以下示例从表中读取 "user_name"
和 "age"
列,并过滤掉不符合谓词 "age > 18"
的行。此示例使用托管式 I/O。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证。
从查询结果中读取
以下示例使用托管式 I/O 读取 SQL 查询的结果。它针对 BigQuery 公共数据集运行查询。您还可以使用 SQL 查询从 BigQuery 视图或物化视图中读取数据。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证。
使用 BigQueryIO
连接器
BigQueryIO
连接器支持以下序列化方法:
- 以 Avro 记录的形式读取数据。使用此方法,您可以提供一个函数,用于将 Avro 记录解析为自定义数据类型。
- 将数据作为
TableRow
对象读取。这种方法很方便,因为不需要自定义数据类型。但是,它的性能通常低于读取 Avro 格式的记录。
连接器支持以下两种数据读取选项:
- 导出作业。默认情况下,
BigQueryIO
连接器会运行一个 BigQuery 导出作业,该作业会将表数据写入 Cloud Storage。然后,连接器会从 Cloud Storage 读取数据。 - 直接表读取。此选项比导出作业更快,因为它使用 BigQuery Storage Read API 并跳过导出步骤。如需使用直接表读取,请在构建流水线时调用
withMethod(Method.DIRECT_READ)
。
选择使用哪种选项时,请考虑以下几点:
通常,我们建议使用直接表读取。Storage Read API 比导出作业更适合数据流水线,因为它不需要导出数据的中间步骤。
如果您使用直接读取,则需要支付 Storage Read API 的使用费。请参阅 BigQuery 价格页面中的数据提取价格。
导出作业不产生额外费用。但是,导出作业存在一些限制。如果需要移动大量数据,而及时性最为重要且费用可调整,则建议使用直接读取。
Storage Read API 具有配额限制。使用 Google Cloud 指标监控配额用量。
如果您使用导出作业,请设置
--tempLocation
流水线选项,为导出的文件指定 Cloud Storage 存储桶。使用 Storage Read API 时,您可能会在日志中看到租期到期和会话超时错误,例如:
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
当操作花费的时间超过超时值时,可能会发生这些错误,通常发生在运行超过 6 小时的流水线中。为了缓解此问题,请改用文件导出功能。
并行度取决于读取方法:
直接读取:I/O 连接器会根据导出请求的大小生成动态数量的流。它直接从 BigQuery 并行读取这些流。
导出作业:BigQuery 确定要写入 Cloud Storage 的文件数量。文件数取决于查询和数据量。I/O 连接器会并行读取导出的文件。
下表显示了各种 BigQuery I/O 读取选项的性能指标。工作负载使用 Java 版 Apache Beam SDK 2.49.0 在一个 e2-standard2
工作器上运行。它们未使用 Runner v2。
1 亿条记录 | 1 KB | 1 列 | 吞吐量(字节) | 吞吐量(元素) |
---|---|---|
Storage Read | 120 MBps | 每秒 88,000 个元素 |
Avro Export | 105 MBps | 每秒 78,000 个元素 |
Json Export | 110 MBps | 每秒 81,000 个元素 |
这些指标基于简单的批处理流水线。它们旨在比较 I/O 连接器之间的性能,不一定代表实际流水线。Dataflow 流水线性能很复杂,它受到多个因素的影响,包括虚拟机类型、正在处理的数据量、外部来源和接收器的性能以及用户代码。指标基于运行 Java SDK,不代表其他语言 SDK 的性能特征。如需了解详情,请参阅 Beam IO 性能。
示例
以下代码示例使用 BigQueryIO
连接器进行直接表读取。如需改用导出作业,请省略对 withMethod
的调用。
读取 Avro 格式的记录
此示例展示了如何使用 BigQueryIO
连接器读取 Avro 格式的记录。
如需将 BigQuery 数据读取到 Avro 格式的记录中,请使用 read(SerializableFunction)
方法。此方法接受应用定义的函数,该函数会解析 SchemaAndRecord
对象并返回自定义数据类型。连接器的输出是自定义数据类型的 PCollection
。
以下代码会从 BigQuery 表中读取 PCollection<MyData>
,其中 MyData
是应用定义的类。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证。
read
方法采用 SerializableFunction<SchemaAndRecord, T>
接口,该接口定义了一个函数,用于将 Avro 记录转换为自定义数据类。在上面的代码示例中,MyData.apply
方法实现了此转换函数。示例函数会解析 Avro 记录中的 name
和 age
字段,并返回 MyData
实例。
如需指定要读取的 BigQuery 表,请调用 from
方法,如上述示例所示。如需了解详情,请参阅 BigQuery I/O 连接器文档中的表名称。
读取 TableRow
对象
此示例展示了如何使用 BigQueryIO
连接器读取 TableRow
对象。
readTableRows
方法会将 BigQuery 数据读入 TableRow
对象的 PCollection
中。每个 TableRow
都是一个键值对映射,用于保存一行表数据。可以通过调用 from
方法指定要读取的 BigQuery 表。
以下代码会从 BigQuery 表中读取 PCollection<TableRows>
。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证。
此示例还展示了如何访问 TableRow
字典中的值。整数值会被编码为字符串,以匹配 BigQuery 导出的 JSON 格式。