本文档介绍如何使用 Apache Beam BigQuery I/O 连接器将数据从 BigQuery 读取到 Dataflow。
概览
BigQuery I/O 连接器支持使用以下两种方法从 BigQuery 读取数据:
- 直接表读取。这个选项是最快的,因为它使用 BigQuery Storage Read API。
- 导出作业。如果使用此选项,BigQuery 会运行导出作业,以将表数据写入 Cloud Storage。然后,连接器会从 Cloud Storage 读取导出的数据。此选项效率较低,因为它需要导出步骤。
导出作业是默认选项。如需指定直接读取,请调用 withMethod(Method.DIRECT_READ)
。
连接器将表数据序列化为 PCollection
。PCollection
中的每个元素表示一个表行。连接器支持以下序列化方法:
- 以 Avro 记录的形式读取数据。使用此方法,您可以提供一个函数,用于将 Avro 记录解析为自定义数据类型。
- 将数据作为
TableRow
对象读取。这种方法很方便,因为不需要自定义数据类型。但是,它的性能通常低于读取 Avro 格式的记录。
最大并行数量
此连接器中的并行性取决于读取方法:
直接读取:I/O 连接器会根据导出请求的大小生成动态数量的流。它直接从 BigQuery 并行读取这些流。
导出作业:BigQuery 确定要写入 Cloud Storage 的文件数量。文件数取决于查询和数据量。I/O 连接器会并行读取导出的文件。
性能
下表显示了各种 BigQuery I/O 读取选项的性能指标。工作负载使用 Java 版 Apache Beam SDK 2.49.0 在一个 e2-standard2
工作器上运行。它们未使用 Runner v2。
1 亿条记录 | 1 KB | 1 列 | 吞吐量(字节) | 吞吐量(元素) |
---|---|---|
Storage Read | 120 MBps | 每秒 88,000 个元素 |
Avro Export | 105 MBps | 每秒 78,000 个元素 |
Json Export | 110 MBps | 每秒 81,000 个元素 |
这些指标基于简单的批处理流水线。它们旨在比较 I/O 连接器之间的性能,不一定代表实际流水线。Dataflow 流水线性能很复杂,它受到多个因素的影响,包括虚拟机类型、正在处理的数据量、外部来源和接收器的性能以及用户代码。指标基于运行 Java SDK,不代表其他语言 SDK 的性能特征。如需了解详情,请参阅 Beam IO 性能。
最佳做法
通常,我们建议使用直接表读取 (
Method.DIRECT_READ
)。Storage Read API 比导出作业更适合数据流水线,因为它不需要导出数据的中间步骤。如果您使用直接读取,则需要支付 Storage Read API 的使用费。请参阅 BigQuery 价格页面中的数据提取价格。
导出作业不产生额外费用。但是,导出作业存在一些限制。如果需要移动大量数据,而及时性最为重要且费用可调整,则建议使用直接读取。
Storage Read API 具有配额限制。使用 Google Cloud 指标监控配额用量。
使用 Storage Read API 时,您可能会在日志中看到租期到期和会话超时错误,例如:
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
`
当操作花费的时间超过超时值时,可能会发生这些错误,通常发生在运行超过 6 小时的流水线中。为了缓解此问题,请改用文件导出功能。
使用 Java SDK 时,请考虑创建一个表示 BigQuery 表架构的类。然后,在流水线中调用
useBeamSchema
,以便在 Apache BeamRow
和 BigQueryTableRow
类型之间自动转换。如需查看架构类的示例,请参阅ExampleModel.java
。
示例
本部分中的代码示例使用直接表读取。
如需改用导出作业,请省略对 withMethod
的调用或指定 Method.EXPORT
。然后设置 --tempLocation
流水线选项,为导出的文件指定 Cloud Storage 存储桶。
这些代码示例假定源表有以下列:
name
(字符串)age
(整数)
指定为 JSON 架构文件:
[
{"name":"user_name","type":"STRING","mode":"REQUIRED"},
{"name":"age","type":"INTEGER","mode":"REQUIRED"}
]
读取 Avro 格式的记录
如需将 BigQuery 数据读入 Avro 格式的记录,请使用 read(SerializableFunction)
方法。此方法接受应用定义的函数,该函数会解析 SchemaAndRecord
对象并返回自定义数据类型。连接器的输出是自定义数据类型的 PCollection
。
以下代码会从 BigQuery 表中读取 PCollection<MyData>
,其中 MyData
是应用定义的类。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
read
方法采用 SerializableFunction<SchemaAndRecord, T>
接口,该接口定义了一个函数,用于将 Avro 记录转换为自定义数据类。在上面的代码示例中,MyData.apply
方法实现了此转换函数。示例函数会解析 Avro 记录中的 name
和 age
字段,并返回 MyData
实例。
如需指定要读取的 BigQuery 表,请调用 from
方法,如上述示例所示。如需了解详情,请参阅 BigQuery I/O 连接器文档中的表名称。
读取 TableRow
对象
readTableRows
方法会将 BigQuery 数据读入 TableRow
对象的 PCollection
。每个 TableRow
都是一个键值对映射,用于保存一行表数据。可以通过调用 from
方法指定要读取的 BigQuery 表。
以下代码会从 BigQuery 表中读取 PCollection<TableRows>
。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
此示例还展示了如何访问 TableRow
字典中的值。整数值会被编码为字符串,以匹配 BigQuery 导出的 JSON 格式。
列投影和过滤
使用直接读取 (Method.DIRECT_READ
) 时,您可以减少从 BigQuery 读取并通过网络发送的数据量,从而提高读取操作的效率。
- 列投影:调用
withSelectedFields
以从表中读取部分列。这样,当表包含许多列时,可实现高效读取。 - 行过滤:调用
withRowRestriction
以指定在服务器端过滤数据的谓词。
过滤条件谓词必须具有确定性,并且不支持汇总。
以下示例会投影 "user_name"
和 "age"
列,并过滤掉与谓词 "age > 18"
不匹配的行。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
从查询结果中读取
前面的示例展示了如何从表中读取行。您还可以通过调用 fromQuery
从 SQL 查询结果中读取。此方法将一些计算工作迁移到 BigQuery。您还可以使用此方法对 BigQuery 视图或具体化视图运行查询,从而从中读取数据。
以下示例会对 BigQuery 公共数据集运行查询并读取结果。流水线运行后,您可以在 BigQuery 作业历史记录中看到查询作业。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
后续步骤
- 阅读 Bigtable I/O 连接器文档。
- 参阅 Google 提供的模板列表。