使用 Dataflow 构建变更数据流连接

本页演示了如何使用变更数据流创建可使用和转发 Spanner 变更数据的 Dataflow 流水线。您可以使用本页上的示例代码来构建自定义流水线。

核心概念

以下是有关变更数据流的 Dataflow 流水线的一些核心概念。

Dataflow

Dataflow 是一项无服务器、快速且经济实惠的服务,支持流式处理和批量处理。它可提供便携性,让您能够使用开源 Apache Beam 库编写处理作业,并自动执行基础架构预配和集群管理。 从变更数据流读取数据时,Dataflow 可提供近乎实时的流式传输。

您可以使用 Dataflow 和 SpannerIO 连接器来使用 Spanner 变更数据流,该连接器可针对 Spanner API 提供抽象层,以便查询变更数据流。使用此连接器时,您无需管理变更数据流分区生命周期,而直接使用 Spanner API 时则需要管理。该连接器可为您提供数据更改记录流,以便您更专注于应用逻辑,而无需过多关注特定 API 详细信息和动态变更数据流分区。在大多数需要读取变更数据流数据的情况下,我们建议使用 SpannerIO 连接器,而不是 Spanner API。

Dataflow 模板是预构建的 Dataflow 流水线,可实现常见的使用场景。如需了解概览,请参阅 Dataflow 模板

Dataflow 流水线

Spanner 变更数据流 Dataflow 流水线由四个主要部分组成:

  1. 具有变更数据流的 Spanner 数据库
  2. SpannerIO 连接器
  3. 用户定义的转换和接收器
  4. Apache Beam 接收器 I/O 写入器

图片

Spanner 变更数据流

如需详细了解如何创建变更数据流,请参阅创建变更数据流

Apache Beam SpannerIO 连接器

这是前面 Dataflow 部分中介绍的 SpannerIO 连接器。它是一个源 I/O 连接器,可向流水线的后续阶段发出 PCollection 数据更改记录。每个发出的数据更改记录的事件时间将是提交时间戳。请注意,发出的记录是无序的,并且 SpannerIO 连接器保证不会出现延迟记录

使用变更数据流时,Dataflow 会使用检查点。因此,每个工作器可能会在缓冲更改时等待长达配置的检查点间隔时间,然后再发送更改以进行进一步处理。

用户定义的转换

借助用户定义的转换,用户可以在 Dataflow 流水线中聚合、转换或修改处理数据。此功能的常见用途包括移除个人身份信息、满足下游数据格式要求和排序。如需查看有关转换的编程指南,请参阅官方 Apache Beam 文档。

Apache Beam 接收器 I/O 写入器

Apache Beam 包含内置 I/O 连接器,可用于将数据从 Dataflow 流水线写入 BigQuery 等数据接收器。系统本身支持最常见的数据接收器。

Dataflow 模板

Dataflow 模板提供了一种方法,可使用 Google Cloud 控制台、 Google Cloud CLI 或 REST API 调用,基于预构建的 Docker 映像为常见用例创建 Dataflow 作业。

对于 Spanner 变更数据流,我们提供了三个 Dataflow Flex 模板:

使用 Spanner change streams to Pub/Sub 模板时,存在以下限制:

为 Dataflow 模板设置 IAM 权限

在使用列出的三个Flex 模板创建 Dataflow 作业之前,请确保您拥有以下服务账号的必需 IAM 权限

如果您没有所需的 IAM 权限,则必须指定用户管理的工作器服务账号才能创建 Dataflow 作业。如需了解详情,请参阅 Dataflow 安全性和权限

如果您尝试在缺少所有必需权限的情况下从 Dataflow Flex 模板运行作业,则作业可能会失败,并显示无法读取结果文件错误资源权限遭拒错误。如需了解详情,请参阅排查 Flex 模板问题

构建 Dataflow 流水线

本部分介绍了连接器的初始配置,并提供了与 Spanner 变更数据流功能进行常见集成的示例。

如需按照这些步骤操作,您需要一个适用于 Dataflow 的 Java 开发环境。如需了解详情,请参阅使用 Java 创建 Dataflow 流水线

