Daten mit Batch-Schreiben ändern

Auf dieser Seite werden Spanner-Batch-Schreibanfragen beschrieben und es wird erläutert, wie Sie damit Ihre Spanner-Daten ändern können.

Mit Spanner-Batch-Schreibvorgängen können Sie mehrere Zeilen in Ihre Spanner-Tabellen einfügen, aktualisieren oder löschen. Spanner-Batch-Schreibvorgänge unterstützen Schreibvorgänge mit niedriger Latenz ohne Lesevorgang und geben Antworten zurück, wenn Mutationen in Batches angewendet werden. Wenn Sie Batch-Schreibvorgänge verwenden möchten, gruppieren Sie zusammengehörige Mutationen. Alle Mutationen in einer Gruppe werden dann atomar ausgeführt. Die Mutationen in den Gruppen werden in einer nicht angegebenen Reihenfolge angewendet und sind voneinander unabhängig (nicht atomar). Spanner muss nicht warten, bis alle Mutationen angewendet wurden, bevor eine Antwort gesendet wird. Das bedeutet, dass bei Batch-Schreibvorgängen ein teilweiser Fehler möglich ist. Sie können auch mehrere Batch-Schreibvorgänge gleichzeitig ausführen. Weitere Informationen finden Sie unter Batch-Schreibvorgänge verwenden.

Anwendungsfälle

Das Batch-Schreiben in Spanner ist besonders nützlich, wenn Sie eine große Anzahl von Schreibvorgängen ohne Lesevorgang ausführen möchten, aber keine atomare Transaktion für alle Mutationen benötigen.

Wenn Sie Ihre DML-Anfragen in Batches zusammenfassen möchten, verwenden Sie Batch-DML, um Ihre Spanner-Daten zu ändern. Weitere Informationen zu den Unterschieden zwischen DML und Mutationen finden Sie unter DML und Mutationen vergleichen.

Für einzelne Mutationsanfragen empfehlen wir die Verwendung einer Lese-/Schreibtransaktion mit Sperrung.

Beschränkungen

Für Spanner-Batch-Schreibvorgänge gelten die folgenden Einschränkungen:

  • Der Spanner-Batch-Schreibvorgang ist nicht über dieGoogle Cloud -Konsole oder die Google Cloud CLI verfügbar. Sie ist nur über REST- und RPC-APIs und die Spanner-Clientbibliotheken verfügbar.

  • Wiederholungsschutz wird bei Batch-Schreibvorgängen nicht unterstützt. Es ist möglich, dass Mutationen mehr als einmal angewendet werden. Eine Mutation, die mehr als einmal angewendet wird, kann zu einem Fehler führen. Wenn beispielsweise eine Einfügemutation wiedergegeben wird, kann dies zu einem Fehler vom Typ „Bereits vorhanden“ führen. Wenn Sie generierte oder auf Commit-Zeitstempeln basierende Schlüssel in der Mutation verwenden, kann dies dazu führen, dass der Tabelle zusätzliche Zeilen hinzugefügt werden. Wir empfehlen, Ihre Schreibvorgänge so zu strukturieren, dass sie idempotent sind, um dieses Problem zu vermeiden.

  • Eine abgeschlossene Batchschreibanfrage kann nicht rückgängig gemacht werden. Sie können eine laufende Batchschreibanfrage abbrechen. Wenn Sie einen laufenden Batch-Schreibvorgang abbrechen, werden Mutationen in nicht abgeschlossenen Gruppen zurückgesetzt. Mutationen in abgeschlossenen Gruppen werden in der Datenbank gespeichert.

  • Die maximale Größe für eine Batch-Schreibanfrage entspricht dem Limit für eine Commit-Anfrage. Weitere Informationen finden Sie unter Limits beim Erstellen, Lesen, Aktualisieren und Löschen von Daten.

Batchschreibvorgänge verwenden

Wenn Sie Batch-Schreibvorgänge verwenden möchten, benötigen Sie die Berechtigung spanner.databases.write für die Datenbank, die Sie ändern möchten. Sie können Mutationen in einem einzelnen Aufruf nicht atomar im Batch schreiben, indem Sie einen REST- oder RPC API-Anfrageaufruf verwenden.

Die folgenden Mutationstypen sollten bei der Verwendung von Batch-Schreibvorgängen gruppiert werden:

  • Einfügen von Zeilen mit demselben Primärschlüsselpräfix in die übergeordnete und die untergeordnete Tabelle.
  • Einfügen von Zeilen in Tabellen mit einer Fremdschlüsselbeziehung zwischen den Tabellen.
  • Andere Arten von zugehörigen Mutationen, die von Ihrem Datenbankschema und Ihrer Anwendungslogik abhängen.

