使用可重复读隔离

本页面介绍了如何在 Spanner 中使用可重复读隔离级别。

可重复读是一种隔离级别,可确保事务内的所有读取操作看到的都是事务开始时数据库的一致快照。在 Spanner 中,此隔离级别是通过一种也常被称为快照隔离的技术实现的。在高读写并发场景中,许多事务读取的数据可能正在被其他事务修改,此时这种方法非常有用。通过使用固定的快照,可重复读可避免更严格的串行化隔离级别带来的性能影响。读取操作无需获取锁即可执行,也不会阻塞并发写入,这可能会减少因串行化冲突而需要重试的中止事务数量。如需了解详情,请参阅隔离级别概览。

设置隔离级别

您可以使用以下方法在事务级设置读写事务的隔离级别:

客户端库

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	pb "cloud.google.com/go/spanner/apiv1/spannerpb"
)

// writeWithTransactionUsingIsolationLevel reads the title of the album with
// key (SingerId=1, AlbumId=1), prints it to w, and then updates that title in
// the same read-write transaction.
//
// The client is configured with SERIALIZABLE as its default isolation level;
// the transaction itself is run with REPEATABLE_READ to show that the
// transaction-level option overrides the client-level one.
//
// db is the database name passed to spanner.NewClientWithConfig. A non-nil
// error is returned if the client cannot be created or the transaction fails;
// every returned error wraps its cause with %w so callers can still inspect
// the underlying error with errors.Is / errors.As.
func writeWithTransactionUsingIsolationLevel(w io.Writer, db string) error {
	ctx := context.Background()

	// The isolation level specified at the client-level will be applied
	// to all RW transactions.
	cfg := spanner.ClientConfig{
		TransactionOptions: spanner.TransactionOptions{
			IsolationLevel: pb.TransactionOptions_SERIALIZABLE,
		},
	}
	client, err := spanner.NewClientWithConfig(ctx, db, cfg)
	if err != nil {
		return fmt.Errorf("failed to create client: %w", err)
	}
	defer client.Close()

	// The isolation level specified at the transaction-level takes
	// precedence over the isolation level configured at the client-level.
	// REPEATABLE_READ is used here to demonstrate overriding the client-level setting.
	txnOpts := spanner.TransactionOptions{
		IsolationLevel: pb.TransactionOptions_REPEATABLE_READ,
	}

	_, err = client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		// Errors returned from this callback are wrapped with %w (not %v) so
		// the original Spanner error, including its status code, stays
		// reachable through the error chain instead of being flattened to text.

		// Read the current album title
		key := spanner.Key{1, 1}
		row, err := txn.ReadRow(ctx, "Albums", key, []string{"AlbumTitle"})
		if err != nil {
			return fmt.Errorf("failed to read album: %w", err)
		}
		var title string
		if err := row.Column(0, &title); err != nil {
			return fmt.Errorf("failed to get album title: %w", err)
		}
		fmt.Fprintf(w, "Current album title: %s\n", title)

		// Update the album title
		stmt := spanner.Statement{
			SQL: `UPDATE Albums
				SET AlbumTitle = @AlbumTitle
				WHERE SingerId = @SingerId AND AlbumId = @AlbumId`,
			Params: map[string]interface{}{
				"SingerId":   1,
				"AlbumId":    1,
				"AlbumTitle": "New Album Title",
			},
		}
		count, err := txn.Update(ctx, stmt)
		if err != nil {
			return fmt.Errorf("failed to update album: %w", err)
		}
		fmt.Fprintf(w, "Updated %d record(s).\n", count)
		return nil
	}, txnOpts)

	if err != nil {
		return fmt.Errorf("transaction failed: %w", err)
	}
	return nil
}

Java

/**
 * Reads the title of the album with SingerId = 1 and AlbumId = 1, prints it, and then updates
 * that title inside a single read-write transaction.
 *
 * <p>The client is built with SERIALIZABLE as its default read-write isolation level, while the
 * transaction is started with REPEATABLE_READ to show that the transaction-level option takes
 * precedence over the client-level default.
 *
 * @param db identifier of the database the transaction runs against
 */
static void isolationLevelSetting(DatabaseId db) {
  // The isolation level specified at the client-level will be applied to all
  // RW transactions.
  DefaultReadWriteTransactionOptions transactionOptions =
      DefaultReadWriteTransactionOptions.newBuilder()
          .setIsolationLevel(IsolationLevel.SERIALIZABLE)
          .build();
  SpannerOptions options =
      SpannerOptions.newBuilder()
          .setDefaultTransactionOptions(transactionOptions)
          .build();
  // try-with-resources closes the client when the sample finishes, including
  // when the transaction throws.
  try (Spanner spanner = options.getService()) {
    DatabaseClient dbClient = spanner.getDatabaseClient(db);
    dbClient
        // The isolation level specified at the transaction-level takes precedence
        // over the isolation level configured at the client-level.
        .readWriteTransaction(Options.isolationLevel(IsolationLevel.REPEATABLE_READ))
        .run(transaction -> {
          // Read an AlbumTitle.
          String selectSql =
              "SELECT AlbumTitle from Albums WHERE SingerId = 1 and AlbumId = 1";
          String title = null;
          // Close the result set as soon as it has been consumed.
          try (ResultSet resultSet = transaction.executeQuery(Statement.of(selectSql))) {
            while (resultSet.next()) {
              title = resultSet.getString("AlbumTitle");
            }
          }
          System.out.printf("Current album title: %s\n", title);

          // Update the title.
          String updateSql =
              "UPDATE Albums "
                  + "SET AlbumTitle = 'New Album Title' "
                  + "WHERE SingerId = 1 and AlbumId = 1";
          long rowCount = transaction.executeUpdate(Statement.of(updateSql));
          System.out.printf("%d record updated.\n", rowCount);
          return null;
        });
  }
}