创建变更流

如需详细了解如何创建变更数据流,请参阅创建变更数据流。 如需继续执行后续步骤,您必须拥有已配置变更数据流的 Spanner 数据库。

授予精细访问权限控制权限

如果您预计任何精细访问权限控制用户都会运行 Dataflow 作业,请确保向这些用户授予对数据库角色的访问权限,该角色对变更数据流具有 SELECT 特权,对变更数据流的表值函数具有 EXECUTE 特权。另请确保主账号在 SpannerIO 配置或 Dataflow Flex 模板中指定数据库角色。

如需了解详情,请参阅精细访问权限控制简介

将 SpannerIO 连接器添加为依赖项

Apache Beam SpannerIO 连接器封装了使用 Cloud Spanner API 直接消耗变更数据流的复杂性,并向流水线的后续阶段发出变更数据流数据记录的 PCollection。

这些对象可在用户 Dataflow 流水线的其他阶段中使用。变更数据流集成是 SpannerIO 连接器的一部分。如需使用 SpannerIO 连接器,您需要将依赖项添加到 pom.xml 文件中:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

创建元数据数据库

在运行 Apache Beam 流水线时,连接器需要跟踪每个分区。它会将此元数据保存在连接器在初始化期间创建的 Spanner 表中。在配置连接器时,您需要指定将创建此表的数据库。

变更数据流最佳实践中所述,我们建议为此目的创建一个新数据库,而不是允许连接器使用应用的数据库来存储其元数据表。

使用 SpannerIO 连接器的 Dataflow 作业的所有者需要通过此元数据数据库设置以下 IAM 权限

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

配置连接器

Spanner 变更数据流连接器可按如下方式配置:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

以下是 readChangeStream() 选项的说明:

Spanner 配置(必需)

用于配置创建变更数据流的项目、实例和数据库,以及应从中查询变更数据流的项目、实例和数据库。 还可以选择性地指定当运行 Dataflow 作业的 IAM 主账号是精细访问权限控制用户时要使用的数据库角色。作业会代入此数据库角色,以访问变更数据流。如需了解详情,请参阅精细访问权限控制简介

变更数据流名称(必需)

此名称可唯一标识变更数据流。此处的名称必须与创建时使用的名称相同。

元数据实例 ID(可选)

此实例用于存储连接器使用的元数据,以控制变更数据流 API 数据的使用。

元数据数据库 ID(必需)

此数据库用于存储连接器使用的元数据,以控制变更数据流 API 数据的使用。

元数据表名称(可选)

此参数仅在更新现有流水线时使用。

这是连接器要使用的预先存在的元数据表名称。连接器使用此表来存储元数据,以控制变更数据流 API 数据的使用。如果省略此选项,Spanner 会在连接器初始化时创建一个具有生成名称的新表。

RPC 优先级(可选)

用于变更数据流查询的请求优先级。如果省略此参数,则将使用 high priority

InclusiveStartAt(必需)

自给定时间戳以来的更改会返回给调用方。

InclusiveEndAt(可选)

系统会将截至给定时间戳的更改返回给调用方。如果省略此参数,系统将无限期地发出更改。

添加转换和接收器以处理更改数据

完成上述步骤后,配置的 SpannerIO 连接器即可发出 DataChangeRecord 对象的 PCollection。如需查看以各种方式处理此流式数据的多个示例流水线配置,请参阅转换和接收器示例

请注意,SpannerIO 连接器发出的变更数据流记录是无序的。 这是因为 PCollection 不提供任何排序保证。如果您需要有序的数据流,则必须在流水线中将记录分组并排序为转换:请参阅示例:按键排序。您可以扩展此示例,以根据记录的任何字段(例如事务 ID)对记录进行排序。

转换和接收器示例

您可以定义自己的转换并指定用于写入数据的接收器。 Apache Beam 文档提供了大量可应用的转换,以及可用于将数据写入外部系统的现成 I/O 连接器

示例:按键排序

此代码示例使用 Dataflow 连接器按提交时间戳发出排序的数据更改记录,并按主键进行分组。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

