使用 Dataflow 构建变更数据流连接

本页面演示了如何创建 Dataflow 流水线, 使用和转发 Spanner 更改数据 变更数据流。您可以使用本页中的示例代码来 构建自定义流水线

核心概念

以下是 Dataflow 流水线的一些核心概念, 变更数据流。

Dataflow

Dataflow 是一项速度快且经济实惠的无服务器服务, 批处理。它通过使用 开源 Apache Beam 并自动执行基础架构预配和集群管理。 从变更数据流读取数据时,Dataflow 可提供近乎实时的流处理功能。

您可以通过 Dataflow 来使用 Spanner 更改 SpannerIO 中的 连接器,它提供了一种 来查询变更数据流包含 因此您不必管理变更数据流 分区生命周期,这在您使用 Spanner API 的完整实现。借助该连接器,您可以 提供一连串数据更改记录,让您可以将更多精力放在 对应用逻辑的介绍,较少介绍具体的 API 详情和动态 变更数据流分区。我们建议使用 SpannerIO 连接器,而不是 变更数据流数据。

Dataflow 模板是预构建的 Dataflow 流水线 实现常见应用场景所需的资源请参阅 简要了解 Dataflow 模板

Dataflow 流水线

Spanner 变更数据流 Dataflow 流水线由 包括以下四个主要部分:

  1. 具有变更数据流的 Spanner 数据库
  2. SpannerIO 连接器
  3. 用户定义的转换和接收器
  4. 接收器 I/O 写入程序

图片

下面将更详细地介绍上述各项。

Spanner 变更数据流

如需详细了解如何创建变更数据流,请参阅创建 变更数据流

Apache Beam SpannerIO 连接器

这是前文所述的 SpannerIO 连接器。这是一个 源 I/O 连接器发出 PCollection 的数据变更记录 流水线的后续阶段通过 事件时间 针对发出的每条数据变更记录 将是提交时间戳。请注意,发出的记录 无序,并且 SpannerIO 连接器保证不会出现 延迟记录

处理变更数据流时,Dataflow 会使用检查点。 因此,每个工作器最多可能等待配置的检查点间隔 在发送更改以供进一步处理之前缓冲更改。

用户定义的转换

通过用户定义的转换,用户可以聚合、转换或修改 在 Dataflow 流水线中处理数据。常见用途 移除个人身份信息 满足下游数据格式要求以及排序。请参阅 关于使用 API 的 转换

Apache Beam 接收器 I/O 写入程序

Apache Beam 包含 内置 I/O 转换, 用于从 Dataflow 流水线写入数据接收器 例如 BigQuery原生支持大多数常见的数据接收器。

Dataflow 模板

Dataflow 模板提供了一种简单的工具, 基于预构建 Docker 创建 Dataflow 作业的方法 Google Cloud 控制台、Google Cloud CLI 或 Rest API 调用。

对于 Spanner 变更数据流,我们提供了三种 Dataflow Flex 模板:

构建 Dataflow 流水线

本部分涵盖了连接器的初始配置,并提供了 与 Spanner 变更数据流的常见集成示例 功能

要执行这些步骤,您需要使用 Java 开发环境 。如需了解详情,请参阅 使用 Java 创建 Dataflow 流水线

创建变更流

如需详细了解如何创建变更数据流,请参阅创建变更数据流。 如需继续执行后续步骤,您必须拥有 Spanner 数据库 并配置变更数据流

授予精细的访问权限控制权限

如果您希望任何精细的访问权限控制用户运行 Dataflow 作业, 确保用户被授予对数据库的访问权限 对变更数据流和 EXECUTE 具有 SELECT 权限的角色 针对变更数据流的表值函数的权限。此外,请确保 主账号在 SpannerIO 配置中或 Dataflow Flex 模板。

如需了解详情,请参阅精细访问权限控制简介

将 SpannerIO 连接器添加为依赖项

