トランザクションの概要

このページでは、Spanner でのトランザクションについて説明します。また、トランザクションの実行に関するサンプルコードも示します。

はじめに

Spanner 内のトランザクションは、データベース内の列、行、およびテーブルにまたがる時間内の単一論理ポイントにおいてアトミックに実行される読み取りと書き込みのセットです。

Spanner は、次のトランザクション モードをサポートしています。

  • ロック型読み取り / 書き込み。これらのトランザクションは悲観的ロックに依存し、必要に応じて、2 フェーズ commit を行います。ロック型読み取り / 書き込みトランザクションは中止されることがあり、その場合、アプリケーションの再試行が必要になります。

  • 読み取り専用。このトランザクション タイプでは、複数の読み取りにまたがって整合性が保証されますが、書き込みは許されていません。デフォルトでは、読み取り専用トランザクションはシステムが選択したタイムスタンプで実行され、外部整合性が保証されますが、過去のタイムスタンプで読み取るように構成することもできます。読み取り専用トランザクションは、commit する必要がなく、ロックされることもありません。また、読み取り専用トランザクションは、実行前に進行中の書き込みが完了するのを待つことがあります。

  • パーティション化された DMLこのトランザクション タイプでは、データ操作言語(DML)のステートメントをパーティション化された DML として実行します。パーティション化された DML は、一括更新と削除、特に定期的なクリーンアップとバックフィル用に設計されています。多数の盲目的書き込みを commit する必要があるが、アトミック トランザクションは必要ない場合は、バッチ書き込みを使用して Spanner テーブルを一括変更できます。詳細については、バッチ書き込みを使用してデータを変更するをご覧ください。

このページでは、Spanner のトランザクションの一般的なプロパティおよびセマンティクスについて説明し、Spanner のトランザクション(読み取り / 書き込み、読み取り専用、パーティション化された DML)のインターフェースについて紹介します。

読み取り / 書き込みトランザクション

以下のシナリオでは、ロック型読み取り / 書き込みトランザクションを使用する必要があります。

  • 1 つ以上の読み取りの結果に依存する書き込みを行う場合、それらの読み取り / 書き込みは、同一の読み取り / 書き込みトランザクション内で行う必要があります。
    • 例: 銀行口座 A の残高を倍にする。A の残高の読み取りは、残高を倍の値で置き換える書き込みと同一のトランザクション内で行う必要があります。

  • アトミックに commit する必要がある 1 つ以上の書き込みを行う場合、それらの書き込みは同一の読み取り / 書き込みトランザクション内で行う必要があります。
    • 例: 口座 A から口座 B に $200 を振り込む。2 つの書き込み(A で $200 を引く書き込みと、B で $200 を増やす書き込み)と、初期の口座残高の読み取りは、同一トランザクション内で行う必要があります。

  • 1 つ以上の読み取りの結果に応じて 1 つ以上の書き込みを行う可能性がある場合、最終的には書き込みの実行が必要にならないとしても、これらの読み取り / 書き込みは同一読み取り / 書き込みトランザクション内で行う必要があります。
    • 例: 銀行口座 A の現在の残高が $500 より多ければ、銀行口座 A から銀行口座 B に $200 を振り込む。トランザクションには、A の残高の読み取りと、書き込みが含まれている条件文とを含める必要があります。

次に、ロック型読み取り / 書き込みトランザクションを使用してはならないシナリオを示します。

  • 読み取りしか行わず、単一読み取りメソッドを使用して読み取りを表現できる場合は、その単一読み取りメソッド、つまり読み取り専用トランザクションを使用することをおすすめします。単一読み取りは、読み取り / 書き込みトランザクションとは異なり、ロックしません。

プロパティ

Spanner 内の読み取り / 書き込みトランザクションが、時間の単一論理ポイントで一連の読み取り / 書き込みをアトミックに実行します。また、読み書きを実行したタイムスタンプが実時間と一致し、直列化順序がタイムスタンプの順序と一致しています。

読み書きトランザクションを使用する理由ですが、読み書きトランザクションは、リレーショナル データベースの ACID 特性を備えています(実際には、Spanner の読み書きトランザクションは、従来のACIDより強力な保証を備えています。下のセマンティクス セクションを参照してください)。

分離

読み取り / 書き込みトランザクションと読み取り専用トランザクションの分離プロパティは次のとおりです。

読み取りと書き込みを行うトランザクション

一連の読み取り(またはクエリ)と書き込みを含むトランザクションが正常に commit された後に取得される分離プロパティは次のとおりです。

  • トランザクション内のすべての読み取りが、トランザクションの commit タイムスタンプで取得された一貫性のあるスナップショットを反映する値を返した。
  • commit 時に空の行または範囲が残っている。
  • トランザクション内のすべての書き込みは、トランザクションの commit タイムスタンプで commit された。
  • 書き込みは、トランザクションが commit されるまで、どのトランザクションにも認識されなかった。

一部の Spanner クライアント ドライバには、一時的なエラーをマスクするトランザクション再試行ロジックが含まれています。これは、トランザクションを再実行し、クライアントが検出したデータを検証することで行われます。

すべての読み取りおよび書き込みが、トランザクション自体の視点からも、Spanner データベースの他のリーダーおよびライターの視点からも、単一の時点で発生しているように見えるという効果があります。つまり、読み込みと書き込みは同じタイムスタンプで終了します(下記のシリアル化可能性と外部整合性セクションで、これを図式化してあります)。

読み取りのみを行うトランザクション