此代码示例使用状态和计时器来缓冲每个键的记录,并将计时器的过期时间设置为未来某个用户配置的时间 T(在 BufferKeyUntilOutputTimestamp 函数中定义)。当 Dataflow 水印通过时间 T 时,此代码会刷新缓冲区中时间戳小于 T 的所有记录,按提交时间戳对这些记录进行排序,并输出一个键值对,其中:

  • 键是输入键,即经过哈希处理后得到的大小为 1000 的桶数组的主键。
  • 该值是为相应键缓冲的有序数据更改记录。

对于每个密钥,我们提供以下保证:

  • 系统保证计时器按到期时间戳的顺序触发。
  • 下游阶段保证会按照元素生成时的顺序接收元素。

例如,如果键的值为 100,则计时器会在 T1T10 分别触发,并在每个时间戳生成一组数据更改记录。由于在 T1 输出的数据更改记录是在 T10 输出的数据更改记录之前生成的,因此可以保证在 T10 输出的数据更改记录之前,下一个阶段也会收到 T1 输出的数据更改记录。此机制有助于我们保证下游处理的每个主键的严格提交时间戳顺序。

此过程将重复执行,直到流水线结束并且所有数据更改记录都已处理完毕(如果未指定结束时间,则会无限期重复执行)。

请注意,此代码示例使用状态和计时器(而非窗口)来按键执行排序。原因是,无法保证窗口按顺序处理。这意味着,旧窗口的处理时间可能会晚于新窗口,从而导致处理顺序混乱。

BreakRecordByModFn

每个数据更改记录可能包含多个 mod。每个 mod 都表示对单个主键值的插入、更新或删除。此函数会将每个数据更改记录拆分为单独的数据更改记录,每个 mod 对应一条记录。

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

此函数接受 DataChangeRecord,并输出一个以 Spanner 主键(已哈希处理为整数值)为键的 DataChangeRecord

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

计时器和缓冲区是按键设置的。此函数会缓冲每个数据更改记录,直到水印通过我们希望输出缓冲数据更改记录的时间戳。

此代码利用循环计时器来确定何时刷新缓冲区:

  1. 当它首次看到某个键的数据更改记录时,会将计时器设置为在数据更改记录的提交时间戳 + incrementIntervalSeconds(用户可配置的选项)时触发。
  2. 当计时器触发时,它会将缓冲区中时间戳小于计时器到期时间的所有数据更改记录添加到 recordsToOutput。如果缓冲区中的数据更改记录的时间戳大于或等于计时器的到期时间,则系统会将这些数据更改记录重新添加到缓冲区,而不是输出它们。然后,它将下一个计时器设置为当前计时器的到期时间加上 incrementIntervalInSeconds
  3. 如果 recordsToOutput 不为空,该函数会按提交时间戳和事务 ID 对 recordsToOutput 中的数据更改记录进行排序,然后输出这些记录。
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

事务排序

此流水线可以更改为按事务 ID 和提交时间戳排序。为此,请为每个事务 ID / 提交时间戳对缓冲记录,而不是为每个 Spanner 键缓冲记录。这需要修改 KeyByIdFn 中的代码。

示例:组合事务

此代码示例会读取数据更改记录,将属于同一事务的所有数据更改记录组装成一个元素,然后输出该元素。请注意,此示例代码输出的事务并非按提交时间戳排序。

此代码示例使用缓冲区从数据更改记录中组装事务。首次收到属于某个事务的数据更改记录时,它会读取数据更改记录中的 numberOfRecordsInTransaction 字段,该字段描述了属于相应事务的预期数据更改记录数。它会缓冲属于相应事务的数据更改记录,直到缓冲的记录数达到 numberOfRecordsInTransaction,然后输出捆绑的数据更改记录。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

此函数接受 DataChangeRecord,并输出以事务 ID 为键的 DataChangeRecord

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn 缓冲区从 KeyByTransactionIdFn 接收 {TransactionId, DataChangeRecord} 的键值对,并根据 TransactionId 将其分批缓冲。当缓冲的记录数等于整个事务中包含的记录数时,此函数会按记录序列对组中的 DataChangeRecord 对象进行排序,并输出 {CommitTimestamp, TransactionId}Iterable<DataChangeRecord> 的键值对。