Sie können auch Batch-Schreibvorgänge mit den Spanner-Clientbibliotheken ausführen. Im folgenden Codebeispiel wird die Tabelle Singers mit neuen Zeilen aktualisiert.

Clientbibliotheken

Java


import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.MutationGroup;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Code;
import com.google.spanner.v1.BatchWriteResponse;

public class BatchWriteAtLeastOnceSample {

  /***
   * Assume DDL for the underlying database:
   * <pre>{@code
   *   CREATE TABLE Singers (
   *     SingerId   INT64 NOT NULL,
   *     FirstName  STRING(1024),
   *     LastName   STRING(1024),
   *   ) PRIMARY KEY (SingerId)
   *
   *   CREATE TABLE Albums (
   *     SingerId     INT64 NOT NULL,
   *     AlbumId      INT64 NOT NULL,
   *     AlbumTitle   STRING(1024),
   *   ) PRIMARY KEY (SingerId, AlbumId),
   *   INTERLEAVE IN PARENT Singers ON DELETE CASCADE
   * }</pre>
   */

  private static final MutationGroup MUTATION_GROUP1 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(16)
              .set("FirstName")
              .to("Scarlet")
              .set("LastName")
              .to("Terry")
              .build());
  private static final MutationGroup MUTATION_GROUP2 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(17)
              .set("FirstName")
              .to("Marc")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(18)
              .set("FirstName")
              .to("Catalina")
              .set("LastName")
              .to("Smith")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(17)
              .set("AlbumId")
              .to(1)
              .set("AlbumTitle")
              .to("Total Junk")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(18)
              .set("AlbumId")
              .to(2)
              .set("AlbumTitle")
              .to("Go, Go, Go")
              .build());

  static void batchWriteAtLeastOnce() {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "my-project";
    final String instanceId = "my-instance";
    final String databaseId = "my-database";
    batchWriteAtLeastOnce(projectId, instanceId, databaseId);
  }

  static void batchWriteAtLeastOnce(String projectId, String instanceId, String databaseId) {
    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
      DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
      final DatabaseClient dbClient = spanner.getDatabaseClient(dbId);

      // Creates and issues a BatchWrite RPC request that will apply the mutation groups
      // non-atomically and respond back with a stream of BatchWriteResponse.
      ServerStream<BatchWriteResponse> responses =
          dbClient.batchWriteAtLeastOnce(
              ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
              Options.tag("batch-write-tag"));

      // Iterates through the results in the stream response and prints the MutationGroup indexes,
      // commit timestamp and status.
      for (BatchWriteResponse response : responses) {
        if (response.getStatus().getCode() == Code.OK_VALUE) {
          System.out.printf(
              "Mutation group indexes %s have been applied with commit timestamp %s",
              response.getIndexesList(), response.getCommitTimestamp());
        } else {
          System.out.printf(
              "Mutation group indexes %s could not be applied with error code %s and "
                  + "error message %s", response.getIndexesList(),
              Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
        }
      }
    }
  }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
	"google.golang.org/grpc/status"
)

// batchWrite demonstrates writing mutations to a Spanner database through
// BatchWrite API - https://pkg.go.dev/cloud.google.com/go/spanner#Client.BatchWrite
func batchWrite(w io.Writer, db string) error {
	// db := "projects/my-project/instances/my-instance/databases/my-database"
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	// Database is assumed to exist - https://cloud.google.com/spanner/docs/getting-started/go#create_a_database
	singerColumns := []string{"SingerId", "FirstName", "LastName"}
	albumColumns := []string{"SingerId", "AlbumId", "AlbumTitle"}
	mutationGroups := make([]*spanner.MutationGroup, 2)

	mutationGroup1 := []*spanner.Mutation{
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{16, "Scarlet", "Terry"}),
	}
	mutationGroups[0] = &spanner.MutationGroup{Mutations: mutationGroup1}

	mutationGroup2 := []*spanner.Mutation{
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{17, "Marc", ""}),
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{18, "Catalina", "Smith"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{17, 1, "Total Junk"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{18, 2, "Go, Go, Go"}),
	}
	mutationGroups[1] = &spanner.MutationGroup{Mutations: mutationGroup2}
	iter := client.BatchWrite(ctx, mutationGroups)
	// See https://pkg.go.dev/cloud.google.com/go/spanner#BatchWriteResponseIterator.Do
	doFunc := func(response *sppb.BatchWriteResponse) error {
		if err = status.ErrorProto(response.GetStatus()); err == nil {
			fmt.Fprintf(w, "Mutation group indexes %v have been applied with commit timestamp %v",
				response.GetIndexes(), response.GetCommitTimestamp())
		} else {
			fmt.Fprintf(w, "Mutation group indexes %v could not be applied with error %v",
				response.GetIndexes(), err)
		}
		// Return an actual error as needed.
		return nil
	}
	return iter.Do(doFunc)
}

