使用 Dataflow 流式传输更改

借助 Bigtable Beam 连接器,您可以使用 Dataflow 读取 Bigtable 数据更改记录,而无需跟踪或处理代码中的分区更改,因为连接器会处理相应逻辑。

本文档介绍了如何配置和使用 Bigtable Beam 连接器以通过 Dataflow 流水线读取变更数据流。在阅读本文档之前,您应该先阅读变更数据流概览,并熟悉 Dataflow

构建自己的流水线的替代方案

如果您不想构建自己的 Dataflow 流水线,则可以使用以下任一选项。

您可以使用 Google 提供的 Dataflow 模板。

您也可以以 Bigtable 教程或快速入门中的代码示例为基础构建您的代码。

确保您生成的代码使用 google cloud libraries-bom 26.14.0 或更高版本。

连接器详细信息

借助 Bigtable Beam 连接器方法 BigtableIO.readChangeStream,您可以读取可处理的数据更改记录流 (ChangeStreamMutation)。Bigtable Beam 连接器是 Apache Beam GitHub 代码库的一个组件。如需了解连接器代码,请参阅 BigtableIO.java 中的注释。

您必须将该连接器与 Beam 2.48.0 或更高版本搭配使用。请检查 Apache Beam 运行时支持,以确保您使用的是受支持的 Java 版本。然后,您可以将使用该连接器的流水线部署到 Dataflow,以便处理资源的预配和管理,并帮助实现流式数据处理的可伸缩性和可靠性。

如需详细了解 Apache Beam 编程模型,请参阅 Beam 文档

不使用事件时间对数据进行分组

使用 Bigtable Beam 连接器流式传输的数据更改记录与依赖事件时间的 Dataflow 函数不兼容。

复制和水印中所述,如果分区的复制过程未与实例的其余部分同步,则低水印可能不会前进。低水印停止前进时,可能会导致变更数据流停滞。

为防止数据流停滞,Bigtable Beam 连接器会输出所有数据,输出时间戳为零。零时间戳使 Dataflow 将所有数据更改记录都视为迟到数据。因此,依赖于事件时间的 Dataflow 函数与 Bigtable 变更数据流不兼容。具体来说,您不能使用数据选取函数事件时间触发器事件时间计时器

而是可以将 GlobalWindows 与非事件时间触发器结合使用,将这些迟到数据分组到窗格中,如本教程中的示例所示。如需详细了解触发器和窗格,请参阅 Beam 编程指南中的触发器

自动扩缩

连接器支持 Dataflow 自动扩缩,在使用 Runner v2(必需)时,该功能默认启用。Dataflow 自动扩缩算法会考虑估算的变更数据流积压,您可以在 Backlog 部分的 Dataflow 监控页面上对其进行监控。部署作业时请使用 --maxNumWorkers 标志以限制工作器数量。

如需手动扩缩流水线(而不是使用自动扩缩),请参阅手动扩缩流处理流水线

限制

在将 Bigtable Beam 连接器与 Dataflow 搭配使用之前,请注意以下限制。

Dataflow Runner V2

只能使用 Dataflow Runner v2 执行连接器。若要启用此功能,请在命令行参数中指定 --experiments=use_runner_v2。使用 Runner v1 运行会导致流水线失败,并出现以下异常:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

快照

连接器不支持 Dataflow 快照

重复消息

Bigtable Beam 连接器会按提交时间戳顺序串流每个行键和每个集群的更改,但由于它有时会从数据流中的更早时间重启,因此可能会产生重复项。

准备工作

在使用该连接器之前,请先满足以下前提条件。

设置身份验证

如需在本地开发环境中使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用您的用户凭据设置应用默认凭据。

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

如需了解详情,请参阅 Set up authentication for a local development environment

如需了解如何为生产环境设置身份验证,请参阅 Set up Application Default Credentials for code running on Google Cloud

启用变更数据流

您必须先在表上启用变更数据流,然后才能读取数据流。您还可以创建启用了变更数据流的新表

更改数据流元数据表

当您使用 Dataflow 流式传输更改时,Bigtable Beam 连接器会默认创建一个名为 __change_stream_md_table 的元数据表。变更数据流元数据表用于管理连接器的运行状态,并存储与数据更改记录相关的元数据。

默认情况下,连接器会在正在流式传输的表所在的实例中创建表。为确保该表正常运行,元数据表的应用配置文件必须使用单集群路由,并且已启用单行事务。

如需详细了解如何使用 Bigtable Beam 连接器从 Bigtable 流式传输更改,请参阅 BigtableIO 文档

所需的角色

如需获得使用 Dataflow 读取 Bigtable 变更数据流所需的权限,请让您的管理员授予您以下 IAM 角色。

若要从 Bigtable 读取更改,您需要以下角色:

  • 针对 Bigtable 实例(包含您计划从中流式传输更改的表)的 Bigtable Administrator (roles/bigtable.admin)

若要运行 Dataflow 作业,您需要以下角色:

如需详细了解如何授予角色,请参阅管理访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

将 Bigtable Beam 连接器添加为依赖项

将类似于以下依赖项的代码添加到 Maven pom.xml 文件中。版本必须为 2.48.0 或更高版本。

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

读取变更数据流

若要构建 Dataflow 流水线以读取数据更改记录,请配置连接器,然后添加转换和接收器。然后,使用连接器读取 Beam 流水线中的 ChangeStreamMutation 对象。

本部分中的代码示例使用 Java 编写,演示了如何构建流水线并使用它将键值对转换为字符串。每个键值对都由一个行键和一个 ChangeStreamMutation 对象组成。流水线将每个对象的条目转换为以逗号分隔的字符串。

构建流水线

此 Java 代码示例演示了如何构建流水线:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

处理数据更改记录

此示例演示了如何循环遍历行的数据更改记录中的所有条目,以及如何根据条目类型调用转换为字符串的方法。

如需查看数据更改记录可以包含的条目类型列表,请参阅数据更改记录中包含的内容

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

在以下示例中,系统会转换写入条目:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

在此示例中,系统会转换删除单元条目:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

在以下示例中,系统会转换删除列族条目:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

监控

通过 Google Cloud 控制台中的以下资源,您可以在运行 Dataflow 流水线来读取 Bigtable 变更数据流时监控 Google Cloud 资源:

请特别检查以下指标:

  • 在 Bigtable 监控页面上,检查以下指标
    • 指标 cpu_load_by_app_profile_by_method_by_table 中的变更数据流的 CPU 利用率数据。显示变更数据流对集群 CPU 使用率的影响。
    • 变更数据流存储空间利用率(字节)(change_stream_log_used_bytes)。
  • 在 Dataflow 监控页面上,查看数据新鲜度,其中显示了当前时间和水印之间的差异。此过程大约需要两分钟,偶尔会出现高峰,此时会多花一两分钟。如果数据新鲜度指标始终高于该阈值,则表示您的流水线可能资源不足,您应该添加更多 Dataflow 工作器。数据新鲜度并不能表明数据更改记录的处理速度是否缓慢。
  • Dataflow processing_delay_from_commit_timestamp_MEAN 指标可让您了解数据更改记录在作业生命周期内的平均处理时间。

在监控正在读取 Bigtable 更改数据流的 Dataflow 流水线时,Bigtable server/latencies 指标没有用处,因为它反映的是流式传输请求时长,而不是数据更改记录处理延迟时间。更改流中的高延迟时间并不意味着请求的处理速度缓慢;而是表示连接保持打开状态的时间很长。

后续步骤