Node.js

// Imports the Google Cloud Spanner client library
const {Spanner, protos} = require('@google-cloud/spanner');
// The isolation level specified at the client-level will be applied
// to all RW transactions.
const isolationOptionsForClient = {
  defaultTransactionOptions: {
    isolationLevel:
      protos.google.spanner.v1.TransactionOptions.IsolationLevel.SERIALIZABLE,
  },
};

// Instantiates a client with defaultTransactionOptions.
// isolationOptionsForClient already carries the `defaultTransactionOptions`
// key, so it is spread into the constructor options. Passing it as the value
// of `defaultTransactionOptions` would nest the key twice and leave
// `defaultTransactionOptions.isolationLevel` undefined.
const spanner = new Spanner({
  projectId,
  ...isolationOptionsForClient,
});

/**
 * Runs a read-write transaction that logs the current title of the album with
 * SingerId = 1 and AlbumId = 1 and then overwrites it, using REPEATABLE_READ
 * as the isolation level for this one transaction.
 *
 * Relies on `spanner`, `protos`, `instanceId` and `databaseId` from the
 * enclosing scope. Outcomes are reported through console.log / console.error;
 * nothing is returned.
 */
function runTransactionWithIsolationLevel() {
  // Gets a reference to a Cloud Spanner instance and database
  const database = spanner.instance(instanceId).database(databaseId);

  // The isolation level specified at the request level takes precedence over
  // the isolation level configured at the client level.
  const {IsolationLevel} = protos.google.spanner.v1.TransactionOptions;
  const isolationOptionsForTransaction = {
    isolationLevel: IsolationLevel.REPEATABLE_READ,
  };

  const selectSql =
    'SELECT AlbumTitle FROM Albums WHERE SingerId = 1 AND AlbumId = 1';
  const updateSql =
    "UPDATE Albums SET AlbumTitle = 'New Album Title' WHERE SingerId = 1 AND AlbumId = 1";

  // Transaction body: read the title, update it, commit.
  const readThenUpdate = async (err, transaction) => {
    if (err) {
      console.error(err);
      return;
    }
    try {
      // Gets first album's title
      const [resultRows] = await transaction.run(selectSql);
      const albums = resultRows.map(row => row.toJSON());
      const albumTitle = albums[0].AlbumTitle;
      console.log(`previous album title ${albumTitle}`);

      const [rowCount] = await transaction.runUpdate(updateSql);
      console.log(`Successfully updated ${rowCount} record in Albums table.`);

      await transaction.commit();
      console.log(
        'Successfully executed read-write transaction with isolationLevel option.',
      );
    } catch (failure) {
      console.error('ERROR:', failure);
    } finally {
      transaction.end();
      // Close the database when finished.
      await database.close();
    }
  };

  database.runTransaction(isolationOptionsForTransaction, readThenUpdate);
}
// Run the sample: invoke the read-write transaction defined above.
runTransactionWithIsolationLevel();

Python

# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"

# Client-level default: applied to all read-write transactions started through
# this client.
isolation_options_for_client = TransactionOptions.IsolationLevel.SERIALIZABLE

# Request-level setting: takes precedence over the client-level default for the
# transaction it is passed to.
isolation_options_for_transaction = (
    TransactionOptions.IsolationLevel.REPEATABLE_READ
)

# Build the client with the default isolation level, then resolve the target
# instance and database from it.
client_defaults = DefaultTransactionOptions(
    isolation_level=isolation_options_for_client
)
spanner_client = spanner.Client(default_transaction_options=client_defaults)
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

def update_albums_with_isolation(transaction):
    """Print the current title of album (SingerId=1, AlbumId=1), then update it.

    Args:
        transaction: object providing ``execute_sql`` and ``execute_update``;
            presumably the Spanner read-write transaction handed in by
            ``run_in_transaction`` (confirm against the caller).

    Both statements are issued on the same ``transaction`` object. One line is
    printed per row returned by the query, followed by the count returned by
    the update.
    """
    select_sql = "SELECT AlbumTitle from Albums WHERE SingerId = 1 and AlbumId = 1"
    update_sql = (
        "UPDATE Albums SET AlbumTitle = 'A New Title' WHERE SingerId = 1 and AlbumId = 1"
    )

    # Read an AlbumTitle.
    for album_row in transaction.execute_sql(select_sql):
        print("Current Album Title: {}".format(*album_row))

    # Update the AlbumTitle.
    updated_rows = transaction.execute_update(update_sql)
    print("{} record(s) updated.".format(updated_rows))

# Run the callback through run_in_transaction, passing the transaction-level
# isolation level as the isolation_level keyword argument.
database.run_in_transaction(
    update_albums_with_isolation, isolation_level=isolation_options_for_transaction
)

REST

您可以使用 TransactionOptions.isolation_level REST API 在事务级设置读写事务的隔离级别。有效选项包括 TransactionOptions.SERIALIZABLE 和 TransactionOptions.REPEATABLE_READ。默认情况下,Spanner 会将隔离级别设置为串行化隔离。

RPC

不受支持的应用场景

  • 您只能在读写事务中使用可重复读隔离级别。如果您在只读事务中设置此隔离级别,Spanner 会返回错误。所有只读事务都以固定快照运行,不需要获取锁。
  • 您无法使用可重复读隔离来修改在列上定义了全文搜索索引的表。

后续步骤