Knoten


// Imports the Google Cloud client library
const {Spanner, MutationGroup} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Create Mutation Groups
/**
 * Related mutations should be placed in a group, such as insert mutations for both a parent and a child row.
 * A group must contain related mutations.
 * Please see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.BatchWriteRequest.MutationGroup}
 * for more details and examples.
 */
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('Singers', {
  SingerId: 1,
  FirstName: 'Scarlet',
  LastName: 'Terry',
});

const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('Singers', {
  SingerId: 2,
  FirstName: 'Marc',
});
mutationGroup2.insert('Singers', {
  SingerId: 3,
  FirstName: 'Catalina',
  LastName: 'Smith',
});
mutationGroup2.insert('Albums', {
  AlbumId: 1,
  SingerId: 2,
  AlbumTitle: 'Total Junk',
});
mutationGroup2.insert('Albums', {
  AlbumId: 2,
  SingerId: 3,
  AlbumTitle: 'Go, Go, Go',
});

const options = {
  transactionTag: 'batch-write-tag',
};

try {
  database
    .batchWriteAtLeastOnce([mutationGroup1, mutationGroup2], options)
    .on('error', console.error)
    .on('data', response => {
      // Check the response code of each response to determine whether the mutation group(s) were applied successfully.
      if (response.status.code === 0) {
        console.log(
          `Mutation group indexes ${
            response.indexes
          }, have been applied with commit timestamp ${Spanner.timestamp(
            response.commitTimestamp,
          ).toJSON()}`,
        );
      }
      // Mutation groups that fail to commit trigger a response with a non-zero status code.
      else {
        console.log(
          `Mutation group indexes ${response.indexes}, could not be applied with error code ${response.status.code}, and error message ${response.status.message}`,
        );
      }
    })
    .on('end', () => {
      console.log('Request completed successfully');
    });
} catch (err) {
  console.log(err);
}

Python

def batch_write(instance_id, database_id):
    """Inserts sample data into the given database via BatchWrite API.

    The database and table must already exist and can be created using
    `create_database`.
    """
    from google.rpc.code_pb2 import OK

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.mutation_groups() as groups:
        group1 = groups.group()
        group1.insert_or_update(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (16, "Scarlet", "Terry"),
            ],
        )

        group2 = groups.group()
        group2.insert_or_update(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (17, "Marc", ""),
                (18, "Catalina", "Smith"),
            ],
        )
        group2.insert_or_update(
            table="Albums",
            columns=("SingerId", "AlbumId", "AlbumTitle"),
            values=[
                (17, 1, "Total Junk"),
                (18, 2, "Go, Go, Go"),
            ],
        )

        for response in groups.batch_write():
            if response.status.code == OK:
                print(
                    "Mutation group indexes {} have been applied with commit timestamp {}".format(
                        response.indexes, response.commit_timestamp
                    )
                )
            else:
                print(
                    "Mutation group indexes {} could not be applied with error {}".format(
                        response.indexes, response.status
                    )
                )

C++

namespace spanner = ::google::cloud::spanner;
// Use upserts as mutation groups are not replay protected.
auto commit_results = client.CommitAtLeastOnce({
    // group #0
    spanner::Mutations{
        spanner::InsertOrUpdateMutationBuilder(
            "Singers", {"SingerId", "FirstName", "LastName"})
            .EmplaceRow(16, "Scarlet", "Terry")
            .Build(),
    },
    // group #1
    spanner::Mutations{
        spanner::InsertOrUpdateMutationBuilder(
            "Singers", {"SingerId", "FirstName", "LastName"})
            .EmplaceRow(17, "Marc", "")
            .EmplaceRow(18, "Catalina", "Smith")
            .Build(),
        spanner::InsertOrUpdateMutationBuilder(
            "Albums", {"SingerId", "AlbumId", "AlbumTitle"})
            .EmplaceRow(17, 1, "Total Junk")
            .EmplaceRow(18, 2, "Go, Go, Go")
            .Build(),
    },
});
for (auto& commit_result : commit_results) {
  if (!commit_result) throw std::move(commit_result).status();
  std::cout << "Mutation group indexes [";
  for (auto index : commit_result->indexes) std::cout << " " << index;
  std::cout << " ]: ";
  if (commit_result->commit_timestamp) {
    auto const& ts = *commit_result->commit_timestamp;
    std::cout << "Committed at " << ts.get<absl::Time>().value();
  } else {
    std::cout << commit_result->commit_timestamp.status();
  }
  std::cout << "\n";
}

Nächste Schritte