Apache Beam SpannerIO 连接器封装了通过 Cloud Spanner API 直接使用变更数据流的复杂性,将变更数据流数据记录的 PCollection 发送到流水线的后续阶段。

这些对象可以在用户的 Dataflow 流水线的其他阶段使用。变更数据流集成是 SpannerIO 连接器的一部分。为了能够使用 SpannerIO 连接器,需要将依赖项添加到您的 pom.xml 文件中:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

创建元数据数据库

在运行 Apache Beam 流水线。它会将这些元数据保存在 Spanner 中。 由连接器在初始化期间创建的表。您可以指定 配置数据库时将在其中创建此表的数据库 连接器。

具体说明 变更数据流最佳实践, 为此,建议您创建新的数据库,而不是允许 以使用您应用的数据库来存储其元数据表。

使用 SpannerIO 的 Dataflow 作业的所有者 必须拥有以下 IAM 权限

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

配置连接器

Spanner 变更数据流连接器可按如下方式配置:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

以下是 readChangeStream() 选项的说明:

Spanner 配置(必需)

用于配置在其中创建变更数据流的项目、实例和数据库,并且应从中查询变更数据流。 此外,还可以选择指定在访问 IAM 时要使用的数据库角色。 运行 Dataflow 作业的主账号是精细的访问权限控制用户。 该作业假定拥有此数据库角色才能访问变更数据流。对于 如需了解详情,请参阅精细访问权限控制简介

变更数据流名称(必填)

此名称可唯一标识变更数据流。此处的名称必须与创建时使用的名称相同。

元数据实例 ID(可选)

该实例用于存储连接器所使用的元数据,以控制变更数据流 API 数据的使用情况。

元数据数据库 ID(必填)

此数据库用于存储连接器用于控制变更数据流 API 数据使用的元数据。

元数据表名称(可选)

此方法仅应在更新现有流水线时使用。

这是现有元数据表名称, 连接器。供连接器用来存储 控制变更数据流 API 数据的使用。如果此选项 则 Spanner 会创建一个新表,其中包含生成的 名称。

RPC 优先级(可选)

请求 优先级 用于变更数据流查询如果省略此参数,将使用 high priority

InclusiveStartAt(必需)

系统会将给定时间戳的更改返回给调用方。

InclusiveEndAt(可选)

系统会将给定时间戳之前的更改返回给调用方。如果省略此参数,系统将无限期地发出更改。

添加转换和接收器以处理变更数据

完成上述步骤后,已配置的 SpannerIO 连接器已准备就绪 以发出由 DataChangeRecord 对象的 PCollection。 如需查看几个示例,请参阅转换和接收器示例 以各种方式处理此流式数据的流水线配置。

请注意,SpanIO 连接器发出的变更数据流记录是无序的。这是因为 PCollection 不提供任何排序保证。如果您需要有序的流,则必须在流水线中以转换的形式对记录进行分组和排序:请参阅示例:按键排序。您可以扩展此示例,以根据记录的任何字段(例如按事务 ID)对记录进行排序。

转换和接收器示例

您可以定义自己的转换并指定要将数据写入的接收器。Apache Beam 文档提供了大量可应用的转换,以及现成的 I/O 连接器将数据写入外部系统。

示例:按键排序

此代码示例使用 Dataflow 连接器发出按提交时间戳排序的数据变更记录,并按主键分组。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

此代码示例使用状态和计时器来缓冲每个键的记录,并将计时器的到期时间设置为将来的某个用户配置的时间 T(在 BufferKeyUntilOutputTimestamp 函数中定义)。当 Dataflow 水印超过 T 时,此代码会刷新缓冲区中时间戳小于 T 的所有记录,按提交时间戳对这些记录进行排序,并输出一个键值对,其中:

  • 键是输入键,即经过哈希处理为大小为 1000 的存储桶数组的主键。
  • 该值是针对相应键进行缓冲的有序数据变更记录。