読み取りのみを行う読み書きトランザクションの保証は似ています。行が存在しない場合でも、そのトランザクション内のすべての読み取りは、同一のタイムスタンプからデータを返します。異なるのは、データを読み取り、後で書き込みを行わずに読み取り / 書き込みトランザクションを commit する場合、読み取りを行ってから commit までの間にデータベースのデータが変化しなかったことが保証されないことです。最後に読み取ってからデータが変化したかどうかを知りたい場合の最善のアプローチは、(読み取り / 書き込みトランザクションまたは強力な読み取りを使用して)データを再び読み取ることです。また、効率性のため、読み取りのみを行い、書き込みを行わないことが事前にわかっている場合は、読み取り / 書き込みトランザクションではなく読み取り専用トランザクションを使用する必要があります。

アトミック性、整合性、耐久性

Spanner は、分離プロパティに加えて、アトミック性(トランザクション内の書き込みの 1 つが commit すると、すべての書き込みが commit したことになる)、整合性(トランザクションの終了後、データベースは整合性のある状態を保つ)、および耐久性(commit されたデータは commit された状態を保つ)を備えています。

これらのプロパティの利点

これらのプロパティのため、アプリケーションのデベロッパーは、同時に実行される可能性がある他のトランザクションからその実行を保護することに煩わされることなく、独自に各トランザクションの正確さのみにフォーカスできます。

インターフェース

Spanner クライアント ライブラリは、トランザクションが中止した場合は再試行をしながら、読み取り / 書き込みのトランザクションのコンテキストにおける一連の作業を実行するためのインターフェースを提供します。このようなコンテキストの例として、Spanner のトランザクションは commit するまで何回か再試行しなくてはならない場合があります。たとえば、2 つのトランザクションが同時に特定のデータのオペレーションを試みる場合、Spanner は一方のトランザクションをアボートさせて、他方のトランザクションが先に進むことができるようにします(更に稀ですが、Spanner 内の一時的なイベントがトランザクションをアボートさせる原因になることもあります)。トランザクションはアトミックなので、アボートさせられたトランザクションによってデータベースが目に見えて変化することはありません。したがって、トランザクションは成功するまで再試行を繰り返す必要があります。

Spanner クライアント ライブラリでトランザクションを使用する場合、トランザクションの本文(データベース内の 1 つ以上のテーブルに対する読み取りと書き込み)を関数オブジェクトの形式で定義します。Spanner クライアント ライブラリ内部では、トランザクションが commit するまで、または再試行不可能なエラーが発生するまで、関数の実行を繰り返します。

たとえば、スキーマとデータモデルのページに示されている Albums テーブルMarketingBudget 列を追加したとします。

CREATE TABLE Albums (
  SingerId        INT64 NOT NULL,
  AlbumId         INT64 NOT NULL,
  AlbumTitle      STRING(MAX),
  MarketingBudget INT64
) PRIMARY KEY (SingerId, AlbumId);

マーケティング部門は Albums (1, 1) をキーとするアルバムのマーケティングを push することを決定し、Albums (2, 2) の予算から $200,000 を移す(ただし、そのアルバムの予算からその金額を使用できる場合に限り)ように依頼しました。このオペレーションでは、トランザクションは、読み取りの結果、書き込みを行う可能性があるので、ロック型読み取り / 書き込みトランザクションを使用する必要があります。

以下では、読み書きトランザクションを実行する方法を示します。

C++

void ReadWriteTransaction(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  using ::google::cloud::StatusOr;

  // A helper to read a single album MarketingBudget.
  auto get_current_budget =
      [](spanner::Client client, spanner::Transaction txn,
         std::int64_t singer_id,
         std::int64_t album_id) -> StatusOr<std::int64_t> {
    auto key = spanner::KeySet().AddKey(spanner::MakeKey(singer_id, album_id));
    auto rows = client.Read(std::move(txn), "Albums", std::move(key),
                            {"MarketingBudget"});
    using RowType = std::tuple<std::int64_t>;
    auto row = spanner::GetSingularRow(spanner::StreamOf<RowType>(rows));
    if (!row) return std::move(row).status();
    return std::get<0>(*std::move(row));
  };

  auto commit = client.Commit(
      [&client, &get_current_budget](
          spanner::Transaction const& txn) -> StatusOr<spanner::Mutations> {
        auto b1 = get_current_budget(client, txn, 1, 1);
        if (!b1) return std::move(b1).status();
        auto b2 = get_current_budget(client, txn, 2, 2);
        if (!b2) return std::move(b2).status();
        std::int64_t transfer_amount = 200000;

        return spanner::Mutations{
            spanner::UpdateMutationBuilder(
                "Albums", {"SingerId", "AlbumId", "MarketingBudget"})
                .EmplaceRow(1, 1, *b1 + transfer_amount)
                .EmplaceRow(2, 2, *b2 - transfer_amount)
                .Build()};
      });

  if (!commit) throw std::move(commit).status();
  std::cout << "Transfer was successful [spanner_read_write_transaction]\n";
}

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;
using System.Transactions;

public class ReadWriteWithTransactionAsyncSample
{
    public async Task<int> ReadWriteWithTransactionAsync(string projectId, string instanceId, string databaseId)
    {
        // This sample transfers 200,000 from the MarketingBudget
        // field of the second Album to the first Album. Make sure to run
        // the Add Column and Write Data To New Column samples first,
        // in that order.

        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        decimal transferAmount = 200000;
        decimal secondBudget = 0;
        decimal firstBudget = 0;

        using var connection = new SpannerConnection(connectionString);
        using var cmdLookup1 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 2 AND AlbumId = 2");

        using (var reader = await cmdLookup1.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                // Read the second album's budget.
                secondBudget = reader.GetFieldValue<decimal>("MarketingBudget");
                // Confirm second Album's budget is sufficient and
                // if not raise an exception. Raising an exception
                // will automatically roll back the transaction.
                if (secondBudget < transferAmount)
                {
                    throw new Exception($"The second album's budget {secondBudget} is less than the amount to transfer.");
                }
            }
        }

