使用 Dataflow 流式传输更改
借助 Bigtable Beam 连接器,您可以使用 Dataflow 执行以下操作: 读取 Bigtable 数据变更记录,而无需跟踪或 进程分区 更改 因为连接器为您处理该逻辑。
本文档介绍了如何配置和使用 Bigtable Beam 连接器以使用 Dataflow 流水线读取变更数据流。 在阅读本文档之前,您应该先阅读变更数据流概览,并熟悉 Dataflow。
构建自己的流水线的替代方案
如果您不想构建自己的 Dataflow 流水线,则可以使用以下任一选项。
您可以使用 Google 提供的 Dataflow 模板。
您也可以以 Bigtable 教程或快速入门中的代码示例为基础构建您的代码。
确保您生成的代码使用 google cloud libraries-bom
26.14.0 或更高版本。
连接器详细信息
借助 Bigtable Beam 连接器方法 BigtableIO.readChangeStream
,您可以读取数据流
更改记录 (ChangeStreamMutation
)。Bigtable Beam 连接器是
Apache Beam GitHub 的
代码库。如需了解连接器代码,请参阅 BigtableIO.java
中的注释。
您必须将该连接器与 Beam 2.48.0 或更高版本搭配使用。请检查 Apache Beam 运行时支持,以确保您使用的是受支持的 Java 版本。然后,您可以将使用该连接器的流水线部署到 Dataflow,以便处理资源的预配和管理,并帮助实现流式数据处理的可伸缩性和可靠性。
如需详细了解 Apache Beam 编程模型,请参阅 Beam 文档。
不使用事件时间对数据进行分组
使用 Bigtable Beam 连接器流式传输的数据更改记录与依赖事件时间的 Dataflow 函数不兼容。
如复制和水印中所述,如果分区的复制过程未与实例的其余部分同步,则低水印可能不会前进。低水印停止前进时,可能会导致变更数据流停滞。
为防止数据流停滞,Bigtable Beam 连接器会输出输出时间戳为 零。零时间戳使 Dataflow 将所有数据更改记录都视为迟到数据。因此,依赖于事件时间的 Dataflow 函数与 Bigtable 变更数据流不兼容。具体来说,您不能使用数据选取函数、事件时间触发器或事件时间计时器。
您可以改用 GlobalWindows 使用非事件时间触发器将这些延迟数据分组到窗格中,如图所示 请参阅本教程中的示例中的说明。如需详细了解触发器和窗格,请参阅 触发器 。
自动扩缩
连接器支持 Dataflow 自动扩缩,在使用 Runner v2(必需)时,该功能默认启用。Dataflow 自动扩缩算法会考虑估算的变更数据流积压,您可以在 Backlog
部分的 Dataflow 监控页面上对其进行监控。部署作业时请使用 --maxNumWorkers
标志以限制工作器数量。
如需手动扩缩流水线(而不是使用自动扩缩),请参阅手动扩缩流处理流水线。
限制
在将 Bigtable Beam 连接器与 Dataflow 搭配使用之前,请注意以下限制。
Dataflow Runner V2
只能使用 Dataflow Runner v2 执行连接器。若要启用此功能,请在命令行参数中指定 --experiments=use_runner_v2
。使用 Runner v1 运行会导致流水线失败,并出现以下异常:
java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow
快照
连接器不支持 Dataflow 快照。
重复消息
Bigtable Beam 连接器会按提交时间戳顺序串流每个行键和每个集群的更改,但由于它有时会从数据流中的较早时间重启,因此可能会产生重复项。
准备工作
在使用该连接器之前,请先满足以下前提条件。
设置身份验证
如需在本地开发环境中使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用您的用户凭据设置应用默认凭据。
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
如需了解详情,请参阅 Set up authentication for a local development environment。
如需了解如何为生产环境设置身份验证,请参阅 Set up Application Default Credentials for code running on Google Cloud。
启用变更数据流
您必须 启用变更数据流 然后才能阅读。您还可以创建启用了变更数据流的新表。
更改数据流元数据表
当您使用 Dataflow 流式传输更改时,Bigtable Beam 连接器会默认创建一个名为 __change_stream_md_table
的元数据表。变更数据流元数据表用于管理连接器的运行状态,并存储与数据更改记录相关的元数据。
默认情况下,该连接器会在与表相同的实例中创建表 哪些视频正在播放为了确保该表正常运行, 元数据表必须使用单集群路由且具有单行 交易已启用。
如需详细了解如何使用 Bigtable 流式传输更改, Bigtable Beam 连接器,请参阅 BigtableIO 文档。
所需的角色
如需获得使用 Dataflow 读取 Bigtable 变更数据流所需的权限,请让您的管理员授予您以下 IAM 角色。
若要从 Bigtable 读取更改,您需要以下角色:
- 针对 Bigtable 实例(包含您计划从中流式传输更改的表)的 Bigtable Administrator (roles/bigtable.admin)
若要运行 Dataflow 作业,您需要以下角色:
- 针对包含 Cloud 资源的项目的 Dataflow Developer (
roles/dataflow.developer
) - 针对包含 Cloud 资源的项目的 Dataflow Worker (roles/dataflow.worker)
- 针对您计划使用的 Cloud Storage 存储桶的 Storage Object Admin (roles/storage.objectAdmin)
如需详细了解如何授予角色,请参阅管理访问权限。
将 Bigtable Beam 连接器添加为依赖项
将类似于以下依赖项的代码添加到 Maven pom.xml 文件中。版本必须为 2.48.0 或更高版本。
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
读取变更数据流
若要构建 Dataflow 流水线以读取数据更改记录,请配置连接器,然后添加转换和接收器。然后,使用连接器读取 Beam 流水线中的 ChangeStreamMutation
对象。
本部分中的代码示例使用 Java 编写,演示了如何构建流水线并使用它将键值对转换为字符串。每个键值对都由一个行键和一个 ChangeStreamMutation
对象组成。流水线将每个对象的条目转换为以逗号分隔的字符串。
构建流水线
此 Java 代码示例演示了如何构建流水线:
处理数据更改记录
此示例演示了如何循环遍历行的数据更改记录中的所有条目,以及如何根据条目类型调用转换为字符串的方法。
如需查看数据更改记录可以包含的条目类型列表,请参阅数据更改记录中包含的内容。
在以下示例中,系统会转换写入条目:
在此示例中,系统会转换删除单元条目:
在以下示例中,系统会转换删除列族条目:
监控
通过 Google Cloud 控制台中的以下资源,您可以在运行 Dataflow 流水线来读取 Bigtable 变更数据流时监控 Google Cloud 资源:
请特别检查以下指标:
- 在 Bigtable 监控页面上,检查以下指标:
- 指标
cpu_load_by_app_profile_by_method_by_table
中的变更数据流的 CPU 利用率数据。显示变更数据流对集群 CPU 使用率的影响。 - 变更数据流存储空间利用率(字节)(
change_stream_log_used_bytes
)。
- 指标
- 在 Dataflow 监控页面上,查看数据新鲜度,其中显示了当前时间和水印之间的差异。此过程大约需要两分钟,偶尔会出现高峰,此时会多花一两分钟。如果数据新鲜度指标持续高于 则表示您的流水线可能资源不足, 应该添加更多 Dataflow 工作器。数据新鲜度并不能表明数据更改记录的处理速度是否缓慢。
- Dataflow
processing_delay_from_commit_timestamp_MEAN
指标可以反映过去 10 天中数据变更记录的平均处理时间, 整个作业生命周期
在以下情况下,Bigtable server/latencies
指标没有用
监控正在读取
Bigtable 变更数据流,因为它反映了流式传输请求
而不是数据变更记录处理延迟时间在
变更数据流并不意味着请求的处理速度很慢;它的意思是
表明连接处于打开状态的时长达到了上限。