使用批量写入修改数据

本页面介绍了 Spanner 批量写入请求,以及如何使用这些请求来修改 Spanner 数据。

您可以使用 Spanner 批量写入功能,在 Spanner 表中插入、更新或删除多行。Spanner 批量写入支持无需读取操作的低延迟写入,并可在批量应用更改时返回响应。如需使用批量写入,您需要将相关的变更组合在一起,并且一组中的所有变更都将以原子方式提交。组之间的变更以未指定的顺序应用,并且彼此独立(非原子)。Spanner 不需要等到所有变更应用完毕后才能发送响应,这意味着批量写入允许部分失败。您也可以一次执行多个批量写入。如需了解详情,请参阅如何使用批量写入

使用场景

如果您要提交大量写入而不执行读取操作,但不需要对所有变更执行原子事务,则 Spanner 批量写入尤其有用。

如果要批量处理 DML 请求,请使用批量 DML 修改 Spanner 数据。如需详细了解 DML 和变更之间的差异,请参阅比较 DML 和变更

对于单项更改请求,我们建议使用锁定读写事务

限制

Spanner 批量写入具有以下限制:

  • 使用 Google Cloud 控制台或 Google Cloud CLI 时,无法使用 Spanner 批量写入功能。它仅支持使用 REST API 和 RPC API 以及 Spanner Java 客户端库。

  • 使用批量写入时,不支持重放攻击防范更改可以多次应用,并且多次应用的变更可能会导致失败。例如,如果重放插入变更,则可能会生成已存在的错误,或者如果您在变更中使用生成或提交基于时间戳的键,则可能会导致额外行被添加到表中。我们建议您在构建写入操作时遵循幂等原则,以免出现此问题。

  • 您无法回滚已完成的批量写入请求。您可以取消正在进行的批量写入请求。如果您取消正在进行的批量写入,则会回滚未完成的组中的变更。已完成的组中的变更会被提交到数据库。

  • 批量写入请求的大小上限与提交请求的大小上限相同。如需了解详情,请参阅创建、读取、更新和删除数据的限制

如何使用批量写入

如需使用批量写入,您必须对要修改的数据库具备 spanner.databases.write 权限。您可以使用 RESTRPC API 请求调用,在单次调用中以非原子方式批量写入变更。

使用批量写入时,您应将以下变更类型归为一组:

  • 在父表和子表中插入具有相同主键前缀的行。
  • 将行插入在表之间具有外键关系的表中。
  • 其他类型的相关变更,具体取决于您的数据库架构和应用逻辑。

您还可以使用 Spanner Java 客户端库批量写入。以下代码示例使用新行更新 Singers 表。

Java


import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.MutationGroup;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Code;
import com.google.spanner.v1.BatchWriteResponse;

public class BatchWriteAtLeastOnceSample {

  /***
   * Assume DDL for the underlying database:
   * <pre>{@code
   *   CREATE TABLE Singers (
   *     SingerId   INT64 NOT NULL,
   *     FirstName  STRING(1024),
   *     LastName   STRING(1024),
   *   ) PRIMARY KEY (SingerId)
   *
   *   CREATE TABLE Albums (
   *     SingerId     INT64 NOT NULL,
   *     AlbumId      INT64 NOT NULL,
   *     AlbumTitle   STRING(1024),
   *   ) PRIMARY KEY (SingerId, AlbumId),
   *   INTERLEAVE IN PARENT Singers ON DELETE CASCADE
   * }</pre>
   */

  private static final MutationGroup MUTATION_GROUP1 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(16)
              .set("FirstName")
              .to("Scarlet")
              .set("LastName")
              .to("Terry")
              .build());
  private static final MutationGroup MUTATION_GROUP2 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(17)
              .set("FirstName")
              .to("Marc")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(18)
              .set("FirstName")
              .to("Catalina")
              .set("LastName")
              .to("Smith")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(17)
              .set("AlbumId")
              .to(1)
              .set("AlbumTitle")
              .to("Total Junk")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(18)
              .set("AlbumId")
              .to(2)
              .set("AlbumTitle")
              .to("Go, Go, Go")
              .build());

  static void batchWriteAtLeastOnce() {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "my-project";
    final String instanceId = "my-instance";
    final String databaseId = "my-database";
    batchWriteAtLeastOnce(projectId, instanceId, databaseId);
  }

  static void batchWriteAtLeastOnce(String projectId, String instanceId, String databaseId) {
    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
      DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
      final DatabaseClient dbClient = spanner.getDatabaseClient(dbId);

      // Creates and issues a BatchWrite RPC request that will apply the mutation groups
      // non-atomically and respond back with a stream of BatchWriteResponse.
      ServerStream<BatchWriteResponse> responses =
          dbClient.batchWriteAtLeastOnce(
              ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
              Options.tag("batch-write-tag"));

      // Iterates through the results in the stream response and prints the MutationGroup indexes,
      // commit timestamp and status.
      for (BatchWriteResponse response : responses) {
        if (response.getStatus().getCode() == Code.OK_VALUE) {
          System.out.printf(
              "Mutation group indexes %s have been applied with commit timestamp %s",
              response.getIndexesList(), response.getCommitTimestamp());
        } else {
          System.out.printf(
              "Mutation group indexes %s could not be applied with error code %s and "
                  + "error message %s", response.getIndexesList(),
              Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
        }
      }
    }
  }
}

后续步骤