        // Read the first album's budget.
        using var cmdLookup2 = connection.CreateSelectCommand("SELECT * FROM Albums WHERE SingerId = 1 and AlbumId = 1");
        using (var reader = await cmdLookup2.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                firstBudget = reader.GetFieldValue<decimal>("MarketingBudget");
            }
        }

        // Specify update command parameters.
        using var cmdUpdate = connection.CreateUpdateCommand("Albums", new SpannerParameterCollection
        {
            { "SingerId", SpannerDbType.Int64 },
            { "AlbumId", SpannerDbType.Int64 },
            { "MarketingBudget", SpannerDbType.Int64 },
        });

        // Update second album to remove the transfer amount.
        secondBudget -= transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 2;
        cmdUpdate.Parameters["AlbumId"].Value = 2;
        cmdUpdate.Parameters["MarketingBudget"].Value = secondBudget;
        var rowCount = await cmdUpdate.ExecuteNonQueryAsync();

        // Update first album to add the transfer amount.
        firstBudget += transferAmount;
        cmdUpdate.Parameters["SingerId"].Value = 1;
        cmdUpdate.Parameters["AlbumId"].Value = 1;
        cmdUpdate.Parameters["MarketingBudget"].Value = firstBudget;
        rowCount += await cmdUpdate.ExecuteNonQueryAsync();
        scope.Complete();
        Console.WriteLine("Transaction complete.");
        return rowCount;
    }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func writeWithTransaction(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		getBudget := func(key spanner.Key) (int64, error) {
			row, err := txn.ReadRow(ctx, "Albums", key, []string{"MarketingBudget"})
			if err != nil {
				return 0, err
			}
			var budget int64
			if err := row.Column(0, &budget); err != nil {
				return 0, err
			}
			return budget, nil
		}
		album2Budget, err := getBudget(spanner.Key{2, 2})
		if err != nil {
			return err
		}
		const transferAmt = 200000
		if album2Budget >= transferAmt {
			album1Budget, err := getBudget(spanner.Key{1, 1})
			if err != nil {
				return err
			}
			album1Budget += transferAmt
			album2Budget -= transferAmt
			cols := []string{"SingerId", "AlbumId", "MarketingBudget"}
			txn.BufferWrite([]*spanner.Mutation{
				spanner.Update("Albums", cols, []interface{}{1, 1, album1Budget}),
				spanner.Update("Albums", cols, []interface{}{2, 2, album2Budget}),
			})
			fmt.Fprintf(w, "Moved %d from Album2's MarketingBudget to Album1's.", transferAmt)
		}
		return nil
	})
	return err
}

Java

static void writeWithTransaction(DatabaseClient dbClient) {
  dbClient
      .readWriteTransaction()
      .run(transaction -> {
        // Transfer marketing budget from one album to another. We do it in a transaction to
        // ensure that the transfer is atomic.
        Struct row =
            transaction.readRow("Albums", Key.of(2, 2), Arrays.asList("MarketingBudget"));
        long album2Budget = row.getLong(0);
        // Transaction will only be committed if this condition still holds at the time of
        // commit. Otherwise it will be aborted and the callable will be rerun by the
        // client library.
        long transfer = 200000;
        if (album2Budget >= transfer) {
          long album1Budget =
              transaction
                  .readRow("Albums", Key.of(1, 1), Arrays.asList("MarketingBudget"))
                  .getLong(0);
          album1Budget += transfer;
          album2Budget -= transfer;
          transaction.buffer(
              Mutation.newUpdateBuilder("Albums")
                  .set("SingerId")
                  .to(1)
                  .set("AlbumId")
                  .to(1)
                  .set("MarketingBudget")
                  .to(album1Budget)
                  .build());
          transaction.buffer(
              Mutation.newUpdateBuilder("Albums")
                  .set("SingerId")
                  .to(2)
                  .set("AlbumId")
                  .to(2)
                  .set("MarketingBudget")
                  .to(album2Budget)
                  .build());
        }
        return null;
      });
}

Node.js

// This sample transfers 200,000 from the MarketingBudget field
// of the second Album to the first Album, as long as the second
// Album has enough money in its budget. Make sure to run the
// addColumn and updateData samples first (in that order).

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const transferAmount = 200000;

database.runTransaction(async (err, transaction) => {
  if (err) {
    console.error(err);
    return;
  }
  let firstBudget, secondBudget;
  const queryOne = {
    columns: ['MarketingBudget'],
    keys: [[2, 2]], // SingerId: 2, AlbumId: 2
  };

  const queryTwo = {
    columns: ['MarketingBudget'],
    keys: [[1, 1]], // SingerId: 1, AlbumId: 1
  };

  Promise.all([
    // Reads the second album's budget
    transaction.read('Albums', queryOne).then(results => {
      // Gets second album's budget
      const rows = results[0].map(row => row.toJSON());
      secondBudget = rows[0].MarketingBudget;
      console.log(`The second album's marketing budget: ${secondBudget}`);

      // Makes sure the second album's budget is large enough
      if (secondBudget < transferAmount) {
        throw new Error(
          `The second album's budget (${secondBudget}) is less than the transfer amount (${transferAmount}).`
        );
      }
    }),

    // Reads the first album's budget
    transaction.read('Albums', queryTwo).then(results => {
      // Gets first album's budget
      const rows = results[0].map(row => row.toJSON());
      firstBudget = rows[0].MarketingBudget;
      console.log(`The first album's marketing budget: ${firstBudget}`);
    }),
  ])
    .then(() => {
      console.log(firstBudget, secondBudget);
      // Transfers the budgets between the albums
      firstBudget += transferAmount;
      secondBudget -= transferAmount;

      console.log(firstBudget, secondBudget);

      // Updates the database
      // Note: Cloud Spanner interprets Node.js numbers as FLOAT64s, so they
      // must be converted (back) to strings before being inserted as INT64s.
      transaction.update('Albums', [
        {
          SingerId: '1',
          AlbumId: '1',
          MarketingBudget: firstBudget.toString(),
        },
        {
          SingerId: '2',
          AlbumId: '2',
          MarketingBudget: secondBudget.toString(),
        },
      ]);
    })
    .then(() => {
      // Commits the transaction and send the changes to the database
      return transaction.commit();
    })
    .then(() => {
      console.log(
        `Successfully executed read-write transaction to transfer ${transferAmount} from Album 2 to Album 1.`
      );
    })
    .catch(err => {
      console.error('ERROR:', err);
    })
    .then(() => {
      transaction.end();
      // Closes the database when finished
      return database.close();
    });
});

