使用 Dataflow 流式传输更改
借助 Bigtable Beam 连接器,您可以使用 Dataflow 读取 Bigtable 数据更改记录,而无需跟踪或处理代码中的分区更改,因为连接器会处理相应逻辑。
本文档介绍了如何配置和使用 Bigtable Beam 连接器以通过 Dataflow 流水线读取变更数据流。在阅读本文档之前,您应该先阅读变更数据流概览,并熟悉 Dataflow。
构建自己的流水线的替代方案
如果您不想构建自己的 Dataflow 流水线,则可以使用以下任一选项。
您可以使用 Google 提供的 Dataflow 模板。
您也可以以 Bigtable 教程或快速入门中的代码示例为基础构建您的代码。
确保您生成的代码使用 google cloud libraries-bom
26.14.0 或更高版本。
连接器详细信息
借助 Bigtable Beam 连接器方法 BigtableIO.readChangeStream
,您可以读取可处理的数据更改记录流 (ChangeStreamMutation
)。Bigtable Beam 连接器是 Apache Beam GitHub 代码库的一个组件。如需了解连接器代码,请参阅 BigtableIO.java
中的注释。
您必须将该连接器与 Beam 2.48.0 或更高版本搭配使用。请检查 Apache Beam 运行时支持,以确保您使用的是受支持的 Java 版本。然后,您可以将使用该连接器的流水线部署到 Dataflow,以便处理资源的预配和管理,并帮助实现流式数据处理的可伸缩性和可靠性。
如需详细了解 Apache Beam 编程模型,请参阅 Beam 文档。
不使用事件时间对数据进行分组
使用 Bigtable Beam 连接器流式传输的数据更改记录与依赖事件时间的 Dataflow 函数不兼容。
如复制和水印中所述,如果分区的复制过程未与实例的其余部分同步,则低水印可能不会前进。低水印停止前进时,可能会导致变更数据流停滞。
为防止数据流停滞,Bigtable Beam 连接器会输出所有数据,输出时间戳为零。零时间戳使 Dataflow 将所有数据更改记录都视为迟到数据。因此,依赖于事件时间的 Dataflow 函数与 Bigtable 变更数据流不兼容。具体来说,您不能使用数据选取函数、事件时间触发器或事件时间计时器。
而是可以将 GlobalWindows 与非事件时间触发器结合使用,将这些迟到数据分组到窗格中,如本教程中的示例所示。如需详细了解触发器和窗格,请参阅 Beam 编程指南中的触发器。
自动扩缩
连接器支持 Dataflow 自动扩缩,在使用 Runner v2(必需)时,该功能默认启用。Dataflow 自动扩缩算法会考虑估算的变更数据流积压,您可以在 Backlog
部分的 Dataflow 监控页面上对其进行监控。部署作业时请使用 --maxNumWorkers
标志以限制工作器数量。
如需手动扩缩流水线(而不是使用自动扩缩),请参阅手动扩缩流处理流水线。
限制
在将 Bigtable Beam 连接器与 Dataflow 搭配使用之前,请注意以下限制。
Dataflow Runner V2
只能使用 Dataflow Runner v2 执行连接器。若要启用此功能,请在命令行参数中指定 --experiments=use_runner_v2
。使用 Runner v1 运行会导致流水线失败,并出现以下异常:
java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow
快照
连接器不支持 Dataflow 快照。
重复消息
Bigtable Beam 连接器会按提交时间戳顺序串流每个行键和每个集群的更改,但由于它有时会从数据流中的更早时间重启,因此可能会产生重复项。
准备工作
在使用该连接器之前,请先满足以下前提条件。
设置身份验证
如需在本地开发环境中使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用您的用户凭据设置应用默认凭据。
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
如需了解详情,请参阅 Set up authentication for a local development environment。
如需了解如何为生产环境设置身份验证,请参阅 Set up Application Default Credentials for code running on Google Cloud。
启用变更数据流
您必须先在表上启用变更数据流,然后才能读取数据流。您还可以创建启用了变更数据流的新表。
更改数据流元数据表
当您使用 Dataflow 流式传输更改时,Bigtable Beam 连接器会默认创建一个名为 __change_stream_md_table
的元数据表。变更数据流元数据表用于管理连接器的运行状态,并存储与数据更改记录相关的元数据。
默认情况下,连接器会在正在流式传输的表所在的实例中创建表。为确保该表正常运行,元数据表的应用配置文件必须使用单集群路由,并且已启用单行事务。
如需详细了解如何使用 Bigtable Beam 连接器从 Bigtable 流式传输更改,请参阅 BigtableIO 文档。
所需的角色
如需获得使用 Dataflow 读取 Bigtable 变更数据流所需的权限,请让您的管理员授予您以下 IAM 角色。
若要从 Bigtable 读取更改,您需要以下角色:
- 针对 Bigtable 实例(包含您计划从中流式传输更改的表)的 Bigtable Administrator (roles/bigtable.admin)
若要运行 Dataflow 作业,您需要以下角色:
- 针对包含 Cloud 资源的项目的 Dataflow Developer (
roles/dataflow.developer
) - 针对包含 Cloud 资源的项目的 Dataflow Worker (roles/dataflow.worker)
- 针对您计划使用的 Cloud Storage 存储桶的 Storage Object Admin (roles/storage.objectAdmin)
如需详细了解如何授予角色,请参阅管理访问权限。
将 Bigtable Beam 连接器添加为依赖项
将类似于以下依赖项的代码添加到 Maven pom.xml 文件中。版本必须为 2.48.0 或更高版本。
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
读取变更数据流
若要构建 Dataflow 流水线以读取数据更改记录,请配置连接器,然后添加转换和接收器。然后,使用连接器读取 Beam 流水线中的 ChangeStreamMutation
对象。
本部分中的代码示例使用 Java 编写,演示了如何构建流水线并使用它将键值对转换为字符串。每个键值对都由一个行键和一个 ChangeStreamMutation
对象组成。流水线将每个对象的条目转换为以逗号分隔的字符串。
构建流水线
此 Java 代码示例演示了如何构建流水线:
处理数据更改记录
此示例演示了如何循环遍历行的数据更改记录中的所有条目,以及如何根据条目类型调用转换为字符串的方法。
如需查看数据更改记录可以包含的条目类型列表,请参阅数据更改记录中包含的内容。
在以下示例中,系统会转换写入条目:
在此示例中,系统会转换删除单元条目:
在以下示例中,系统会转换删除列族条目:
监控
通过 Google Cloud 控制台中的以下资源,您可以在运行 Dataflow 流水线来读取 Bigtable 变更数据流时监控 Google Cloud 资源:
请特别检查以下指标:
- 在 Bigtable 监控页面上,检查以下指标:
- 指标
cpu_load_by_app_profile_by_method_by_table
中的变更数据流的 CPU 利用率数据。显示变更数据流对集群 CPU 使用率的影响。 - 变更数据流存储空间利用率(字节)(
change_stream_log_used_bytes
)。
- 指标
- 在 Dataflow 监控页面上,查看数据新鲜度,其中显示了当前时间和水印之间的差异。此过程大约需要两分钟,偶尔会出现高峰,此时会多花一两分钟。如果数据新鲜度指标始终高于该阈值,则表示您的流水线可能资源不足,您应该添加更多 Dataflow 工作器。数据新鲜度并不能表明数据更改记录的处理速度是否缓慢。
- Dataflow
processing_delay_from_commit_timestamp_MEAN
指标可让您了解数据更改记录在作业生命周期内的平均处理时间。
在监控正在读取 Bigtable 更改数据流的 Dataflow 流水线时,Bigtable server/latencies
指标没有用处,因为它反映的是流式传输请求时长,而不是数据更改记录处理延迟时间。更改流中的高延迟时间并不意味着请求的处理速度缓慢;而是表示连接保持打开状态的时间很长。