使用 Dataflow 流式传输更改

借助 Bigtable Beam 连接器,您可以使用 Dataflow 执行以下操作: 读取 Bigtable 数据变更记录,而无需跟踪或 进程分区 更改 因为连接器为您处理该逻辑。

本文档介绍了如何配置和使用 Bigtable Beam 连接器以使用 Dataflow 流水线读取变更数据流。 在阅读本文档之前,您应该先阅读变更数据流概览,并熟悉 Dataflow

构建自己的流水线的替代方案

如果您不想构建自己的 Dataflow 流水线,则可以使用以下任一选项。

您可以使用 Google 提供的 Dataflow 模板。

您也可以以 Bigtable 教程或快速入门中的代码示例为基础构建您的代码。

确保您生成的代码使用 google cloud libraries-bom 26.14.0 或更高版本。

连接器详细信息

借助 Bigtable Beam 连接器方法 BigtableIO.readChangeStream,您可以读取数据流 更改记录 (ChangeStreamMutation)。Bigtable Beam 连接器是 Apache Beam GitHub 的 代码库。如需了解连接器代码,请参阅 BigtableIO.java 中的注释。

您必须将该连接器与 Beam 2.48.0 或更高版本搭配使用。请检查 Apache Beam 运行时支持,以确保您使用的是受支持的 Java 版本。然后,您可以将使用该连接器的流水线部署到 Dataflow,以便处理资源的预配和管理,并帮助实现流式数据处理的可伸缩性和可靠性。

如需详细了解 Apache Beam 编程模型,请参阅 Beam 文档

不使用事件时间对数据进行分组

使用 Bigtable Beam 连接器流式传输的数据更改记录与依赖事件时间的 Dataflow 函数不兼容。

复制和水印中所述,如果分区的复制过程未与实例的其余部分同步,则低水印可能不会前进。低水印停止前进时,可能会导致变更数据流停滞。

为防止数据流停滞,Bigtable Beam 连接器会输出输出时间戳为 零。零时间戳使 Dataflow 将所有数据更改记录都视为迟到数据。因此,依赖于事件时间的 Dataflow 函数与 Bigtable 变更数据流不兼容。具体来说,您不能使用数据选取函数事件时间触发器事件时间计时器

您可以改用 GlobalWindows 使用非事件时间触发器将这些延迟数据分组到窗格中,如图所示 请参阅本教程中的示例中的说明。如需详细了解触发器和窗格,请参阅 触发器

自动扩缩

连接器支持 Dataflow 自动扩缩,在使用 Runner v2(必需)时,该功能默认启用。Dataflow 自动扩缩算法会考虑估算的变更数据流积压,您可以在 Backlog 部分的 Dataflow 监控页面上对其进行监控。部署作业时请使用 --maxNumWorkers 标志以限制工作器数量。

如需手动扩缩流水线(而不是使用自动扩缩),请参阅手动扩缩流处理流水线

限制

在将 Bigtable Beam 连接器与 Dataflow 搭配使用之前,请注意以下限制。

Dataflow Runner V2

只能使用 Dataflow Runner v2 执行连接器。若要启用此功能,请在命令行参数中指定 --experiments=use_runner_v2。使用 Runner v1 运行会导致流水线失败,并出现以下异常:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

快照

连接器不支持 Dataflow 快照

重复消息

Bigtable Beam 连接器会按提交时间戳顺序串流每个行键和每个集群的更改,但由于它有时会从数据流中的较早时间重启,因此可能会产生重复项。

准备工作

在使用该连接器之前,请先满足以下前提条件。

设置身份验证

如需在本地开发环境中使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用您的用户凭据设置应用默认凭据。

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

如需了解详情,请参阅 Set up authentication for a local development environment

如需了解如何为生产环境设置身份验证,请参阅 Set up Application Default Credentials for code running on Google Cloud

启用变更数据流

您必须 启用变更数据流 然后才能阅读。您还可以创建启用了变更数据流的新表

更改数据流元数据表

当您使用 Dataflow 流式传输更改时,Bigtable Beam 连接器会默认创建一个名为 __change_stream_md_table 的元数据表。变更数据流元数据表用于管理连接器的运行状态,并存储与数据更改记录相关的元数据。

默认情况下,该连接器会在与表相同的实例中创建表 哪些视频正在播放为了确保该表正常运行, 元数据表必须使用单集群路由且具有单行 交易已启用。

如需详细了解如何使用 Bigtable 流式传输更改, Bigtable Beam 连接器,请参阅 BigtableIO 文档

所需的角色

如需获得使用 Dataflow 读取 Bigtable 变更数据流所需的权限,请让您的管理员授予您以下 IAM 角色。

若要从 Bigtable 读取更改,您需要以下角色:

  • 针对 Bigtable 实例(包含您计划从中流式传输更改的表)的 Bigtable Administrator (roles/bigtable.admin)

若要运行 Dataflow 作业,您需要以下角色:

如需详细了解如何授予角色,请参阅管理访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

将 Bigtable Beam 连接器添加为依赖项

将类似于以下依赖项的代码添加到 Maven pom.xml 文件中。版本必须为 2.48.0 或更高版本。

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

读取变更数据流

若要构建 Dataflow 流水线以读取数据更改记录,请配置连接器,然后添加转换和接收器。然后,使用连接器读取 Beam 流水线中的 ChangeStreamMutation 对象。

本部分中的代码示例使用 Java 编写,演示了如何构建流水线并使用它将键值对转换为字符串。每个键值对都由一个行键和一个 ChangeStreamMutation 对象组成。流水线将每个对象的条目转换为以逗号分隔的字符串。

构建流水线

此 Java 代码示例演示了如何构建流水线:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

处理数据更改记录

此示例演示了如何循环遍历行的数据更改记录中的所有条目,以及如何根据条目类型调用转换为字符串的方法。

如需查看数据更改记录可以包含的条目类型列表,请参阅数据更改记录中包含的内容

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

在以下示例中,系统会转换写入条目:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

在此示例中,系统会转换删除单元条目:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

在以下示例中,系统会转换删除列族条目:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

监控

通过 Google Cloud 控制台中的以下资源,您可以在运行 Dataflow 流水线来读取 Bigtable 变更数据流时监控 Google Cloud 资源:

请特别检查以下指标:

  • 在 Bigtable 监控页面上,检查以下指标
    • 指标 cpu_load_by_app_profile_by_method_by_table 中的变更数据流的 CPU 利用率数据。显示变更数据流对集群 CPU 使用率的影响。
    • 变更数据流存储空间利用率(字节)(change_stream_log_used_bytes)。
  • 在 Dataflow 监控页面上,查看数据新鲜度,其中显示了当前时间和水印之间的差异。此过程大约需要两分钟,偶尔会出现高峰,此时会多花一两分钟。如果数据新鲜度指标持续高于 则表示您的流水线可能资源不足, 应该添加更多 Dataflow 工作器。数据新鲜度并不能表明数据更改记录的处理速度是否缓慢。
  • Dataflow processing_delay_from_commit_timestamp_MEAN 指标可以反映过去 10 天中数据变更记录的平均处理时间, 整个作业生命周期

在以下情况下,Bigtable server/latencies 指标没有用 监控正在读取 Bigtable 变更数据流,因为它反映了流式传输请求 而不是数据变更记录处理延迟时间在 变更数据流并不意味着请求的处理速度很慢;它的意思是 表明连接处于打开状态的时长达到了上限。

后续步骤