PHP

use Google\Cloud\Spanner\SpannerClient;
use Google\Cloud\Spanner\Transaction;
use UnexpectedValueException;

/**
 * Performs a read-write transaction to update two sample records in the
 * database.
 *
 * This will transfer 200,000 from the `MarketingBudget` field for the second
 * Album to the first Album. If the `MarketingBudget` for the second Album is
 * too low, it will raise an exception.
 *
 * Before running this sample, you will need to run the `update_data` sample
 * to populate the fields.
 * Example:
 * ```
 * read_write_transaction($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_write_transaction(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $database->runTransaction(function (Transaction $t) use ($spanner) {
        $transferAmount = 200000;

        // Read the second album's budget.
        $secondAlbumKey = [2, 2];
        $secondAlbumKeySet = $spanner->keySet(['keys' => [$secondAlbumKey]]);
        $secondAlbumResult = $t->read(
            'Albums',
            $secondAlbumKeySet,
            ['MarketingBudget'],
            ['limit' => 1]
        );

        $firstRow = $secondAlbumResult->rows()->current();
        $secondAlbumBudget = $firstRow['MarketingBudget'];
        if ($secondAlbumBudget < $transferAmount) {
            // Throwing an exception will automatically roll back the transaction.
            throw new UnexpectedValueException(
                'The second album\'s budget is lower than the transfer amount: ' . $transferAmount
            );
        }

        $firstAlbumKey = [1, 1];
        $firstAlbumKeySet = $spanner->keySet(['keys' => [$firstAlbumKey]]);
        $firstAlbumResult = $t->read(
            'Albums',
            $firstAlbumKeySet,
            ['MarketingBudget'],
            ['limit' => 1]
        );

        // Read the first album's budget.
        $firstRow = $firstAlbumResult->rows()->current();
        $firstAlbumBudget = $firstRow['MarketingBudget'];

        // Update the budgets.
        $secondAlbumBudget -= $transferAmount;
        $firstAlbumBudget += $transferAmount;
        printf('Setting first album\'s budget to %s and the second album\'s ' .
            'budget to %s.' . PHP_EOL, $firstAlbumBudget, $secondAlbumBudget);

        // Update the rows.
        $t->updateBatch('Albums', [
            ['SingerId' => 1, 'AlbumId' => 1, 'MarketingBudget' => $firstAlbumBudget],
            ['SingerId' => 2, 'AlbumId' => 2, 'MarketingBudget' => $secondAlbumBudget],
        ]);

        // Commit the transaction!
        $t->commit();

        print('Transaction complete.' . PHP_EOL);
    });
}

Python

def read_write_transaction(instance_id, database_id):
    """Performs a read-write transaction to update two sample records in the
    database.

    This will transfer 200,000 from the `MarketingBudget` field for the second
    Album to the first Album. If the `MarketingBudget` is too low, it will
    raise an exception.

    Before running this sample, you will need to run the `update_data` sample
    to populate the fields.
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    def update_albums(transaction):
        # Read the second album budget.
        second_album_keyset = spanner.KeySet(keys=[(2, 2)])
        second_album_result = transaction.read(
            table="Albums",
            columns=("MarketingBudget",),
            keyset=second_album_keyset,
            limit=1,
        )
        second_album_row = list(second_album_result)[0]
        second_album_budget = second_album_row[0]

        transfer_amount = 200000

        if second_album_budget < transfer_amount:
            # Raising an exception will automatically roll back the
            # transaction.
            raise ValueError("The second album doesn't have enough funds to transfer")

        # Read the first album's budget.
        first_album_keyset = spanner.KeySet(keys=[(1, 1)])
        first_album_result = transaction.read(
            table="Albums",
            columns=("MarketingBudget",),
            keyset=first_album_keyset,
            limit=1,
        )
        first_album_row = list(first_album_result)[0]
        first_album_budget = first_album_row[0]

        # Update the budgets.
        second_album_budget -= transfer_amount
        first_album_budget += transfer_amount
        print(
            "Setting first album's budget to {} and the second album's "
            "budget to {}.".format(first_album_budget, second_album_budget)
        )

        # Update the rows.
        transaction.update(
            table="Albums",
            columns=("SingerId", "AlbumId", "MarketingBudget"),
            values=[(1, 1, first_album_budget), (2, 2, second_album_budget)],
        )

    database.run_in_transaction(update_albums)

    print("Transaction complete.")

Ruby

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner         = Google::Cloud::Spanner.new project: project_id
client          = spanner.client instance_id, database_id
transfer_amount = 200_000

client.transaction do |transaction|
  first_album  = transaction.read("Albums", [:MarketingBudget], keys: [[1, 1]]).rows.first
  second_album = transaction.read("Albums", [:MarketingBudget], keys: [[2, 2]]).rows.first

  raise "The second album does not have enough funds to transfer" if second_album[:MarketingBudget] < transfer_amount

  new_first_album_budget  = first_album[:MarketingBudget] + transfer_amount
  new_second_album_budget = second_album[:MarketingBudget] - transfer_amount

  transaction.update "Albums", [
    { SingerId: 1, AlbumId: 1, MarketingBudget: new_first_album_budget  },
    { SingerId: 2, AlbumId: 2, MarketingBudget: new_second_album_budget }
  ]
end

puts "Transaction complete"

セマンティクス

直列化可能性と外部整合性

Spanner は「シリアル化可能性」を備えています。これは、異なるトランザクションの読み取り、書き込み、その他のオペレーションの一部が実際には並列に発生したとしても、すべてのトランザクションが直列に実行されるように見えることを意味しています。Spanner は、commit したトランザクションの順序を反映した commit タイムスタンプを割り当てます。実際には、Spanner は外部整合性と呼ばれるシリアル化可能性より強い保証を備えています。すなわち、トランザクションの commit 順序は commit タイムスタンプに反映され、これらの commit タイムスタンプは「リアルタイム」であるため、実時間と比較できます。トランザクション内の読み取りでは、トランザクション commit 前に commit されたすべてのものを認識でき、トランザクション commit 後に開始されたすべてのものが書き込みを認識できます。

たとえば、下の図で示した 2 つのトランザクションの実行について考えます。

同じデータを読み取る 2 つのトランザクションの実行を示しているタイムライン

青色のトランザクション Txn1 は、データ A を読み取り、A への書き込みをバッファリングして、正常に commit します。緑色のトランザクション Txn2Txn1 の後に開始し、データ B を読み取った後、データ A を読み取ります。Txn2Txn1A への書き込みを commit した後に A の値を読み取るので、Txn1 の完了前に Txn2 が開始したとしても、Txn1 による A への書き込みの影響を Txn2 に与えます。

Txn1Txn2 の実行時間には多少のオーバーラップがあるものの、それらの commit タイムスタンプである c1c2 は線形的なトランザクション順序に従います。つまり、Txn1 の読み取りと書き込みのすべての効果が単一の時点(c1)で発生したように見え、Txn2 の読み取りと書き込みのすべての効果が単一の時点(c2)で発生したように見えます。さらに、c1 < c2 となります(これは、Txn1Txn2 の両方が書き込みを commit することで保証されます。これは書き込みが異なるマシンで実行された場合でも、同様に保証されます)。これは Txn1 の順序が Txn2 よりも前になることを意味します(ただし、トランザクションで Txn2 が読み取りのみを行った場合は、c1 <= c2 となります)。

読み取りでは commit 履歴のプレフィックスを監視します。読み取りに Txn2 が影響を及ぼした場合、Txn1 も影響を及ぼします。正常に commit したすべてのトランザクションにこの特性があります。

読み取りと書き込みの保証

トランザクション実行の呼び出しが失敗した場合、読み取りと書き込みが保証されるかどうかは、下位の commit 呼び出しが失敗する原因となったエラーのタイプに依存します。

たとえば、「行が見つかりません」、「行はすでに存在します」などのエラーは、バッファリングされた変異の書き込みで、クライアントがアップデートを試みた行が存在しないなど、なんらかのエラーに遭遇したことを意味します。このような場合、読み取りは整合性があることが保証され、書き込みは該当せず、行の不在は読み取りと整合性があることが保証されます。

トランザクション オペレーションのキャンセル

非同期読み取りオペレーションは、トランザクション内の他の既存のオペレーションに影響を与えることなく、ユーザーがいつでも(たとえば、高位のオペレーションをキャンセルしたり、最初の読み取りから受け取った結果に基づいて、読み取りを停止することを決定したとき)キャンセルできます。

ただし、読み取りのキャンセルを試みたとしても、Spanner によって読み取りが実際にキャンセルされることが保証されるわけではありません。読み取りのキャンセルを依頼しても、その読み取りが正常に終了したり、他の理由(アボートなど)で失敗することもあり得ます。さらに、そのキャンセルした読み取りが実際になんらかの結果を返してきて、その完了していない可能性のある結果が、トランザクションの commit の一部として確認されることもあり得ます。

読み取りとは異なり、トランザクションの commit オペレーションをキャンセルすると、トランザクションをアボートすることになることに注意してください(トランザクションがすでに別の理由で commit した場合、または失敗した場合を除く)。

パフォーマンス

ロック

Spanner では、複数のクライアントが同時に同一データベースと対話できます。Spanner は、複数の並列トランザクションの整合性を保証するために、共有的ロックと排他的ロックの組み合わせを使用して、データへのアクセスを制御します。トランザクションの一環で読み取りを実行すると、Spanner は共有的読み取りロックを取得するので、トランザクションが commit する準備ができるまで、他の読み取りは相変わらずデータにアクセスできます。トランザクションが commit し、書き込みに適用すると、トランザクションは排他的ロックへのアップグレードを試みます。データに対する新しい共有読み取りロックをブロックし、既存の共有読み取りロックが解除されるのを待ってから、データへの排他アクセスのための排他ロックを設定します。

ロックについての注意事項:

  • ロックは、行および列の粒度で実施されます。トランザクション T1 が行「foo」の列「A」をロックしているときに、トランザクション T2 が行「foo」の列「B」に書き込んでも、競合は発生しません。
  • 書き込む前のデータを読み取らない、データ項目への書き込み(「盲目的書き込み」と呼ばれることもある)は、同一項目に対する他の盲目的ライターと競合しません(各書き込みの commit タイムスタンプが、書き込みがデータベースに適用される順序を決定します)。そのため、書き込むデータをすでに読み取っている場合は、Spanner には排他的ロックへアップグレードすることのみが必要になります。それ以外の場合、Spanner はライター共有的ロックと呼ばれる共有的ロックを使用します。
  • 読み取り / 書き込みトランザクション内で行検索を実行する場合は、セカンダリ インデックスを使用して、スキャンされる行の範囲を絞ります。これにより、Spanner によりロックされるテーブルの行数が少なくなり、範囲外の行を同時に変更できるようになります。
  • Spanner 以外のリソースへの排他的なアクセスを確保するために、ロックを使用しないでください。たとえば、インスタンスのコンピューティング リソース周辺でのデータの移動を可能にしたときなど、いくつかの理由で、Spanner によりトランザクションが中断される場合があります。トランザクションが再試行された場合、アプリケーション コードによって明示的に試行されたか、Spanner JDBC ドライバなどのクライアント コードによって暗黙に試行されたかにかかわらず、トランザクションの試行が実際に行われている間に、ロックが実施されたことのみが保証されます。

  • ロックの統計情報のイントロスペクション ツールを使用して、データベースでロックの競合を調査できます。

デッドロックの検出

Spanner は、複数のトランザクションがデッドロックに陥る可能性を検出し、1 つを除くすべてのトランザクションを強制的に中止します。たとえば、次のシナリオについて考えてみましょう。トランザクション Txn1 はレコード A のロックを保持しており、レコード B がロックできる状態になるのを待っています。Txn2 はレコード B のロックを保持しており、レコード A がロックできる状態になるのを待っています。このような状況を打開するための唯一の方法は、一方のトランザクションを中止し、そのロックを解放することで、もう一方のトランザクションの処理を行うことです。

Spanner は、標準的な「wound-wait」アルゴリズムを使用して、デッドロックの検出を処理します。Spanner は、競合するロックをリクエストする各トランザクションの経過時間を内部で追跡します。さらに、古いトランザクションが後から開始したトランザクションを中止できるようにします(「古い」トランザクションとは、トランザクション内で最も早い読み取り、クエリ、commit が、より早く発生したトランザクションを意味します)。

Spanner は、古いトランザクションを優先することで、すべてのトランザクションが、時間の経過に伴い、他のトランザクションよりも古くなって優先順位が高くなることで、ロックを取得できるようにしています。たとえば、リーダー共有的ロックを取得しているトランザクションが、ライター共有的ロックを必要とする、より古いトランザクションによって中止されることがあります。

分散実行

Spanner は、複数のサーバーにまたがるデータに対してトランザクションを実行できます。この機能を実現することで、単一サーバー トランザクションと比較して、パフォーマンスが低くなります。

分散できるトランザクションのタイプSpanner は、内部的に、多くのサーバー間でデータベースの行の責任を分割できます。行と、それに対応するインターリーブ テーブル内の行は、近隣のキーを持つ同一テーブル内の 2 行として、通常、同じサーバーによってサービスを提供されます。Spanner は、さまざまなサーバー上の行にまたがるトランザクションを実行できます。ただし、経験則として、同じ場所に配置された多くの行に関わるトランザクションは、データベース全体または大きなテーブル全体に散在する多くの行に関わるトランザクションよりも高速かつ低コストです。

Spanner のトランザクションとしては、アトミックに適用される読み取りと書き込みのみが含まれるものが最も効率的です。すべての読み取りと書き込みがキースペースの同じ部分にあるデータにアクセスする場合に、トランザクションは最も速くなります。

読み取り専用トランザクション

Spanner は、ロック型読み書きトランザクションに加えて、読み取り専用トランザクションも備えています。

同一タイムスタンプで複数の読み取りを実行する必要がある場合、読み取り専用トランザクションを使用してください。読み取りを Spanner の単一読み取りメソッドで表現できる場合、代わりにその単一読み取りメソッドを使用する必要があります。このような単一読み取りメソッドを使用した場合のパフォーマンスは、読み取り専用トランザクションで実行される単一の読み取りのパフォーマンスに匹敵する必要があります。

大量のデータを読み取る場合は、パーティションを使用してデータを並列で読み取ることを検討してください。

読み取り専用トランザクションは書き込まないので、ロックを保持することはなく、他のトランザクションをブロックすることもありません。読み取り専用トランザクションはトランザクションの commit 履歴の整合性のあるプレフィックスを監視しているので、アプリケーションは常に整合性のあるデータを取得できます。

プロパティ

Spanner の読み取り専用トランザクションは、読み取り専用トランザクション自体の視点からも、Spanner データベースの他のリーダーおよびライターの両方の視点からも、時間的に単一の論理ポイントで、一連の読み取りを実行します。これは、読み取り専用トランザクションは、トランザクション履歴の一定の時点のデータベースの整合性のある状態を常に監視していることを意味します。

インターフェース

Spanner は、トランザクションが中止した場合は再試行をしながら、読み取り専用トランザクションのコンテキストにおける一連の作業を実行するためのインターフェースを提供します。

以下は、読み取り専用トランザクションを使用して、同一のタイムスタンプの 2 つの読み取りで整合性のあるデータを取得する方法を示しています。

C++

void ReadOnlyTransaction(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto read_only = spanner::MakeReadOnlyTransaction();

  spanner::SqlStatement select(
      "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
  using RowType = std::tuple<std::int64_t, std::int64_t, std::string>;

  // Read#1.
  auto rows1 = client.ExecuteQuery(read_only, select);
  std::cout << "Read 1 results\n";
  for (auto& row : spanner::StreamOf<RowType>(rows1)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
  // Read#2. Even if changes occur in-between the reads the transaction ensures
  // that Read #1 and Read #2 return the same data.
  auto rows2 = client.ExecuteQuery(read_only, select);
  std::cout << "Read 2 results\n";
  for (auto& row : spanner::StreamOf<RowType>(rows2)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
}

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;

public class QueryDataWithTransactionAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<List<Album>> QueryDataWithTransactionAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>();
        using TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        using var connection = new SpannerConnection(connectionString);

        // Opens the connection so that the Spanner transaction included in the TransactionScope
        // is read-only TimestampBound.Strong.
        await connection.OpenAsync(SpannerTransactionCreationOptions.ReadOnly, options: null, cancellationToken: default);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");

        // Read #1.
        using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                Console.WriteLine("SingerId : " + reader.GetFieldValue<string>("SingerId")
                    + " AlbumId : " + reader.GetFieldValue<string>("AlbumId")
                    + " AlbumTitle : " + reader.GetFieldValue<string>("AlbumTitle"));
            }
        }

        // Read #2. Even if changes occur in-between the reads,
        // the transaction ensures that Read #1 and Read #2
        // return the same data.
        using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                albums.Add(new Album
                {
                    AlbumId = reader.GetFieldValue<int>("AlbumId"),
                    SingerId = reader.GetFieldValue<int>("SingerId"),
                    AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
                });
            }
        }
        scope.Complete();
        Console.WriteLine("Transaction complete.");
        return albums;
    }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readOnlyTransaction(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	ro := client.ReadOnlyTransaction()
	defer ro.Close()
	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := ro.Query(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}

	iter = ro.Read(ctx, "Albums", spanner.AllKeys(), []string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

Java

static void readOnlyTransaction(DatabaseClient dbClient) {
  // ReadOnlyTransaction must be closed by calling close() on it to release resources held by it.
  // We use a try-with-resource block to automatically do so.
  try (ReadOnlyTransaction transaction = dbClient.readOnlyTransaction()) {
    ResultSet queryResultSet =
        transaction.executeQuery(
            Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"));
    while (queryResultSet.next()) {
      System.out.printf(
          "%d %d %s\n",
          queryResultSet.getLong(0), queryResultSet.getLong(1), queryResultSet.getString(2));
    }
    try (ResultSet readResultSet =
        transaction.read(
            "Albums", KeySet.all(), Arrays.asList("SingerId", "AlbumId", "AlbumTitle"))) {
      while (readResultSet.next()) {
        System.out.printf(
            "%d %d %s\n",
            readResultSet.getLong(0), readResultSet.getLong(1), readResultSet.getString(2));
      }
    }
  }
}

Node.js

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Gets a transaction object that captures the database state
// at a specific point in time
database.getSnapshot(async (err, transaction) => {
  if (err) {
    console.error(err);
    return;
  }
  const queryOne = 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums';

  try {
    // Read #1, using SQL
    const [qOneRows] = await transaction.run(queryOne);

    qOneRows.forEach(row => {
      const json = row.toJSON();
      console.log(
        `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
      );
    });

    const queryTwo = {
      columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
    };

    // Read #2, using the `read` method. Even if changes occur
    // in-between the reads, the transaction ensures that both
    // return the same data.
    const [qTwoRows] = await transaction.read('Albums', queryTwo);

    qTwoRows.forEach(row => {
      const json = row.toJSON();
      console.log(
        `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
      );
    });

    console.log('Successfully executed read-only transaction.');
  } catch (err) {
    console.error('ERROR:', err);
  } finally {
    transaction.end();
    // Close the database when finished.
    await database.close();
  }
});

PHP

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads data inside of a read-only transaction.
 *
 * Within the read-only transaction, or "snapshot", the application sees
 * consistent view of the database at a particular timestamp.
 * Example:
 * ```
 * read_only_transaction($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_only_transaction(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $snapshot = $database->snapshot();
    $results = $snapshot->execute(
        'SELECT SingerId, AlbumId, AlbumTitle FROM Albums'
    );
    print('Results from the first read:' . PHP_EOL);
    foreach ($results as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }

    // Perform another read using the `read` method. Even if the data
    // is updated in-between the reads, the snapshot ensures that both
    // return the same data.
    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle']
    );

    print('Results from the second read:' . PHP_EOL);
    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

def read_only_transaction(instance_id, database_id):
    """Reads data inside of a read-only transaction.

    Within the read-only transaction, or "snapshot", the application sees
    consistent view of the database at a particular timestamp.
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot(multi_use=True) as snapshot:
        # Read using SQL.
        results = snapshot.execute_sql(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
        )

        print("Results from first read:")
        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

        # Perform another read using the `read` method. Even if the data
        # is updated in-between the reads, the snapshot ensures that both
        # return the same data.
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums", columns=("SingerId", "AlbumId", "AlbumTitle"), keyset=keyset
        )

        print("Results from second read:")
        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.snapshot do |snapshot|
  snapshot.execute("SELECT SingerId, AlbumId, AlbumTitle FROM Albums").rows.each do |row|
    puts "#{row[:AlbumId]} #{row[:AlbumTitle]} #{row[:SingerId]}"
  end

  # Even if changes occur in-between the reads, the transaction ensures that
  # both return the same data.
  snapshot.read("Albums", [:AlbumId, :AlbumTitle, :SingerId]).rows.each do |row|
    puts "#{row[:AlbumId]} #{row[:AlbumTitle]} #{row[:SingerId]}"
  end
end

パーティション化 DML トランザクション

パーティション化されたデータ操作言語(パーティション化 DML)を使用すると、大規模な UPDATE ステートメントや DELETE ステートメントを実行できます。実行中に、トランザクション数の上限に達することも、テーブル全体がロックされることもありません。Spanner はキースペースをパーティション化し、別々の読み取り / 書き込みトランザクションで各パーティションの DML ステートメントを実行します。

読み書きトランザクションで DML ステートメントを実行するには、ステートメントをコード内で明示的に作成します。詳しくは、DML の使用をご覧ください。

プロパティ

クライアント ライブラリのメソッドと Google Cloud CLI のいずれを使用する場合でも、パーティション化 DML ステートメントは一度に 1 つしか実行できません。

パーティション化されたトランザクションは、commit またはロールバックをサポートしていません。Spanner は、DML ステートメントをすぐに実行し、適用します。オペレーションをキャンセルした場合やオペレーションが失敗した場合、Spanner は実行中のすべてのパーティションをキャンセルし、残りのパーティションを開始しません。Spanner は、すでに実行されているパーティションをロールバックしません。

インターフェース

Spanner は、単一のパーティション化 DML ステートメントを実行するインターフェースを提供します。

次のコード例は、Albums テーブルの MarketingBudget 列を更新します。

C++

ExecutePartitionedDml() 関数を使用して、パーティション化 DML ステートメントを実行します。

void DmlPartitionedUpdate(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto result = client.ExecutePartitionedDml(
      spanner::SqlStatement("UPDATE Albums SET MarketingBudget = 100000"
                            "  WHERE SingerId > 1"));
  if (!result) throw std::move(result).status();
  std::cout << "Updated at least " << result->row_count_lower_bound
            << " row(s) [spanner_dml_partitioned_update]\n";
}

C#

ExecutePartitionedUpdateAsync() メソッドを使用して、パーティション化 DML ステートメントを実行します。


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;

public class UpdateUsingPartitionedDmlCoreAsyncSample
{
    public async Task<long> UpdateUsingPartitionedDmlCoreAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var cmd = connection.CreateDmlCommand("UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1");
        long rowCount = await cmd.ExecutePartitionedUpdateAsync();

        Console.WriteLine($"{rowCount} row(s) updated...");
        return rowCount;
    }
}

Go

PartitionedUpdate() メソッドを使用して、パーティション化 DML ステートメントを実行します。


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func updateUsingPartitionedDML(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1"}
	rowCount, err := client.PartitionedUpdate(ctx, stmt)
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "%d record(s) updated.\n", rowCount)
	return nil
}

Java

executePartitionedUpdate() メソッドを使用して、パーティション化 DML ステートメントを実行します。

static void updateUsingPartitionedDml(DatabaseClient dbClient) {
  String sql = "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1";
  long rowCount = dbClient.executePartitionedUpdate(Statement.of(sql));
  System.out.printf("%d records updated.\n", rowCount);
}

Node.js

runPartitionedUpdate() メソッドを使用して、パーティション化 DML ステートメントを実行します。

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

try {
  const [rowCount] = await database.runPartitionedUpdate({
    sql: 'UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1',
  });
  console.log(`Successfully updated ${rowCount} records.`);
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  database.close();
}

PHP

executePartitionedUpdate() メソッドを使用して、パーティション化 DML ステートメントを実行します。

use Google\Cloud\Spanner\SpannerClient;

/**
 * Updates sample data in the database by partition with a DML statement.
 *
 * This updates the `MarketingBudget` column which must be created before
 * running this sample. You can add the column by running the `add_column`
 * sample or by running this DDL statement against your database:
 *
 *     ALTER TABLE Albums ADD COLUMN MarketingBudget INT64
 *
 * Example:
 * ```
 * update_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function update_data_with_partitioned_dml(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $rowCount = $database->executePartitionedUpdate(
        'UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1'
    );

    printf('Updated %d row(s).' . PHP_EOL, $rowCount);
}

Python

execute_partitioned_dml() メソッドを使用して、パーティション化 DML ステートメントを実行します。

# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"

spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

row_ct = database.execute_partitioned_dml(
    "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1"
)

print("{} records updated.".format(row_ct))

Ruby

execute_partitioned_update() メソッドを使用して、パーティション化 DML ステートメントを実行します。

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

row_count = client.execute_partition_update(
  "UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1"
)

puts "#{row_count} records updated."

次のコード例では、SingerId 列に基づいて Singers テーブルから行を削除します。

C++

void DmlPartitionedDelete(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto result = client.ExecutePartitionedDml(
      spanner::SqlStatement("DELETE FROM Singers WHERE SingerId > 10"));
  if (!result) throw std::move(result).status();
  std::cout << "Deleted at least " << result->row_count_lower_bound
            << " row(s) [spanner_dml_partitioned_delete]\n";
}

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;

public class DeleteUsingPartitionedDmlCoreAsyncSample
{
    public async Task<long> DeleteUsingPartitionedDmlCoreAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var cmd = connection.CreateDmlCommand("DELETE FROM Singers WHERE SingerId > 10");
        long rowCount = await cmd.ExecutePartitionedUpdateAsync();

        Console.WriteLine($"{rowCount} row(s) deleted...");
        return rowCount;
    }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

func deleteUsingPartitionedDML(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: "DELETE FROM Singers WHERE SingerId > 10"}
	rowCount, err := client.PartitionedUpdate(ctx, stmt)
	if err != nil {
		return err

	}
	fmt.Fprintf(w, "%d record(s) deleted.", rowCount)
	return nil
}

Java

static void deleteUsingPartitionedDml(DatabaseClient dbClient) {
  String sql = "DELETE FROM Singers WHERE SingerId > 10";
  long rowCount = dbClient.executePartitionedUpdate(Statement.of(sql));
  System.out.printf("%d records deleted.\n", rowCount);
}

Node.js

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

try {
  const [rowCount] = await database.runPartitionedUpdate({
    sql: 'DELETE FROM Singers WHERE SingerId > 10',
  });
  console.log(`Successfully deleted ${rowCount} records.`);
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  database.close();
}

PHP

use Google\Cloud\Spanner\SpannerClient;

/**
 * Delete sample data in the database by partition with a DML statement.
 *
 * This updates the `MarketingBudget` column which must be created before
 * running this sample. You can add the column by running the `add_column`
 * sample or by running this DDL statement against your database:
 *
 *     ALTER TABLE Albums ADD COLUMN MarketingBudget INT64
 *
 * Example:
 * ```
 * update_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function delete_data_with_partitioned_dml(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $rowCount = $database->executePartitionedUpdate(
        'DELETE FROM Singers WHERE SingerId > 10'
    );

    printf('Deleted %d row(s).' . PHP_EOL, $rowCount);
}

Python

# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

row_ct = database.execute_partitioned_dml("DELETE FROM Singers WHERE SingerId > 10")

print("{} record(s) deleted.".format(row_ct))

Ruby

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

row_count = client.execute_partition_update(
  "DELETE FROM Singers WHERE SingerId > 10"
)

puts "#{row_count} records deleted."