使用 Dataflow 流式传输更改

借助 Bigtable Beam 连接器,您可以使用 Dataflow 读取 Bigtable 数据变更记录,而无需跟踪或处理代码中的分区更改,因为该连接器会为您处理该逻辑。

本文档介绍了如何配置和使用 Bigtable Beam 连接器,以便使用 Dataflow 流水线读取变更数据流。 在阅读本文档之前,您应该先阅读变更数据流概览,并熟悉 Dataflow

构建自己的流水线的替代方案

如果您不想构建自己的 Dataflow 流水线,则可以使用以下任一选项。

您可以使用 Google 提供的 Dataflow 模板。

您也可以以 Bigtable 教程或快速入门中的代码示例为基础构建您的代码。

确保您生成的代码使用 google cloud libraries-bom 26.14.0 或更高版本。

连接器详细信息

借助 Bigtable Beam 连接器方法 BigtableIO.readChangeStream,您可以读取可处理的数据更改历史记录 (ChangeStreamMutation) 流。Bigtable Beam 连接器是 Apache Beam GitHub 代码库的一个组件。如需了解连接器代码,请参阅 BigtableIO.java 中的注释。

您必须将该连接器与 Beam 2.48.0 或更高版本搭配使用。请检查 Apache Beam 运行时支持,以确保您使用的是受支持的 Java 版本。然后,您可以将使用该连接器的流水线部署到 Dataflow,以便处理资源的预配和管理,并帮助实现流式数据处理的可伸缩性和可靠性。

如需详细了解 Apache Beam 编程模型,请参阅 Beam 文档

不使用事件时间对数据进行分组

使用 Bigtable Beam 连接器流式传输的数据变更记录与依赖于事件时间的 Dataflow 函数不兼容。

复制和水印中所述,如果分区的复制过程未与实例的其余部分同步,则低水印可能不会前进。低水印停止前进时,可能会导致变更数据流停滞。

为防止数据流停止,Bigtable Beam 连接器会在输出时间戳为零的情况下输出所有数据。零时间戳使 Dataflow 将所有数据更改记录都视为迟到数据。因此,依赖于事件时间的 Dataflow 函数与 Bigtable 变更数据流不兼容。具体来说,您不能使用数据选取函数事件时间触发器事件时间计时器

相反,您可以将 GlobalWindows 与非事件时间触发器结合使用,以将此延迟数据分组到窗格中,如本教程中的示例所示。如需详细了解触发器和窗格,请参阅 Beam 编程指南中的触发器

自动扩缩

连接器支持 Dataflow 自动扩缩,使用 Runner v2(必需)时,该自动扩缩功能默认处于启用状态。Dataflow 自动扩缩算法会考虑估算的变更数据流积压,相关信息可在 Dataflow 监控页面的 Backlog 部分进行监控。部署作业时请使用 --maxNumWorkers 标志以限制工作器数量。

如需手动扩缩流水线(而不是使用自动扩缩),请参阅手动扩缩流处理流水线

限制

将 Bigtable Beam 连接器与 Dataflow 一起使用之前,请注意以下限制。

Dataflow Runner V2

只能使用 Dataflow Runner v2 执行连接器。若要启用此功能,请在命令行参数中指定 --experiments=use_runner_v2。使用 Runner v1 运行会导致流水线失败,并出现以下异常:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

快照

连接器不支持 Dataflow 快照

重复消息

Bigtable Beam 连接器会按提交时间戳顺序流式传输每个行键和每个集群的更改,但由于它有时会在数据流中的较早时间重新开始,因此可能会产生重复项。

准备工作

在使用连接器之前,请先满足以下前提条件。

设置身份验证

如需从本地开发环境使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用用户凭据设置应用默认凭据。

  1. 安装 Google Cloud CLI。
  2. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  3. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login

如需了解详情,请参阅 为本地开发环境设置身份验证

如需了解如何为生产环境设置身份验证,请参阅 为在 Google Cloud 上运行的代码设置应用默认凭据

启用变更数据流

您必须先对表启用变更数据流,然后才能读取表。您还可以创建新表并启用变更数据流。

所需的角色

如需获得使用 Dataflow 读取 Bigtable 变更数据流所需的权限,请让您的管理员授予您以下 IAM 角色。

若要从 Bigtable 读取更改,您需要以下角色:

  • 针对 Bigtable 实例(包含您计划从中流式传输更改的表)的 Bigtable Administrator (roles/bigtable.admin)

若要运行 Dataflow 作业,您需要以下角色:

如需详细了解如何授予角色,请参阅管理访问权限

您还可以通过自定义角色或其他预定义角色获取所需的权限。

将 Bigtable Beam 连接器添加为依赖项

将类似于以下依赖项的代码添加到 Maven pom.xml 文件中。版本必须为 2.48.0 或更高版本。

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

读取变更数据流

若要构建 Dataflow 流水线以读取数据更改记录,请配置连接器,然后添加转换和接收器。然后,使用连接器读取 Beam 流水线中的 ChangeStreamMutation 对象。

本部分中的代码示例使用 Java 编写,演示了如何构建流水线并使用它将键值对转换为字符串。每个键值对都由一个行键和一个 ChangeStreamMutation 对象组成。流水线将每个对象的条目转换为以逗号分隔的字符串。

构建流水线

此 Java 代码示例演示了如何构建流水线:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

处理数据更改记录

此示例演示了如何循环遍历行的数据更改记录中的所有条目,以及如何根据条目类型调用转换为字符串的方法。

如需查看数据更改记录可以包含的条目类型列表,请参阅数据更改记录中包含的内容

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

在以下示例中,系统会转换写入条目:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

在此示例中,系统会转换删除单元条目:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

在以下示例中,系统会转换删除列族条目:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

监控

通过 Google Cloud 控制台中的以下资源,您可以在运行 Dataflow 流水线来读取 Bigtable 变更数据流时监控 Google Cloud 资源:

请特别检查以下指标:

  • 在 Bigtable 监控页面上,检查以下metrics
    • 指标 cpu_load_by_app_profile_by_method_by_table 中的变更数据流的 CPU 利用率数据。显示变更数据流对集群 CPU 使用率的影响。
    • 变更数据流存储空间利用率(字节)(change_stream_log_used_bytes)。
  • 在 Dataflow 监控页面上,查看数据新鲜度,其中显示了当前时间和水印之间的差异。此过程大约需要两分钟,偶尔会出现高峰,此时会多花一两分钟。如果数据新鲜度指标始终高于该阈值,则表示您的流水线可能资源不足,您应添加更多 Dataflow 工作器。数据新鲜度并不能指示数据更改记录处理速度是否缓慢。
  • Dataflow processing_delay_from_commit_timestamp_MEAN 指标可以告诉您作业生命周期内数据变更记录的平均处理时间。

当您监控正在读取 Bigtable 变更流的 Dataflow 流水线时,Bigtable server/latencies 指标没有用,因为它反映的是流式请求时长,而不是数据变更记录处理延迟时间。变更数据流中的延迟时间较长并不意味着请求处理速度很慢,而是表示连接处于打开状态的时间过长。

后续步骤