对于每个密钥,我们都有以下保证:

  • 系统一定会按照到期时间戳的顺序触发计时器。
  • 下游阶段一定会按元素生成顺序接收元素。

例如,假设对于一个值为 100 的键,计时器分别在 T1T10 时触发,在每个时间戳处生成一组数据变更记录。由于在 T1 输出的数据变更记录是在 T10 输出的数据变更记录之前生成的,因此在 T10 输出的数据变更记录之前,我们也保证会在下一阶段收到数据变更记录,即 T1 输出的数据变更记录。此机制有助于我们保证按主键进行严格的提交时间戳排序,以便进行下游处理。

此过程将重复执行,直到流水线结束且所有数据更改记录都已处理完毕(如果没有指定结束时间,此过程将无限期重复)。

请注意,此代码示例使用状态和计时器(而不是窗口)按键执行排序。其原因在于 按顺序处理这意味着较旧窗口的处理时间可能会晚于较近的窗口,这可能会导致处理顺序混乱。

BreakRecordByModFn

每条数据变更记录可能包含多个 mod。每个 mod 表示对单个主键值的插入、更新或删除。此函数将每条数据更改记录拆分为单独的数据更改记录,每个 mod 一个。

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

此函数接受 DataChangeRecord 并输出由 Spanner 主键键控且经过哈希处理为整数值的 DataChangeRecord

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

计时器和缓冲区是按按键操作的。此函数会缓冲每条数据变更记录,直到水印超过我们想要输出已缓冲数据变更记录的时间戳。

此代码利用循环计时器来确定何时刷新缓冲区:

  1. 当它首次看到某个键的数据更改记录时,会将计时器设置为在数据更改记录的提交时间戳 + incrementIntervalSeconds(用户可配置的选项)时触发。
  2. 当计时器触发时,它会将缓冲区中时间戳小于计时器到期时间的所有数据更改记录添加到 recordsToOutput。如果缓冲区中有数据变更记录,且时间戳大于或等于计时器的到期时间,那么它会将这些数据变更记录重新添加到缓冲区,而不是将其输出。然后,它会将下一个计时器设置为当前计时器的到期时间加上 incrementIntervalInSeconds
  3. 如果 recordsToOutput 不为空,该函数会按提交时间戳和事务 ID 对 recordsToOutput 中的数据更改记录进行排序,然后输出它们。
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

为交易排序

可以将此流水线更改为按事务 ID 和提交时间戳排序。为此,请缓冲每个事务 ID / 提交时间戳对的记录,而不是为每个 Spanner 键缓冲记录。这需要修改 KeyByIdFn 中的代码。

示例:整合事务

此代码示例会读取数据变更记录,将属于同一事务的所有数据变更记录整合到单个元素中,然后输出该元素。请注意,此示例代码输出的事务不是按提交时间戳排序的。

此代码示例使用缓冲区来根据数据变更记录汇编事务。在首次收到属于某笔交易的数据变更记录后,它会读取数据变更记录中的 numberOfRecordsInTransaction 字段,该字段描述了属于该交易的预期数据变更记录数量。它会缓冲属于该事务的数据变更记录,直到已缓冲记录数量与 numberOfRecordsInTransaction 匹配,然后输出捆绑的数据变更记录。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

此函数接受 DataChangeRecord 并输出由事务 ID 键控的 DataChangeRecord

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn 会缓冲收到的以下键值对: 来自KeyByTransactionIdFn{TransactionId, DataChangeRecord}和 根据 TransactionId 将它们分组缓冲。当 缓冲的记录数等于 此函数会对 DataChangeRecord 对象进行排序, 按记录序列分组,并输出 {CommitTimestamp, TransactionId}Iterable<DataChangeRecord>

在这里,我们假设 SortKey 是一个用户定义的类,表示 {CommitTimestamp, TransactionId} 对。请参阅 SortKey实现示例

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

示例:按交易代码过滤