在此,我们假设 SortKey 是表示 {CommitTimestamp, TransactionId} 对的用户定义类。如需详细了解 SortKey,请参阅实现示例

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

示例:按事务标记过滤

当修改用户数据的事务被标记时,相应的标记及其类型会作为 DataChangeRecord 的一部分存储。以下示例展示了如何根据用户定义的事务标记以及系统标记过滤变更数据流记录:

针对 my-tx-tag 的用户定义标记过滤:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

系统标记过滤/TTL 审核:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

示例:提取完整行

此示例适用于名为 Singer 的 Spanner 表,该表具有以下定义:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

在变更数据流的默认 OLD_AND_NEW_VALUES 值捕获模式下,当 Spanner 行发生更新时,收到的数据更改记录将仅包含已更改的列。跟踪但未更改的列将不会包含在记录中。mod 的主键可用于在数据更改记录的提交时间戳执行 Spanner 快照读取,以提取未更改的列,甚至检索完整行。

请注意,数据库保留政策可能需要更改为大于或等于变更数据流保留政策的值,才能成功进行快照读取。

另请注意,使用 NEW_ROW 值捕获类型是推荐的更高效的方法,因为此类型默认会返回行的所有跟踪列,并且不需要对 Spanner 进行额外的快照读取。

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

此转换将在收到的每条记录的提交时间戳执行过时数据读取,并将整行映射到 JSON。

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

此代码会创建一个 Spanner 数据库客户端来执行完整的行提取,并将会话池配置为仅包含少量会话,从而在 ToFullReowJsonFn 的一个实例中按顺序执行读取操作。Dataflow 会确保生成此函数的多个实例,每个实例都有自己的客户端池。

示例:Spanner 到 Pub/Sub

在此场景中,调用方会尽可能快地将记录流式传输到 Pub/Sub,而不会进行任何分组或聚合。这非常适合用于触发下游处理,例如将插入 Spanner 表中的所有新行以流式方式传输到 Pub/Sub 以进行进一步处理。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

请注意,Pub/Sub 接收器可以配置为确保正好一次语义。

示例:Spanner 到 Cloud Storage

在此场景中,调用方将给定窗口内的所有记录分组,并将该组保存到单独的 Cloud Storage 文件中。这非常适合用于分析和时间点归档,并且不受 Spanner 保留期限的限制。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

请注意,Cloud Storage 接收器默认提供“至少一次”语义。通过额外的处理,可以修改为具有“正好一次”语义。

我们还为此使用情形提供了一个 Dataflow 模板:请参阅将变更数据流连接到 Cloud Storage

示例:Spanner 到 BigQuery(账簿表)

在此示例中,调用方将变更记录流式传输到 BigQuery。每条数据变更记录都会在 BigQuery 中显示为一行。这非常适合用于分析。此代码使用前面提取完整行部分中定义的函数来检索记录的完整行,并将其写入 BigQuery。

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

请注意,BigQuery 接收器默认提供“至少一次”语义。通过额外的处理,可以修改为具有“正好一次”语义。

我们还为此使用情形提供了一个 Dataflow 模板;请参阅将变更数据流连接到 BigQuery

监控流水线

有两类指标可用于监控变更数据流 Dataflow 流水线。

标准 Dataflow 指标

Dataflow 提供了多项指标来确保作业运行正常,例如数据新鲜度、系统延隔时间、作业吞吐量、工作器 CPU 利用率等。如需了解详情,请参阅对 Dataflow 流水线使用 Monitoring

对于变更数据流流水线,应考虑两个主要指标:系统延迟时间数据新鲜度

系统延迟时间是指当前某数据项已处理或等待处理的最长时长(以秒为单位)。

数据新鲜度会显示当前时间(实时)与输出水印之间的时间长度。时间为 T 的输出水印表示,所有事件时间(严格)早于 T 的元素都已处理完毕,可用于计算。换句话说,数据新鲜度用于衡量流水线在处理已接收的事件方面的最新程度。

