使用批量写入修改数据

本页面介绍了 Spanner 批量写入请求以及如何使用 修改 Spanner 数据。

您可以使用 Spanner 批量写入在 Spanner 表中插入、更新或删除多行。Spanner 批量写入支持无需读取操作的低延迟写入, 因为批量应用变更。如需使用批量写入,您可以将相关更改分组在一起,并且一组中的所有更改都会以原子方式提交。跨组的更改以未指定顺序应用,并且 彼此独立(非原子化)。Spanner 不需要 等待所有变更应用完成后再发送响应,这意味着 批量写入允许部分失败。您还可以一次执行多个批量写入。如需了解详情,请参阅如何使用批量写入

使用场景

如果您想在不执行读取操作的情况下提交大量写入,但不需要对所有更改使用原子事务,Spanner 批量写入会特别有用。

如果您想批量处理 DML 请求,请使用批量 DML 修改 Spanner 数据。如需详细了解 请参阅比较 DML 和变更

对于单一变更请求,我们建议使用锁定读写 交易

限制

Spanner 批量写入具有以下限制:

  • Spanner 批量写入无法使用 Google Cloud 控制台或 Google Cloud CLI。它仅支持 REST 和 RPC API 和 Spanner Java 客户端库。

  • 不支持使用批量写入功能实现重放攻击防范有可能 并且多次应用的变更 都会导致测试失败例如,如果重放插入更改,可能会产生“已存在”错误;如果您在更改中使用基于生成的键或提交时间戳的键,则可能会导致向表中添加其他行。我们建议在设计写入操作时遵循幂等原则。 可以避免这个问题

  • 您无法回滚已完成的批量写入请求。您可以取消 进行中的批量写入请求。如果您取消正在进行的批量写入, 未完成组中的更改将回滚。已完成组中的变更 提交到数据库

  • 批量写入请求的大小上限与提交请求的上限相同。如需了解详情,请参阅 与创建、读取、更新和删除数据相关的限制

如何使用批量写入

如需使用批量写入,您必须对要修改的数据库拥有 spanner.databases.write 权限。您可以批量写入变更 使用 REST 在单个调用中以非原子方式执行 或 RPC API 请求调用。

使用批量写入时,您应将以下变更类型组合在一起:

  • 在父级和子级中插入具有相同主键前缀的行 表格。
  • 通过表之间的外键关系将行插入表中。
  • 其他类型的相关变更,具体取决于您的数据库架构和 应用逻辑

您还可以使用 Spanner Java 客户端库批量写入。 以下代码示例使用新行更新 Singers 表。

Java


import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.MutationGroup;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Code;
import com.google.spanner.v1.BatchWriteResponse;

public class BatchWriteAtLeastOnceSample {

  /***
   * Assume DDL for the underlying database:
   * <pre>{@code
   *   CREATE TABLE Singers (
   *     SingerId   INT64 NOT NULL,
   *     FirstName  STRING(1024),
   *     LastName   STRING(1024),
   *   ) PRIMARY KEY (SingerId)
   *
   *   CREATE TABLE Albums (
   *     SingerId     INT64 NOT NULL,
   *     AlbumId      INT64 NOT NULL,
   *     AlbumTitle   STRING(1024),
   *   ) PRIMARY KEY (SingerId, AlbumId),
   *   INTERLEAVE IN PARENT Singers ON DELETE CASCADE
   * }</pre>
   */

  private static final MutationGroup MUTATION_GROUP1 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(16)
              .set("FirstName")
              .to("Scarlet")
              .set("LastName")
              .to("Terry")
              .build());
  private static final MutationGroup MUTATION_GROUP2 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(17)
              .set("FirstName")
              .to("Marc")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(18)
              .set("FirstName")
              .to("Catalina")
              .set("LastName")
              .to("Smith")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(17)
              .set("AlbumId")
              .to(1)
              .set("AlbumTitle")
              .to("Total Junk")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(18)
              .set("AlbumId")
              .to(2)
              .set("AlbumTitle")
              .to("Go, Go, Go")
              .build());

  static void batchWriteAtLeastOnce() {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "my-project";
    final String instanceId = "my-instance";
    final String databaseId = "my-database";
    batchWriteAtLeastOnce(projectId, instanceId, databaseId);
  }

  static void batchWriteAtLeastOnce(String projectId, String instanceId, String databaseId) {
    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
      DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
      final DatabaseClient dbClient = spanner.getDatabaseClient(dbId);

      // Creates and issues a BatchWrite RPC request that will apply the mutation groups
      // non-atomically and respond back with a stream of BatchWriteResponse.
      ServerStream<BatchWriteResponse> responses =
          dbClient.batchWriteAtLeastOnce(
              ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
              Options.tag("batch-write-tag"));

      // Iterates through the results in the stream response and prints the MutationGroup indexes,
      // commit timestamp and status.
      for (BatchWriteResponse response : responses) {
        if (response.getStatus().getCode() == Code.OK_VALUE) {
          System.out.printf(
              "Mutation group indexes %s have been applied with commit timestamp %s",
              response.getIndexesList(), response.getCommitTimestamp());
        } else {
          System.out.printf(
              "Mutation group indexes %s could not be applied with error code %s and "
                  + "error message %s", response.getIndexesList(),
              Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
        }
      }
    }
  }
}

后续步骤