在为修改用户数据的交易添加标记时,相应的代码及其类型会存储为 DataChangeRecord 的一部分。以下示例演示了如何根据用户定义的事务标记和系统标记过滤变更数据流记录:

针对 my-tx-tag 的用户定义代码过滤:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

系统标记过滤/TTL 审核:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

示例:提取整行

此示例使用名为 Singer 的 Spanner 表,该表具有以下定义:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

在变更数据流的默认 OLD_AND_NEW_VALUES 值捕获模式下,当有对 Spanner 行的更新时,接收到的数据变更记录将仅包含已更改的列。已跟踪但未更改的列将不会包含在记录中。mod 的主键可用于在数据更改记录的提交时间戳处执行 Spanner 快照读取,以提取未更改的列,甚至检索整行。

请注意,若要成功读取快照,可能需要将数据库保留政策更改为大于或等于变更数据流保留政策的值。

另请注意,建议使用 NEW_ROW 值捕获类型,这种方法会更高效执行此操作,因为它默认返回行的所有跟踪列,不需要额外的快照读入 Spanner。

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

此转换将在收到的每条记录的提交时间戳处执行过时读取,并将整行映射到 JSON。

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

此代码会创建一个 Spanner 数据库客户端来执行完整行提取,并将会话池配置为只有几个会话,并依序在 ToFullRowJsonFn 的一个实例中执行读取操作。Dataflow 会确保生成此函数的多个实例,每个实例都有自己的客户端池。

示例:从 Spanner 到 Pub/Sub

在这种情况下,调用方会将录制内容作为如下内容流式传输到 Pub/Sub: 而不进行任何分组或聚合操作。这是一个很好 适合触发下游处理,因为流式传输所有新行 插入 Spanner 表 Pub/Sub 处理。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

请注意,Pub/Sub 接收器可以配置为确保“正好一次”语义。

示例:从 Spanner 迁移到 Cloud Storage

在此场景中,调用方会将给定窗口中的所有记录分组,并将该组保存在单独的 Cloud Storage 文件中。这非常适合用于分析和时间点归档,此类归档独立于 Spanner 的保留期限。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

请注意,默认情况下,Cloud Storage 接收器提供“至少一次”语义。通过额外的处理,可以将其修改为具有正好一次语义。

我们还提供了适用于此用例的 Dataflow 模板:请参阅将变更数据流连接到 Cloud Storage

示例:从 Spanner 到 BigQuery(分类记录表)

此时,调用方会将变更记录流式传输到 BigQuery 中。每条数据更改记录在 BigQuery 中反映为一行。非常适合用于分析。此代码使用之前在提取完整行部分中定义的函数来检索记录的完整行并将其写入 BigQuery。

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

请注意,默认情况下,BigQuery 接收器提供“至少一次”语义。通过额外的处理,可以将其修改为具有正好一次语义。

我们还针对此使用场景提供了 Dataflow 模板:请参阅将变更数据流连接到 BigQuery

监控流水线

有两类指标可用于监控变更数据流 Dataflow 流水线。

标准 Dataflow 指标

Dataflow 提供多种指标来确保作业的健康状况,例如数据新鲜度、系统延迟、作业吞吐量、工作器 CPU 利用率等。如需了解详情,请参阅为 Dataflow 流水线使用 Monitoring

对于变更数据流流水线,需要考虑两个主要指标:系统延迟时间数据新鲜度

通过系统延迟时间,您可以了解当前最长处理或等待处理某个数据项的时长(以秒为单位)。

数据新鲜度会显示从现在(实时)到输出水印的经过。时间 T 的输出水印表示(严格)事件时间早于 T 的所有元素都已经过计算处理。换句话说,数据新鲜度衡量管道在处理它收到的事件方面的最新程度。

如果流水线资源不足,您可以在这两个指标中看到这种影响。因为项目需要等待更长时间才能处理完毕,因此系统延迟时间会增加。数据新鲜度也会提高,因为流水线无法跟上接收的数据量。