如果流水线资源不足,您可以在这两个指标中看到相应的影响。系统延迟时间会增加,因为商品需要等待更长时间才能得到处理。数据新鲜度也会降低,因为流水线无法跟上接收到的数据量。

自定义变更数据流指标

这些指标在 Cloud Monitoring 中公开,包括:

  • 从记录在 Spanner 中提交到连接器将其发送到 PCollection 之间的分桶(直方图)延迟时间。此指标可用于查看流水线是否存在任何性能(延迟时间)问题。
  • 读取的数据记录总数。此指标用于总体指示连接器发出的记录数。此数字应不断增加,反映底层 Spanner 数据库中的写入趋势。
  • 正在读取的分区数。应始终有分区正在读取。如果此数字为零,则表示流水线中发生了错误。
  • 连接器执行期间发出的查询总数。此指标总体上指示了在流水线执行期间向 Spanner 实例发出的变更数据流查询。这可用于估算连接器对 Spanner 数据库的负载。

更新现有流水线

如果作业兼容性检查通过,则可以更新使用 SpannerIO 连接器处理变更数据流的正在运行的流水线。为此,您必须在更新新作业时明确设置其元数据表名称参数。使用您要更新的作业的 metadataTable 流水线选项的值。

如果您使用的是 Google 提供的 Dataflow 模板,请使用参数 spannerMetadataTableName 设置表名称。您还可以修改现有作业,以在连接器配置中使用方法 withMetadataTable(your-metadata-table-name) 明确使用元数据表。完成上述操作后,您可以按照 Dataflow 文档中启动替换作业中的说明更新正在运行的作业。

变更数据流和 Dataflow 的最佳实践

以下是使用 Dataflow 构建变更数据流连接的一些最佳实践。

使用单独的元数据数据库

我们建议您为 SpannerIO 连接器创建一个单独的数据库,用于存储元数据,而不是将其配置为使用应用数据库。

如需了解详情,请参阅考虑使用单独的元数据数据库

调整集群大小

对于 Spanner 变更数据流作业中的初始工作器数量,一个经验法则是每秒 1,000 次写入需要一个工作器。请注意,此估计值可能会因多种因素而异,例如每个事务的规模、单个事务产生的变更数据流记录数量,以及流水线中使用的其他转换、聚合或接收器。

在完成初始资源配置后,请务必跟踪监控流水线中提及的指标,以确保流水线运行正常。我们建议您先尝试设置初始工作器池大小,然后监控流水线处理负载的情况,并在必要时增加节点数量。CPU 利用率是检查负载是否正常以及是否需要更多节点的一项关键指标。

已知限制

将 Spanner 变更数据流与 Dataflow 搭配使用时存在一些已知限制:

自动扩缩

对于包含 SpannerIO.readChangeStream 的任何流水线,自动扩缩支持需要 Apache Beam 2.39.0 或更高版本。

如果您使用的 Apache Beam 版本早于 2.39.0,则包含 SpannerIO.readChangeStream 的流水线需要明确指定自动扩缩算法为 NONE,如横向自动扩缩中所述。

如需手动扩缩 Dataflow 流水线(而不是使用自动扩缩),请参阅手动扩缩流处理流水线

Runner V2

Spanner 变更数据流连接器需要 Dataflow Runner V2。必须在执行期间手动指定此值,否则系统会抛出错误。您可以通过使用 --experiments=use_unified_worker,use_runner_v2 配置作业来指定 Runner V2

快照

Spanner 变更数据流连接器不支持 Dataflow 快照

正在排空

Spanner 变更数据流连接器不支持排空作业。只能取消现有作业。

您还可以更新现有流水线,而无需停止该流水线。

OpenCensus

如需使用 OpenCensus 监控流水线,请指定版本 0.28.3 或更高版本。

NullPointerException 在流水线启动时

Apache Beam 版本 2.38.0 中的一个 bug 可能会导致在特定条件下启动流水线时出现 NullPointerException。这会阻止作业启动,并显示以下错误消息:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

如需解决此问题,请使用 Apache Beam 2.39.0 版或更高版本,或者手动将 beam-sdks-java-core 的版本指定为 2.37.0

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

更多信息