バッチ書き込みを使用してデータを変更する

このページでは、CSpanner のバッチ書き込みリクエストと、それを使用して Spanner データを変更する方法について説明します。

Spanner のバッチ書き込みを使用すると、Spanner テーブルで複数の行を挿入、更新、削除できます。Spanner のバッチ書き込みは、読み取りオペレーションのない低レイテンシの書き込みをサポートし、ミューテーションがバッチに適用されるときにレスポンスを返します。バッチ書き込みを使用するには、関連するミューテーションをグループ化します。グループ内のすべてのミューテーションはアトミックに commit されます。グループ間のミューテーションは不特定の順序で適用され、互いに独立しています(非アトミック)。Spanner では、すべてのミューテーションが適用されるまで待機してからレスポンスを送信する必要はありません。つまり、バッチ書き込みでは部分的な失敗が許容されます。複数のバッチ書き込みを一度に実行することもできます。詳細については、バッチ書き込みの使用方法をご覧ください。

ユースケース

Spanner のバッチ書き込みは、読み取りオペレーションを行わずに多数の書き込みを commit する必要がありますが、すべてのミューテーションでアトミック トランザクションを必要としない場合に便利です。

DML リクエストをバッチ処理する場合は、バッチ DML を使用して Spanner データを変更します。DML とミューテーションの違いの詳細については、DML とミューテーションの比較をご覧ください。

単一のミューテーション リクエストでは、ロック型読み取り / 書き込みトランザクションの使用をおすすめします。

制限事項

Spanner のバッチ書き込みには次の制限があります。

  • Spanner のバッチ書き込みは、Google Cloud コンソールまたは Google Cloud CLI では使用できません。REST API と RPC API と Spanner Java クライアント ライブラリでのみ使用できます。

  • バッチ書き込みを使用したリプレイ保護はサポートされていません。ミューテーションは複数回適用される可能性があり、複数回適用されたミューテーションは失敗する場合があります。たとえば、挿入ミューテーションがリプレイされると、すでに存在するというエラーが生成される場合や、ミューテーションで生成されたタイムスタンプベースのキーまたは commit タイムスタンプベースのキーを使用する場合にテーブルに行が追加されることがあります。この問題を回避するには、書き込みをべき等になるように構成することをおすすめします。

  • 完了したバッチ書き込みをロールバックすることはできません。進行中の一括書き込みリクエストをキャンセルできます。進行中のバッチ書き込みをキャンセルすると、完了していないグループ内のミューテーションはロールバックされます。完了したグループのミューテーションはデータベースに commit されます。

  • バッチ書き込みリクエストの最大サイズは、commit リクエストの上限と同じです。詳細については、データの作成、読み取り、更新、削除に関する上限をご覧ください。

バッチ書き込みの使用方法

バッチ書き込みを使用するには、変更するデータベースに対する spanner.databases.write 権限が必要です。REST または RPC API リクエスト呼び出しを使用すると、1 回の呼び出しでミューテーションを非アトミックでバッチ書き込みできます。

バッチ書き込みを使用する場合は、次のミューテーション タイプをグループ化する必要があります。

  • 親テーブルと子テーブルの両方で同じ主キーの接頭辞を持つ行を挿入する。
  • テーブル間に外部キー関係があるテーブルに行を挿入する。
  • 他のタイプの関連するミューテーションは、データベース スキーマとアプリケーション ロジックによって異なる。

Spanner Java クライアント ライブラリを使用してバッチ書き込みを行うこともできます。次のコード例では、新しい行で Singers テーブルを更新します。

Java


import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.MutationGroup;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Code;
import com.google.spanner.v1.BatchWriteResponse;

public class BatchWriteAtLeastOnceSample {

  /***
   * Assume DDL for the underlying database:
   * <pre>{@code
   *   CREATE TABLE Singers (
   *     SingerId   INT64 NOT NULL,
   *     FirstName  STRING(1024),
   *     LastName   STRING(1024),
   *   ) PRIMARY KEY (SingerId)
   *
   *   CREATE TABLE Albums (
   *     SingerId     INT64 NOT NULL,
   *     AlbumId      INT64 NOT NULL,
   *     AlbumTitle   STRING(1024),
   *   ) PRIMARY KEY (SingerId, AlbumId),
   *   INTERLEAVE IN PARENT Singers ON DELETE CASCADE
   * }</pre>
   */

  private static final MutationGroup MUTATION_GROUP1 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(16)
              .set("FirstName")
              .to("Scarlet")
              .set("LastName")
              .to("Terry")
              .build());
  private static final MutationGroup MUTATION_GROUP2 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(17)
              .set("FirstName")
              .to("Marc")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(18)
              .set("FirstName")
              .to("Catalina")
              .set("LastName")
              .to("Smith")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(17)
              .set("AlbumId")
              .to(1)
              .set("AlbumTitle")
              .to("Total Junk")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(18)
              .set("AlbumId")
              .to(2)
              .set("AlbumTitle")
              .to("Go, Go, Go")
              .build());

  static void batchWriteAtLeastOnce() {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "my-project";
    final String instanceId = "my-instance";
    final String databaseId = "my-database";
    batchWriteAtLeastOnce(projectId, instanceId, databaseId);
  }

  static void batchWriteAtLeastOnce(String projectId, String instanceId, String databaseId) {
    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
      DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
      final DatabaseClient dbClient = spanner.getDatabaseClient(dbId);

      // Creates and issues a BatchWrite RPC request that will apply the mutation groups
      // non-atomically and respond back with a stream of BatchWriteResponse.
      ServerStream<BatchWriteResponse> responses =
          dbClient.batchWriteAtLeastOnce(
              ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
              Options.tag("batch-write-tag"));

      // Iterates through the results in the stream response and prints the MutationGroup indexes,
      // commit timestamp and status.
      for (BatchWriteResponse response : responses) {
        if (response.getStatus().getCode() == Code.OK_VALUE) {
          System.out.printf(
              "Mutation group indexes %s have been applied with commit timestamp %s",
              response.getIndexesList(), response.getCommitTimestamp());
        } else {
          System.out.printf(
              "Mutation group indexes %s could not be applied with error code %s and "
                  + "error message %s", response.getIndexesList(),
              Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
        }
      }
    }
  }
}

次のステップ