使用变更插入、更新和删除数据

本页面介绍如何使用变更插入、更新和删除数据。 变更表示插入、更新和删除等一系列操作,Spanner 会以原子方式将其应用于 Spanner 数据库中的不同行和表。

虽然您可以使用 gRPCREST 提交变更,但更常见的做法是通过客户端库访问 API。

本页面介绍插入、更新和删除的基本任务。如需查看更多示例,请参阅入门教程

如果您需要提交大量盲写,但不需要原子事务,您可以使用批量写入批量修改 Spanner 表。如需了解详情,请参阅使用批量写入修改数据

在表格中插入新行

C++

您可以使用 InsertMutationBuilder() 函数写入数据。Client::Commit() 会向表中添加新行。单个批处理中的所有插入均以原子方式应用。

此代码演示了如何写入数据:

void InsertData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto insert_singers = spanner::InsertMutationBuilder(
                            "Singers", {"SingerId", "FirstName", "LastName"})
                            .EmplaceRow(1, "Marc", "Richards")
                            .EmplaceRow(2, "Catalina", "Smith")
                            .EmplaceRow(3, "Alice", "Trentor")
                            .EmplaceRow(4, "Lea", "Martin")
                            .EmplaceRow(5, "David", "Lomond")
                            .Build();

  auto insert_albums = spanner::InsertMutationBuilder(
                           "Albums", {"SingerId", "AlbumId", "AlbumTitle"})
                           .EmplaceRow(1, 1, "Total Junk")
                           .EmplaceRow(1, 2, "Go, Go, Go")
                           .EmplaceRow(2, 1, "Green")
                           .EmplaceRow(2, 2, "Forever Hold Your Peace")
                           .EmplaceRow(2, 3, "Terrified")
                           .Build();

  auto commit_result =
      client.Commit(spanner::Mutations{insert_singers, insert_albums});
  if (!commit_result) throw std::move(commit_result).status();
  std::cout << "Insert was successful [spanner_insert_data]\n";
}

C#

您可以使用 connection.CreateInsertCommand() 方法插入数据,此方法会创建一个新的 SpannerCommand 以将行插入表中。SpannerCommand.ExecuteNonQueryAsync() 方法将向表中添加新行。

此代码演示了如何插入数据:


using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class InsertDataAsyncSample
{
    public class Singer
    {
        public int SingerId { get; set; }
        public string FirstName { get; set; }
        public string LastName { get; set; }
    }

    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task InsertDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
        List<Singer> singers = new List<Singer>
        {
            new Singer { SingerId = 1, FirstName = "Marc", LastName = "Richards" },
            new Singer { SingerId = 2, FirstName = "Catalina", LastName = "Smith" },
            new Singer { SingerId = 3, FirstName = "Alice", LastName = "Trentor" },
            new Singer { SingerId = 4, FirstName = "Lea", LastName = "Martin" },
            new Singer { SingerId = 5, FirstName = "David", LastName = "Lomond" },
        };
        List<Album> albums = new List<Album>
        {
            new Album { SingerId = 1, AlbumId = 1, AlbumTitle = "Total Junk" },
            new Album { SingerId = 1, AlbumId = 2, AlbumTitle = "Go, Go, Go" },
            new Album { SingerId = 2, AlbumId = 1, AlbumTitle = "Green" },
            new Album { SingerId = 2, AlbumId = 2, AlbumTitle = "Forever Hold your Peace" },
            new Album { SingerId = 2, AlbumId = 3, AlbumTitle = "Terrified" },
        };

        // Create connection to Cloud Spanner.
        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        await connection.RunWithRetriableTransactionAsync(async transaction =>
        {
            await Task.WhenAll(singers.Select(singer =>
            {
                // Insert rows into the Singers table.
                using var cmd = connection.CreateInsertCommand("Singers", new SpannerParameterCollection
                {
                        { "SingerId", SpannerDbType.Int64, singer.SingerId },
                        { "FirstName", SpannerDbType.String, singer.FirstName },
                        { "LastName", SpannerDbType.String, singer.LastName }
                });
                cmd.Transaction = transaction;
                return cmd.ExecuteNonQueryAsync();
            }));

            await Task.WhenAll(albums.Select(album =>
            {
                // Insert rows into the Albums table.
                using var cmd = connection.CreateInsertCommand("Albums", new SpannerParameterCollection
                {
                        { "SingerId", SpannerDbType.Int64, album.SingerId },
                        { "AlbumId", SpannerDbType.Int64, album.AlbumId },
                        { "AlbumTitle", SpannerDbType.String,album.AlbumTitle }
                });
                cmd.Transaction = transaction;
                return cmd.ExecuteNonQueryAsync();
            }));
        });
        Console.WriteLine("Data inserted.");
    }
}

Go

您可以使用 Mutation 来写入数据。Mutation 是容纳变更操作的容器。Mutation 表示插入、更新、删除等一系列操作,这些操作可以原子方式应用于 Spanner 数据库中的不同行和表。

使用 Mutation.InsertOrUpdate() 构建 INSERT_OR_UPDATE 变更,该变更会添加新行或更新列值(如果该行已经存在)。或者,使用 Mutation.Insert() 方法构造一项 INSERT 变更,该变更会添加新行。

Client.Apply() 以不可分割的方式将变更应用于数据库。

此代码演示了如何写入数据:


import (
	"context"
	"io"

	"cloud.google.com/go/spanner"
)

func write(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	singerColumns := []string{"SingerId", "FirstName", "LastName"}
	albumColumns := []string{"SingerId", "AlbumId", "AlbumTitle"}
	m := []*spanner.Mutation{
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{2, "Catalina", "Smith"}),
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{3, "Alice", "Trentor"}),
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{4, "Lea", "Martin"}),
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{5, "David", "Lomond"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{1, 1, "Total Junk"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{1, 2, "Go, Go, Go"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{2, 1, "Green"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{2, 2, "Forever Hold Your Peace"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{2, 3, "Terrified"}),
	}
	_, err = client.Apply(ctx, m)
	return err
}

Java

您可以使用 Mutation 对象来写入数据。Mutation 对象是容纳变更操作的容器。Mutation 表示插入、更新和删除等一系列操作,Spanner 能够以原子方式应用于 Spanner 数据库中的不同行和表。

Mutation 类中的 newInsertBuilder() 方法可构造一项 INSERT 变更,该变更会在表中插入一个新行。如果该行已经存在,则写入失败。或者,您也可以使用 newInsertOrUpdateBuilder 方法构建 INSERT_OR_UPDATE 变更,该变更会在该行已经存在时更新列值。

DatabaseClient 类中的 write() 方法可写入变更。单个批处理中的所有变更均以原子方式应用。

此代码演示了如何写入数据:

static final List<Singer> SINGERS =
    Arrays.asList(
        new Singer(1, "Marc", "Richards"),
        new Singer(2, "Catalina", "Smith"),
        new Singer(3, "Alice", "Trentor"),
        new Singer(4, "Lea", "Martin"),
        new Singer(5, "David", "Lomond"));

static final List<Album> ALBUMS =
    Arrays.asList(
        new Album(1, 1, "Total Junk"),
        new Album(1, 2, "Go, Go, Go"),
        new Album(2, 1, "Green"),
        new Album(2, 2, "Forever Hold Your Peace"),
        new Album(2, 3, "Terrified"));
static void writeExampleData(DatabaseClient dbClient) {
  List<Mutation> mutations = new ArrayList<>();
  for (Singer singer : SINGERS) {
    mutations.add(
        Mutation.newInsertBuilder("Singers")
            .set("SingerId")
            .to(singer.singerId)
            .set("FirstName")
            .to(singer.firstName)
            .set("LastName")
            .to(singer.lastName)
            .build());
  }
  for (Album album : ALBUMS) {
    mutations.add(
        Mutation.newInsertBuilder("Albums")
            .set("SingerId")
            .to(album.singerId)
            .set("AlbumId")
            .to(album.albumId)
            .set("AlbumTitle")
            .to(album.albumTitle)
            .build());
  }
  dbClient.write(mutations);
}

Node.js

您可以使用 Table 对象来写入数据。Table.insert() 方法将向表中添加新行。单个批处理中的所有插入均以原子方式应用。

此代码演示了如何写入数据:

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Instantiate Spanner table objects
const singersTable = database.table('Singers');
const albumsTable = database.table('Albums');

// Inserts rows into the Singers table
// Note: Cloud Spanner interprets Node.js numbers as FLOAT64s, so
// they must be converted to strings before being inserted as INT64s
try {
  await singersTable.insert([
    {SingerId: '1', FirstName: 'Marc', LastName: 'Richards'},
    {SingerId: '2', FirstName: 'Catalina', LastName: 'Smith'},
    {SingerId: '3', FirstName: 'Alice', LastName: 'Trentor'},
    {SingerId: '4', FirstName: 'Lea', LastName: 'Martin'},
    {SingerId: '5', FirstName: 'David', LastName: 'Lomond'},
  ]);

  await albumsTable.insert([
    {SingerId: '1', AlbumId: '1', AlbumTitle: 'Total Junk'},
    {SingerId: '1', AlbumId: '2', AlbumTitle: 'Go, Go, Go'},
    {SingerId: '2', AlbumId: '1', AlbumTitle: 'Green'},
    {SingerId: '2', AlbumId: '2', AlbumTitle: 'Forever Hold your Peace'},
    {SingerId: '2', AlbumId: '3', AlbumTitle: 'Terrified'},
  ]);

  console.log('Inserted data.');
} catch (err) {
  console.error('ERROR:', err);
} finally {
  await database.close();
}

PHP

您可以使用 Database::insertBatch 方法来写入数据。insertBatch 会向表中添加新行。单个批处理中的所有插入均以原子方式应用。

此代码演示了如何写入数据:

use Google\Cloud\Spanner\SpannerClient;

/**
 * Inserts sample data into the given database.
 *
 * The database and table must already exist and can be created using
 * `create_database`.
 * Example:
 * ```
 * insert_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function insert_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $operation = $database->transaction(['singleUse' => true])
        ->insertBatch('Singers', [
            ['SingerId' => 1, 'FirstName' => 'Marc', 'LastName' => 'Richards'],
            ['SingerId' => 2, 'FirstName' => 'Catalina', 'LastName' => 'Smith'],
            ['SingerId' => 3, 'FirstName' => 'Alice', 'LastName' => 'Trentor'],
            ['SingerId' => 4, 'FirstName' => 'Lea', 'LastName' => 'Martin'],
            ['SingerId' => 5, 'FirstName' => 'David', 'LastName' => 'Lomond'],
        ])
        ->insertBatch('Albums', [
            ['SingerId' => 1, 'AlbumId' => 1, 'AlbumTitle' => 'Total Junk'],
            ['SingerId' => 1, 'AlbumId' => 2, 'AlbumTitle' => 'Go, Go, Go'],
            ['SingerId' => 2, 'AlbumId' => 1, 'AlbumTitle' => 'Green'],
            ['SingerId' => 2, 'AlbumId' => 2, 'AlbumTitle' => 'Forever Hold Your Peace'],
            ['SingerId' => 2, 'AlbumId' => 3, 'AlbumTitle' => 'Terrified']
        ])
        ->commit();

    print('Inserted data.' . PHP_EOL);
}

Python

您可以使用 Batch 对象来写入数据。Batch 对象是容纳变更操作的容器。变更表示插入、更新、删除等一系列操作,这些操作可以原子方式应用于 Spanner 数据库中的不同行和表。

Batch 类中的 insert() 方法用于向批处理添加一个或多个插入变更操作。单个批处理中的所有变更均以原子方式应用。

此代码演示了如何写入数据:

def insert_data(instance_id, database_id):
    """Inserts sample data into the given database.

    The database and table must already exist and can be created using
    `create_database`.
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.batch() as batch:
        batch.insert(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (1, "Marc", "Richards"),
                (2, "Catalina", "Smith"),
                (3, "Alice", "Trentor"),
                (4, "Lea", "Martin"),
                (5, "David", "Lomond"),
            ],
        )

        batch.insert(
            table="Albums",
            columns=("SingerId", "AlbumId", "AlbumTitle"),
            values=[
                (1, 1, "Total Junk"),
                (1, 2, "Go, Go, Go"),
                (2, 1, "Green"),
                (2, 2, "Forever Hold Your Peace"),
                (2, 3, "Terrified"),
            ],
        )

    print("Inserted data.")

Ruby

您可以使用 Client 对象来写入数据。Client#commit 方法跨数据库中的列、行和表,为在单个逻辑时间点以原子方式执行的写入创建并提交事务。

此代码演示了如何写入数据:

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.commit do |c|
  c.insert "Singers", [
    { SingerId: 1, FirstName: "Marc",     LastName: "Richards" },
    { SingerId: 2, FirstName: "Catalina", LastName: "Smith"    },
    { SingerId: 3, FirstName: "Alice",    LastName: "Trentor"  },
    { SingerId: 4, FirstName: "Lea",      LastName: "Martin"   },
    { SingerId: 5, FirstName: "David",    LastName: "Lomond"   }
  ]
  c.insert "Albums", [
    { SingerId: 1, AlbumId: 1, AlbumTitle: "Total Junk" },
    { SingerId: 1, AlbumId: 2, AlbumTitle: "Go, Go, Go" },
    { SingerId: 2, AlbumId: 1, AlbumTitle: "Green" },
    { SingerId: 2, AlbumId: 2, AlbumTitle: "Forever Hold Your Peace" },
    { SingerId: 2, AlbumId: 3, AlbumTitle: "Terrified" }
  ]
end

puts "Inserted data"

更新表中的行

假设 Albums(1, 1) 的销售额低于预期。因此,您需要从 Albums(2, 2) 的营销预算中划拨 200000 美元到 Albums(1, 1),但前提是 Albums(2, 2) 的预算资金充裕。

由于您需要通过读取表中的数据来确定是否要写入新值,因此您应使用读写事务来自动执行读写操作。

C++

使用 Transaction() 函数为客户端运行事务。

以下是运行事务的代码:

void ReadWriteTransaction(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  using ::google::cloud::StatusOr;

  // A helper to read a single album MarketingBudget.
  auto get_current_budget =
      [](spanner::Client client, spanner::Transaction txn,
         std::int64_t singer_id,
         std::int64_t album_id) -> StatusOr<std::int64_t> {
    auto key = spanner::KeySet().AddKey(spanner::MakeKey(singer_id, album_id));
    auto rows = client.Read(std::move(txn), "Albums", std::move(key),
                            {"MarketingBudget"});
    using RowType = std::tuple<std::int64_t>;
    auto row = spanner::GetSingularRow(spanner::StreamOf<RowType>(rows));
    if (!row) return std::move(row).status();
    return std::get<0>(*std::move(row));
  };

  auto commit = client.Commit(
      [&client, &get_current_budget](
          spanner::Transaction const& txn) -> StatusOr<spanner::Mutations> {
        auto b1 = get_current_budget(client, txn, 1, 1);
        if (!b1) return std::move(b1).status();
        auto b2 = get_current_budget(client, txn, 2, 2);
        if (!b2) return std::move(b2).status();
        std::int64_t transfer_amount = 200000;

        return spanner::Mutations{
            spanner::UpdateMutationBuilder(
                "Albums", {"SingerId", "AlbumId", "MarketingBudget"})
                .EmplaceRow(1, 1, *b1 + transfer_amount)
                .EmplaceRow(2, 2, *b2 - transfer_amount)
                .Build()};
      });

  if (!commit) throw std::move(commit).status();
  std::cout << "Transfer was successful [spanner_read_write_transaction]\n";
}

C#

对于 .NET Standard 2.0(或 .NET 4.5)及更高版本,可以使用 .NET framework 的 TransactionScope() 来运行事务。对于所有受支持的 .NET 版本,您可以通过将 SpannerConnection.BeginTransactionAsync 的结果设置为 SpannerCommandTransaction 属性来创建事务。

下面是运行事务的两种方式:

.NET Standard 2.0


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;
using System.Transactions;

public class ReadWriteWithTransactionAsyncSample
{
    public async Task<int> ReadWriteWithTransactionAsync(string projectId, string instanceId, string databaseId)
    {
        // This sample transfers 200,000 from the MarketingBudget
        // field of the second Album to the first Album. Make sure to run
        // the Add Column and Write Data To New Column samples first,
        // in that order.

        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        decimal transferAmount = 200000;
        decimal secondBudget = 0;
        decimal firstBudget = 0;

        using var connection = new SpannerConnection(connectionString);
        using var cmdLookup1 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 2 AND AlbumId = 2");

        using (var reader = await cmdLookup1.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                // Read the second album's budget.
                secondBudget = reader.GetFieldValue<decimal>("MarketingBudget");
                // Confirm second Album's budget is sufficient and
                // if not raise an exception. Raising an exception
                // will automatically roll back the transaction.
                if (secondBudget < transferAmount)
                {
                    throw new Exception($"The second album's budget {secondBudget} is less than the amount to transfer.");
                }
            }
        }

        // Read the first album's budget.
        using var cmdLookup2 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 1 and AlbumId = 1");
        using (var reader = await cmdLookup2.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                firstBudget = reader.GetFieldValue<decimal>("MarketingBudget");
            }
        }

        // Specify update command parameters.
        using var cmdUpdate = connection.CreateUpdateCommand("Albums", new SpannerParameterCollection
        {
            { "SingerId", SpannerDbType.Int64 },
            { "AlbumId", SpannerDbType.Int64 },
            { "MarketingBudget", SpannerDbType.Int64 },
        });

        // Update second album to remove the transfer amount.
        secondBudget -= transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 2;
        cmdUpdate.Parameters["AlbumId"].Value = 2;
        cmdUpdate.Parameters["MarketingBudget"].Value = secondBudget;
        var rowCount = await cmdUpdate.ExecuteNonQueryAsync();

        // Update first album to add the transfer amount.
        firstBudget += transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 1;
        cmdUpdate.Parameters["AlbumId"].Value = 1;
        cmdUpdate.Parameters["MarketingBudget"].Value = firstBudget;
        rowCount += await cmdUpdate.ExecuteNonQueryAsync();
        scope.Complete();
        Console.WriteLine("Transaction complete.");
        return rowCount;
    }
}

.NET Standard 1.5


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;

public class ReadWriteWithTransactionCoreAsyncSample
{
    public async Task<int> ReadWriteWithTransactionCoreAsync(string projectId, string instanceId, string databaseId)
    {
        // This sample transfers 200,000 from the MarketingBudget
        // field of the second Album to the first Album. Make sure to run
        // the Add Column and Write Data To New Column samples first,
        // in that order.
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        decimal transferAmount = 200000;
        decimal secondBudget = 0;
        decimal firstBudget = 0;

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var transaction = await connection.BeginTransactionAsync();

        using var cmdLookup1 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 2 AND AlbumId = 2");
        cmdLookup1.Transaction = transaction;

        using (var reader = await cmdLookup1.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                // Read the second album's budget.
                secondBudget = reader.GetFieldValue<decimal>("MarketingBudget");
                // Confirm second Album's budget is sufficient and
                // if not raise an exception. Raising an exception
                // will automatically roll back the transaction.
                if (secondBudget < transferAmount)
                {
                    throw new Exception($"The second album's budget {secondBudget} contains less than the amount to transfer.");
                }
            }
        }
        // Read the first album's budget.
        using var cmdLookup2 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 1 and AlbumId = 1");
        cmdLookup2.Transaction = transaction;
        using (var reader = await cmdLookup2.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                firstBudget = reader.GetFieldValue<decimal>("MarketingBudget");
            }
        }

        // Specify update command parameters.
        using var cmdUpdate = connection.CreateUpdateCommand("Albums", new SpannerParameterCollection
        {
            { "SingerId", SpannerDbType.Int64 },
            { "AlbumId", SpannerDbType.Int64 },
            { "MarketingBudget", SpannerDbType.Int64 },
        });
        cmdUpdate.Transaction = transaction;

        // Update second album to remove the transfer amount.
        secondBudget -= transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 2;
        cmdUpdate.Parameters["AlbumId"].Value = 2;
        cmdUpdate.Parameters["MarketingBudget"].Value = secondBudget;
        var rowCount = await cmdUpdate.ExecuteNonQueryAsync();

        // Update first album to add the transfer amount.
        firstBudget += transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 1;
        cmdUpdate.Parameters["AlbumId"].Value = 1;
        cmdUpdate.Parameters["MarketingBudget"].Value = firstBudget;
        rowCount += await cmdUpdate.ExecuteNonQueryAsync();

        await transaction.CommitAsync();
        Console.WriteLine("Transaction complete.");
        return rowCount;
    }
}

Go

使用 ReadWriteTransaction 类型在读写事务的上下文中执行操作。Client.ReadWriteTransaction() 会返回一个 ReadWriteTransaction 对象。

以下示例使用 ReadWriteTransaction.ReadRow() 检索一行数据。

此示例还使用了 ReadWriteTransaction.BufferWrite(),该方法将一组变更添加到在事务提交时将会应用的更新集。

该示例还使用 Key 类型,该类型代表 Spanner 表或索引中的行键。


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func writeWithTransaction(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		getBudget := func(key spanner.Key) (int64, error) {
			row, err := txn.ReadRow(ctx, "Albums", key, []string{"MarketingBudget"})
			if err != nil {
				return 0, err
			}
			var budget int64
			if err := row.Column(0, &budget); err != nil {
				return 0, err
			}
			return budget, nil
		}
		album2Budget, err := getBudget(spanner.Key{2, 2})
		if err != nil {
			return err
		}
		const transferAmt = 200000
		if album2Budget >= transferAmt {
			album1Budget, err := getBudget(spanner.Key{1, 1})
			if err != nil {
				return err
			}
			album1Budget += transferAmt
			album2Budget -= transferAmt
			cols := []string{"SingerId", "AlbumId", "MarketingBudget"}
			txn.BufferWrite([]*spanner.Mutation{
				spanner.Update("Albums", cols, []interface{}{1, 1, album1Budget}),
				spanner.Update("Albums", cols, []interface{}{2, 2, album2Budget}),
			})
			fmt.Fprintf(w, "Moved %d from Album2's MarketingBudget to Album1's.", transferAmt)
		}
		return nil
	})
	return err
}

Java

使用 TransactionRunner 接口在读写事务的上下文中执行操作。该接口包含的 run() 方法可用于执行读写事务,并根据需要进行重试。DatabaseClient 类的 readWriteTransaction 方法返回一个 TransactionRunner 对象,用于执行单个逻辑事务。

TransactionRunner.TransactionCallable 类包含一个 run() 方法,用于执行一个事务的单次尝试。run() 采用一个 TransactionContext 对象,该对象是事务的上下文。

以下示例使用 Struct 类,此类对于存储 readRow() 调用的结果十分方便。该示例还使用 Key 类,该类代表 Spanner 表或索引中的行键。

以下是运行事务的代码:

static void writeWithTransaction(DatabaseClient dbClient) {
  dbClient
      .readWriteTransaction()
      .run(transaction -> {
        // Transfer marketing budget from one album to another. We do it in a transaction to
        // ensure that the transfer is atomic.
        Struct row =
            transaction.readRow("Albums", Key.of(2, 2), Arrays.asList("MarketingBudget"));
        long album2Budget = row.getLong(0);
        // Transaction will only be committed if this condition still holds at the time of
        // commit. Otherwise it will be aborted and the callable will be rerun by the
        // client library.
        long transfer = 200000;
        if (album2Budget >= transfer) {
          long album1Budget =
              transaction
                  .readRow("Albums", Key.of(1, 1), Arrays.asList("MarketingBudget"))
                  .getLong(0);
          album1Budget += transfer;
          album2Budget -= transfer;
          transaction.buffer(
              Mutation.newUpdateBuilder("Albums")
                  .set("SingerId")
                  .to(1)
                  .set("AlbumId")
                  .to(1)
                  .set("MarketingBudget")
                  .to(album1Budget)
                  .build());
          transaction.buffer(
              Mutation.newUpdateBuilder("Albums")
                  .set("SingerId")
                  .to(2)
                  .set("AlbumId")
                  .to(2)
                  .set("MarketingBudget")
                  .to(album2Budget)
                  .build());
        }
        return null;
      });
}

Node.js

使用 Database.runTransaction() 运行事务。

以下是运行事务的代码:

// This sample transfers 200,000 from the MarketingBudget field
// of the second Album to the first Album, as long as the second
// Album has enough money in its budget. Make sure to run the
// addColumn and updateData samples first (in that order).

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const transferAmount = 200000;

database.runTransaction(async (err, transaction) => {
  if (err) {
    console.error(err);
    return;
  }
  let firstBudget, secondBudget;
  const queryOne = {
    columns: ['MarketingBudget'],
    keys: [[2, 2]], // SingerId: 2, AlbumId: 2
  };

  const queryTwo = {
    columns: ['MarketingBudget'],
    keys: [[1, 1]], // SingerId: 1, AlbumId: 1
  };

  Promise.all([
    // Reads the second album's budget
    transaction.read('Albums', queryOne).then(results => {
      // Gets second album's budget
      const rows = results[0].map(row => row.toJSON());
      secondBudget = rows[0].MarketingBudget;
      console.log(`The second album's marketing budget: ${secondBudget}`);

      // Makes sure the second album's budget is large enough
      if (secondBudget < transferAmount) {
        throw new Error(
          `The second album's budget (${secondBudget}) is less than the transfer amount (${transferAmount}).`
        );
      }
    }),

    // Reads the first album's budget
    transaction.read('Albums', queryTwo).then(results => {
      // Gets first album's budget
      const rows = results[0].map(row => row.toJSON());
      firstBudget = rows[0].MarketingBudget;
      console.log(`The first album's marketing budget: ${firstBudget}`);
    }),
  ])
    .then(() => {
      console.log(firstBudget, secondBudget);
      // Transfers the budgets between the albums
      firstBudget += transferAmount;
      secondBudget -= transferAmount;

      console.log(firstBudget, secondBudget);

      // Updates the database
      // Note: Cloud Spanner interprets Node.js numbers as FLOAT64s, so they
      // must be converted (back) to strings before being inserted as INT64s.
      transaction.update('Albums', [
        {
          SingerId: '1',
          AlbumId: '1',
          MarketingBudget: firstBudget.toString(),
        },
        {
          SingerId: '2',
          AlbumId: '2',
          MarketingBudget: secondBudget.toString(),
        },
      ]);
    })
    .then(() => {
      // Commits the transaction and send the changes to the database
      return transaction.commit();
    })
    .then(() => {
      console.log(
        `Successfully executed read-write transaction to transfer ${transferAmount} from Album 2 to Album 1.`
      );
    })
    .catch(err => {
      console.error('ERROR:', err);
    })
    .then(() => {
      transaction.end();
      // Closes the database when finished
      return database.close();
    });
});

PHP

使用 Database::runTransaction 运行事务。

以下是运行事务的代码:

use Google\Cloud\Spanner\SpannerClient;
use Google\Cloud\Spanner\Transaction;
use UnexpectedValueException;

/**
 * Performs a read-write transaction to update two sample records in the
 * database.
 *
 * This will transfer 200,000 from the `MarketingBudget` field for the second
 * Album to the first Album. If the `MarketingBudget` for the second Album is
 * too low, it will raise an exception.
 *
 * Before running this sample, you will need to run the `update_data` sample
 * to populate the fields.
 * Example:
 * ```
 * read_write_transaction($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_write_transaction(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $database->runTransaction(function (Transaction $t) use ($spanner) {
        $transferAmount = 200000;

        // Read the second album's budget.
        $secondAlbumKey = [2, 2];
        $secondAlbumKeySet = $spanner->keySet(['keys' => [$secondAlbumKey]]);
        $secondAlbumResult = $t->read(
            'Albums',
            $secondAlbumKeySet,
            ['MarketingBudget'],
            ['limit' => 1]
        );

        $firstRow = $secondAlbumResult->rows()->current();
        $secondAlbumBudget = $firstRow['MarketingBudget'];
        if ($secondAlbumBudget < $transferAmount) {
            // Throwing an exception will automatically roll back the transaction.
            throw new UnexpectedValueException(
                'The second album\'s budget is lower than the transfer amount: ' . $transferAmount
            );
        }

        $firstAlbumKey = [1, 1];
        $firstAlbumKeySet = $spanner->keySet(['keys' => [$firstAlbumKey]]);
        $firstAlbumResult = $t->read(
            'Albums',
            $firstAlbumKeySet,
            ['MarketingBudget'],
            ['limit' => 1]
        );

        // Read the first album's budget.
        $firstRow = $firstAlbumResult->rows()->current();
        $firstAlbumBudget = $firstRow['MarketingBudget'];

        // Update the budgets.
        $secondAlbumBudget -= $transferAmount;
        $firstAlbumBudget += $transferAmount;
        printf('Setting first album\'s budget to %s and the second album\'s ' .
            'budget to %s.' . PHP_EOL, $firstAlbumBudget, $secondAlbumBudget);

        // Update the rows.
        $t->updateBatch('Albums', [
            ['SingerId' => 1, 'AlbumId' => 1, 'MarketingBudget' => $firstAlbumBudget],
            ['SingerId' => 2, 'AlbumId' => 2, 'MarketingBudget' => $secondAlbumBudget],
        ]);

        // Commit the transaction!
        $t->commit();

        print('Transaction complete.' . PHP_EOL);
    });
}

Python

使用 Database 类的 run_in_transaction() 方法来运行事务。

以下是运行事务的代码:

def read_write_transaction(instance_id, database_id):
    """Performs a read-write transaction to update two sample records in the
    database.

    This will transfer 200,000 from the `MarketingBudget` field for the second
    Album to the first Album. If the `MarketingBudget` is too low, it will
    raise an exception.

    Before running this sample, you will need to run the `update_data` sample
    to populate the fields.
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    def update_albums(transaction):
        # Read the second album budget.
        second_album_keyset = spanner.KeySet(keys=[(2, 2)])
        second_album_result = transaction.read(
            table="Albums",
            columns=("MarketingBudget",),
            keyset=second_album_keyset,
            limit=1,
        )
        second_album_row = list(second_album_result)[0]
        second_album_budget = second_album_row[0]

        transfer_amount = 200000

        if second_album_budget < transfer_amount:
            # Raising an exception will automatically roll back the
            # transaction.
            raise ValueError("The second album doesn't have enough funds to transfer")

        # Read the first album's budget.
        first_album_keyset = spanner.KeySet(keys=[(1, 1)])
        first_album_result = transaction.read(
            table="Albums",
            columns=("MarketingBudget",),
            keyset=first_album_keyset,
            limit=1,
        )
        first_album_row = list(first_album_result)[0]
        first_album_budget = first_album_row[0]

        # Update the budgets.
        second_album_budget -= transfer_amount
        first_album_budget += transfer_amount
        print(
            "Setting first album's budget to {} and the second album's "
            "budget to {}.".format(first_album_budget, second_album_budget)
        )

        # Update the rows.
        transaction.update(
            table="Albums",
            columns=("SingerId", "AlbumId", "MarketingBudget"),
            values=[(1, 1, first_album_budget), (2, 2, second_album_budget)],
        )

    database.run_in_transaction(update_albums)

    print("Transaction complete.")

Ruby

使用 Client 类的 transaction 方法来运行事务。

以下是运行事务的代码:

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner         = Google::Cloud::Spanner.new project: project_id
client          = spanner.client instance_id, database_id
transfer_amount = 200_000

client.transaction do |transaction|
  first_album  = transaction.read("Albums", [:MarketingBudget], keys: [[1, 1]]).rows.first
  second_album = transaction.read("Albums", [:MarketingBudget], keys: [[2, 2]]).rows.first

  raise "The second album does not have enough funds to transfer" if second_album[:MarketingBudget] < transfer_amount

  new_first_album_budget  = first_album[:MarketingBudget] + transfer_amount
  new_second_album_budget = second_album[:MarketingBudget] - transfer_amount

  transaction.update "Albums", [
    { SingerId: 1, AlbumId: 1, MarketingBudget: new_first_album_budget  },
    { SingerId: 2, AlbumId: 2, MarketingBudget: new_second_album_budget }
  ]
end

puts "Transaction complete"

删除表中的行

每个客户端库都提供了多种方式来删除行:

  • 删除表中的所有行。
  • 通过指定行的键列值来删除单个行。
  • 通过创建一个键范围来删除一组行。
  • 如果交错表在其架构定义中包含 ON DELETE CASCADE,则可以通过删除父行来删除交错表中的行。

C++

使用 DeleteMutationBuilder() 函数为客户端删除行。

此代码演示了如何删除数据:

void DeleteData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  // Delete the albums with key (2,1) and (2,3).
  auto delete_albums = spanner::DeleteMutationBuilder(
                           "Albums", spanner::KeySet()
                                         .AddKey(spanner::MakeKey(2, 1))
                                         .AddKey(spanner::MakeKey(2, 3)))
                           .Build();

  // Delete some singers using the keys in the range [3, 5]
  auto delete_singers_range =
      spanner::DeleteMutationBuilder(
          "Singers", spanner::KeySet().AddRange(spanner::MakeKeyBoundClosed(3),
                                                spanner::MakeKeyBoundOpen(5)))
          .Build();

  // Deletes remaining rows from the Singers table and the Albums table, because
  // the Albums table is defined with ON DELETE CASCADE.
  auto delete_singers_all =
      spanner::MakeDeleteMutation("Singers", spanner::KeySet::All());

  auto commit_result = client.Commit(spanner::Mutations{
      delete_albums, delete_singers_range, delete_singers_all});
  if (!commit_result) throw std::move(commit_result).status();
  std::cout << "Delete was successful [spanner_delete_data]\n";
}

C#

使用 connection.CreateDeleteCommand() 方法删除行,此方法会创建一个新的 SpannerCommand 以删除行。SpannerCommand.ExecuteNonQueryAsync() 方法从表中删除行。

以下示例逐一删除 Singers 表中的行。Albums 表中的行将被删除,原因是 Albums 表在 Singers 表中交错且定义为 ON DELETE CASCADE


using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class DeleteDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<int> DeleteDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>
        {
            new Album { SingerId = 2, AlbumId = 1, AlbumTitle = "Green" },
            new Album { SingerId = 2, AlbumId = 3, AlbumTitle = "Terrified" },
        };

        int rowCount = 0;
        using (var connection = new SpannerConnection(connectionString))
        {
            await connection.OpenAsync();

            // Delete individual rows from the Albums table.
            await Task.WhenAll(albums.Select(async album =>
            {
                var cmd = connection.CreateDeleteCommand("Albums", new SpannerParameterCollection
                {
                    { "SingerId", SpannerDbType.Int64, album.SingerId },
                    { "AlbumId", SpannerDbType.Int64, album.AlbumId }
                });
                rowCount += await cmd.ExecuteNonQueryAsync();
            }));
            Console.WriteLine("Deleted individual rows in Albums.");

            // Delete a range of rows from the Singers table where the column key is >=3 and <5.
            var cmd = connection.CreateDmlCommand("DELETE FROM Singers WHERE SingerId >= 3 AND SingerId < 5");
            rowCount += await cmd.ExecuteNonQueryAsync();
            Console.WriteLine($"{rowCount} row(s) deleted from Singers.");

            // Delete remaining Singers rows, which will also delete the remaining
            // Albums rows since it was defined with ON DELETE CASCADE.
            cmd = connection.CreateDmlCommand("DELETE FROM Singers WHERE true");
            rowCount += await cmd.ExecuteNonQueryAsync();
            Console.WriteLine($"{rowCount} row(s) deleted from Singers.");
        }
        return rowCount;
    }
}

Go

使用 Mutation 删除行。使用 Mutation.Delete() 方法构造一项 DELETE 变更,该变更会删除一行。Client.Apply() 方法以原子方式将变更应用于数据库。

以下示例逐一删除 Albums 表中的行,然后使用 KeyRange 删除 Singers 表中的所有行。


import (
	"context"
	"io"

	"cloud.google.com/go/spanner"
)

func delete(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	m := []*spanner.Mutation{
		// spanner.Key can be used to delete a specific set of rows.
		// Delete the Albums with the key values (2,1) and (2,3).
		spanner.Delete("Albums", spanner.Key{2, 1}),
		spanner.Delete("Albums", spanner.Key{2, 3}),
		// spanner.KeyRange can be used to delete rows with a key in a specific range.
		// Delete a range of rows where the column key is >=3 and <5
		spanner.Delete("Singers", spanner.KeyRange{Start: spanner.Key{3}, End: spanner.Key{5}, Kind: spanner.ClosedOpen}),
		// spanner.AllKeys can be used to delete all the rows in a table.
		// Delete remaining Singers rows, which will also delete the remaining Albums rows since it was
		// defined with ON DELETE CASCADE.
		spanner.Delete("Singers", spanner.AllKeys()),
	}
	_, err = client.Apply(ctx, m)
	return err
}

Java

使用 Mutation.delete() 方法删除行。

以下示例使用 KeySet.all() 方法删除 Albums 表中的所有行。删除 Albums 表中的行后,该示例使用 KeySet.singleKey() 方法创建的键逐一删除 Singers 表中的行。

static void deleteExampleData(DatabaseClient dbClient) {
  List<Mutation> mutations = new ArrayList<>();

  // KeySet.Builder can be used to delete a specific set of rows.
  // Delete the Albums with the key values (2,1) and (2,3).
  mutations.add(
      Mutation.delete(
          "Albums", KeySet.newBuilder().addKey(Key.of(2, 1)).addKey(Key.of(2, 3)).build()));

  // KeyRange can be used to delete rows with a key in a specific range.
  // Delete a range of rows where the column key is >=3 and <5
  mutations.add(
      Mutation.delete("Singers", KeySet.range(KeyRange.closedOpen(Key.of(3), Key.of(5)))));

  // KeySet.all() can be used to delete all the rows in a table.
  // Delete remaining Singers rows, which will also delete the remaining Albums rows since it was
  // defined with ON DELETE CASCADE.
  mutations.add(Mutation.delete("Singers", KeySet.all()));

  dbClient.write(mutations);
  System.out.printf("Records deleted.\n");
}

Node.js

使用 table.deleteRows() 方法删除行。

以下示例使用 table.deleteRows() 方法从 Singers 表中删除所有行。Albums 表中的行将被删除,原因是 Albums 表在 Singers 表中交错且定义为 ON DELETE CASCADE

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Instantiate Spanner table object
const albumsTable = database.table('Albums');

// Deletes individual rows from the Albums table.
try {
  const keys = [
    [2, 1],
    [2, 3],
  ];
  await albumsTable.deleteRows(keys);
  console.log('Deleted individual rows in Albums.');
} catch (err) {
  console.error('ERROR:', err);
}

// Delete a range of rows where the column key is >=3 and <5
database.runTransaction(async (err, transaction) => {
  if (err) {
    console.error(err);
    return;
  }
  try {
    const [rowCount] = await transaction.runUpdate({
      sql: 'DELETE FROM Singers WHERE SingerId >= 3 AND SingerId < 5',
    });
    console.log(`${rowCount} records deleted from Singers.`);
  } catch (err) {
    console.error('ERROR:', err);
  }

  // Deletes remaining rows from the Singers table and the Albums table,
  // because Albums table is defined with ON DELETE CASCADE.
  try {
    // The WHERE clause is required for DELETE statements to prevent
    // accidentally deleting all rows in a table.
    // https://cloud.google.com/spanner/docs/dml-syntax#where_clause
    const [rowCount] = await transaction.runUpdate({
      sql: 'DELETE FROM Singers WHERE true',
    });
    console.log(`${rowCount} records deleted from Singers.`);
    await transaction.commit();
  } catch (err) {
    console.error('ERROR:', err);
  } finally {
    // Close the database when finished.
    await database.close();
  }
});

PHP

使用 Database::delete() method 删除行。Database::delete() 方法页面包含一个示例。

Python

使用 Batch.delete() 方法删除行。

以下示例使用 KeySet 对象逐一删除 AlbumsSingers 表中的所有行。

def delete_data(instance_id, database_id):
    """Deletes sample data from the given database.

    The database, table, and data must already exist and can be created using
    `create_database` and `insert_data`.
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    # Delete individual rows
    albums_to_delete = spanner.KeySet(keys=[[2, 1], [2, 3]])

    # Delete a range of rows where the column key is >=3 and <5
    singers_range = spanner.KeyRange(start_closed=[3], end_open=[5])
    singers_to_delete = spanner.KeySet(ranges=[singers_range])

    # Delete remaining Singers rows, which will also delete the remaining
    # Albums rows because Albums was defined with ON DELETE CASCADE
    remaining_singers = spanner.KeySet(all_=True)

    with database.batch() as batch:
        batch.delete("Albums", albums_to_delete)
        batch.delete("Singers", singers_to_delete)
        batch.delete("Singers", remaining_singers)

    print("Deleted data.")

Ruby

使用 Client#delete 方法删除行。Client#delete 页面包含一个示例。

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

# Delete individual rows
client.delete "Albums", [[2, 1], [2, 3]]

# Delete a range of rows where the column key is >=3 and <5
key_range = client.range 3, 5, exclude_end: true
client.delete "Singers", key_range

# Delete remaining Singers rows, which will also delete the remaining
# Albums rows because Albums was defined with ON DELETE CASCADE
client.delete "Singers"