自定义变更数据流指标

这些指标在 Cloud Monitoring 中提供,包括:

  • 从在 Spanner 中提交的记录到连接器将其发送到 PCollection 之间的分桶(直方图)延迟时间。此指标可用于查看流水线的任何性能(延迟时间)问题。
  • 读取的数据记录总数。这是连接器发出的记录数的总体指示。这个数字应该不断增长,反映出底层 Spanner 数据库中的写入趋势。
  • 当前正在读取的分区数。应该始终存在正在读取的分区。如果此数字为零,则表示流水线中发生了错误。
  • 连接器执行期间发出的查询总数。这是在流水线的整个执行过程中对 Spanner 实例进行的变更数据流查询的总体指示。这可用于估算从连接器到 Spanner 数据库的负载。

更新现有流水线

可以更新使用 SpannerIO 的正在运行的流水线 处理变更数据流的作业兼容性 检查。待办事项 因此您必须明确设置元数据表名称参数, 更新新作业。使用 metadataTable 的值 流水线选项。

如果您使用的是 Google 提供的 Dataflow 模板,请将 使用 spannerMetadataTableName 参数指定表名称。您还可以修改 明确使用元数据表与方法搭配使用 withMetadataTable(your-metadata-table-name) 英寸 连接器配置。完成后,您可以按照 发布更换服务 job 用于更新正在运行的作业的 Dataflow 文档。

变更数据流和 Dataflow 的最佳实践

以下是构建变更数据流连接的一些最佳实践 使用 Dataflow

使用单独的元数据数据库

我们建议您为 SpannerIO 连接器创建一个单独的数据库,以用于 而不是将其配置为使用您的应用数据库。

如需了解详情,请参阅 考虑使用单独的元数据数据库

调整集群大小

一个行业中初始工作器数量的经验法则 Spanner 变更数据流作业每 1000 次写入一个工作器 。请注意,此估算值可能会因 多种因素,例如每个事务的大小、有多少变更数据流 记录是通过单个事务和其他转换生成的, 数据汇总或接收器。

完成初始资源分配后,请务必跟踪其中提及的指标 在监控流水线中, 以确保流水线运行状况良好我们建议尝试 并监控您的 流水线会处理负载,并在必要时增加节点数量。通过 CPU 利用率是检查负载是否正常以及更多节点的关键指标 所需的资源。

已知限制

自动扩缩

为包含以下内容的任何流水线提供自动扩缩支持 SpannerIO.readChangeStream 需要 Apache Beam 2.39.0 或 。

如果您使用 2.39.0 之前的 Apache Beam 版本,则包含 SpannerIO.readChangeStream 的流水线需要明确指定自动扩缩 NONE 调用的算法,如 横向自动扩缩

接收者 您可以手动扩缩 Dataflow 流水线,而不是使用自动扩缩,具体请参阅 手动伸缩流处理流水线

Runner V2

Spanner 变更数据流连接器需要 Dataflow Runner V2。 这必须在执行期间手动指定,否则将出现错误 。您可以使用以下命令配置作业,以指定 Runner V2--experiments=use_unified_worker,use_runner_v2.

快照

Spanner 变更数据流连接器不支持 Dataflow 快照

正在排空

Spanner 变更数据流连接器不支持 排空作业。 只能取消现有作业。

您还可以更新现有流水线 而不必停止它

OpenCensus

要使用 OpenCensus 来监控流水线,请指定版本 0.28.3 或更高版本。

流水线启动时 NullPointerException

在某些情况下,启动流水线时,Apache Beam 版本 2.38.0 中的bug可能会导致 NullPointerException。这会导致您的作业无法启动,改为显示以下错误消息:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

要解决此问题,请使用 Apache Beam 版本 2.39.0 或更高版本,或者手动指定 beam-sdks-java-core2.37.0的形式:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

更多信息