バッチ書き込みを使用したデータの変更

このページでは、CSpanner のバッチ書き込みリクエストと、それを使用して Spanner データを変更する方法について説明します。

Spanner のバッチ書き込みを使用すると、Spanner テーブルで複数の行を挿入、更新、削除できます。Spanner のバッチ書き込みは読み取りオペレーションのない低レイテンシの書き込みをサポートし、ミューテーションがバッチに適用されるときにレスポンスを返します。バッチ書き込みを使用するには、関連するミューテーションをグループ化し、グループ内のすべてのミューテーションをアトミックに commit します。グループ間のミューテーションは不特定の順序で適用され、互いに独立しています(非アトミック)。Spanner では、レスポンスの送信前にすべてのミューテーションが適用されるのを待つ必要はありません。つまり、バッチ書き込みでは部分的な障害が発生する可能性があります。複数のバッチ書き込みを一度に実行することもできます。詳細については、バッチ書き込みの使用方法をご覧ください。

ユースケース

Spanner のバッチ書き込みは、読み取りオペレーションを行わずに多数の書き込みを commit する必要がありますが、すべてのミューテーションでアトミック トランザクションを必要としない場合に便利です。

DML リクエストをバッチ処理する場合は、バッチ DML を使用して Spanner データを変更します。DML とミューテーションの違いの詳細については、DML とミューテーションの比較をご覧ください。

単一のミューテーション リクエストでは、ロック型読み取り / 書き込みトランザクションの使用をおすすめします。

制限事項

Spanner のバッチ書き込みには次の制限があります。

  • Spanner のバッチ書き込みは、Google Cloud コンソールまたは Google Cloud CLI では使用できません。REST API と RPC API と Spanner Java クライアント ライブラリでのみ使用できます。

  • バッチ書き込みを使用したリプレイ保護はサポートされていません。ミューテーションは複数回適用される可能性があり、複数回適用されたミューテーションは失敗する場合があります。たとえば、挿入ミューテーションがリプレイされると、すでに存在するというエラーが生成される場合や、ミューテーションで生成されたタイムスタンプベースのキーまたは commit タイムスタンプベースのキーを使用する場合にテーブルに行が追加されることがあります。この問題を回避するには、書き込みをべき等に構造化することをおすすめします。

  • 完了したバッチ書き込みをロールバックすることはできません。実行中のバッチ書き込みリクエストをキャンセルできます。実行中のバッチ書き込みをキャンセルすると、未完了のグループのミューテーションがロールバックされます。完了したグループのミューテーションはデータベースに commit されます。

  • バッチ書き込みリクエストの最大サイズは、commit リクエストの上限と同じです。詳細については、データの作成、読み取り、更新、削除の制限をご覧ください。

バッチ書き込みの使用方法

バッチ書き込みを使用するには、変更するデータベースに対する spanner.databases.write 権限が必要です。REST または RPC API リクエスト呼び出しを使用すると、1 回の呼び出しでミューテーションを非アトミックでバッチ書き込みできます。

バッチ書き込みを使用する場合は、次のミューテーション タイプをグループ化する必要があります。

  • 親テーブルと子テーブルの両方で同じ主キーの接頭辞を持つ行を挿入する。
  • テーブル間に外部キー関係があるテーブルに行を挿入する。
  • 他のタイプの関連するミューテーションは、データベース スキーマとアプリケーション ロジックによって異なる。

Spanner Java クライアント ライブラリを使用してバッチ書き込みを行うこともできます。次のコード例では、Singers テーブルを新しい行で更新します。

Java


import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.MutationGroup;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Code;
import com.google.spanner.v1.BatchWriteResponse;

public class BatchWriteAtLeastOnceSample {

  /***
   * Assume DDL for the underlying database:
   * <pre>{@code
   *   CREATE TABLE Singers (
   *     SingerId   INT64 NOT NULL,
   *     FirstName  STRING(1024),
   *     LastName   STRING(1024),
   *   ) PRIMARY KEY (SingerId)
   *
   *   CREATE TABLE Albums (
   *     SingerId     INT64 NOT NULL,
   *     AlbumId      INT64 NOT NULL,
   *     AlbumTitle   STRING(1024),
   *   ) PRIMARY KEY (SingerId, AlbumId),
   *   INTERLEAVE IN PARENT Singers ON DELETE CASCADE
   * }</pre>
   */

  private static final MutationGroup MUTATION_GROUP1 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(16)
              .set("FirstName")
              .to("Scarlet")
              .set("LastName")
              .to("Terry")
              .build());
  private static final MutationGroup MUTATION_GROUP2 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(17)
              .set("FirstName")
              .to("Marc")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(18)
              .set("FirstName")
              .to("Catalina")
              .set("LastName")
              .to("Smith")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(17)
              .set("AlbumId")
              .to(1)
              .set("AlbumTitle")
              .to("Total Junk")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(18)
              .set("AlbumId")
              .to(2)
              .set("AlbumTitle")
              .to("Go, Go, Go")
              .build());

  static void batchWriteAtLeastOnce() {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "my-project";
    final String instanceId = "my-instance";
    final String databaseId = "my-database";
    batchWriteAtLeastOnce(projectId, instanceId, databaseId);
  }

  static void batchWriteAtLeastOnce(String projectId, String instanceId, String databaseId) {
    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
      DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
      final DatabaseClient dbClient = spanner.getDatabaseClient(dbId);

      // Creates and issues a BatchWrite RPC request that will apply the mutation groups
      // non-atomically and respond back with a stream of BatchWriteResponse.
      ServerStream<BatchWriteResponse> responses =
          dbClient.batchWriteAtLeastOnce(
              ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
              Options.tag("batch-write-tag"));

      // Iterates through the results in the stream response and prints the MutationGroup indexes,
      // commit timestamp and status.
      for (BatchWriteResponse response : responses) {
        if (response.getStatus().getCode() == Code.OK_VALUE) {
          System.out.printf(
              "Mutation group indexes %s have been applied with commit timestamp %s",
              response.getIndexesList(), response.getCommitTimestamp());
        } else {
          System.out.printf(
              "Mutation group indexes %s could not be applied with error code %s and "
                  + "error message %s", response.getIndexesList(),
              Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
        }
      }
    }
  }
}

次のステップ