吞吐量优化写入

本页面介绍了如何配置最长提交(写入)延迟时间, 优化 Spanner 中的写入吞吐量。

概览

为了确保数据一致性,Spanner 会向数据库中的所有投票副本发送写入请求。此复制过程可能会产生计算开销。如需了解详情,请参阅复制

吞吐量优化的写入提供分摊这些计算的选项 同时执行一组写入操作所产生的费用。为此,Spanner 会引入一小段延迟,并收集一组需要发送给同一投票参与者的写入。在此模式下执行写入操作 这种方法可以大幅提高吞吐量,但代价是 延迟时间增加

默认行为

如果您未设置提交延迟时间,Spanner 可能会根据自己的判断为您设置一个小延迟时间,以便摊销写入费用。

常见使用场景

您可以手动设置写入请求的延迟时间,具体取决于 应用需求。您还可以通过将提交延迟时间上限设置为 0 毫秒,为对延迟时间极其敏感的应用停用提交延迟。

如果您的应用具有容忍延迟,并且想要优化吞吐量, 设置更长的提交延迟时间可显著提高吞吐量,而 导致每次写入的延迟时间较长例如,如果您要批量加载大量数据,并且应用不关心 Spanner 写入任何单个数据的速度,则可以将提交延迟时间设置为较长的值(例如 100 毫秒)。我们建议您先将值设为 100 毫秒,然后上下调整,直到延迟时间和吞吐量权衡符合您的需求。对于大多数应用,最好将此值设置为介于 20 毫秒到 100 毫秒之间。

如果您的应用对延迟时间较为敏感,Spanner 的延迟时间也会默认较长。如果您有激增的工作负载 Spanner 可能会设置短暂的延迟。您可以尝试将值设置为 0 毫秒,以确定以增加吞吐量为代价来缩短延迟时间是否适合您的应用。

设置混合提交延迟时间

您可以为写入的子集配置不同的提交延迟时间上限。 如果您这样做,Spanner 会使用为这组写入配置的最短延迟时间。不过,我们建议您为大多数使用场景选择单个值 因为这会导致更可预测的行为。

限制

您可以设置一个介于 0 到 500 毫秒之间的提交延迟时间。设置提交延迟 超过 500 毫秒会导致错误。

设置提交请求的提交延迟时间上限

最大提交延迟时间参数是 CommitRequest 方法的一部分。您可以访问 RPC API 使用此方法, REST API、 或使用 Cloud Spanner 客户端库。

Go


import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/spanner"
)

// maxCommitDelay sets the maximum commit delay for a transaction.
func maxCommitDelay(w io.Writer, db string) error {
	// db = `projects/<project>/instances/<instance-id>/database/<database-id>`
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return fmt.Errorf("maxCommitDelay.NewClient: %w", err)
	}
	defer client.Close()

	// Set the maximum commit delay to 100ms.
	// This is the amount of latency this request is willing to incur in order
	// to improve throughput. If this field is not set, Spanner assumes requests
	// are relatively latency sensitive and automatically determines an
	// appropriate delay time. You can specify a batching delay value between 0 and 500 ms.
	// The transaction will also return the commit statistics.
	commitDelay := 100 * time.Millisecond
	resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		stmt := spanner.Statement{
			SQL: `INSERT Singers (SingerId, FirstName, LastName)
					VALUES (111, 'Virginia', 'Watson')`,
		}
		rowCount, err := txn.Update(ctx, stmt)
		if err != nil {
			return err
		}
		fmt.Fprintf(w, "%d record(s) inserted.\n", rowCount)
		return nil
	}, spanner.TransactionOptions{CommitOptions: spanner.CommitOptions{MaxCommitDelay: &commitDelay, ReturnCommitStats: true}})
	if err != nil {
		return fmt.Errorf("maxCommitDelay.ReadWriteTransactionWithOptions: %w", err)
	}
	fmt.Fprintf(w, "%d mutations in transaction\n", resp.CommitStats.MutationCount)
	return nil
}

Node.js

// Imports the Google Cloud client library.
const {Spanner, protos} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client.
const spanner = new Spanner({
  projectId: projectId,
});

async function spannerSetMaxCommitDelay() {
  // Gets a reference to a Cloud Spanner instance and database.
  const instance = spanner.instance(instanceId);
  const database = instance.database(databaseId);

  database.runTransaction(async (err, transaction) => {
    if (err) {
      console.error(err);
      return;
    }
    try {
      const [rowCount] = await transaction.runUpdate({
        sql: 'INSERT Singers (SingerId, FirstName, LastName) VALUES (111, @firstName, @lastName)',
        params: {
          firstName: 'Virginia',
          lastName: 'Watson',
        },
      });

      console.log(
        `Successfully inserted ${rowCount} record into the Singers table.`
      );

      await transaction.commit({
        // The maximum amount of time to delay the transaction to improve
        // throughput.
        maxCommitDelay: protos.google.protobuf.Duration({
          seconds: 0, // 0 seconds
          nanos: 100000000, // 100,000,000 nanoseconds = 100 milliseconds
        }),
      });
    } catch (err) {
      console.error('ERROR:', err);
    } finally {
      // Close the database when finished.
      database.close();
    }
  });
}
spannerSetMaxCommitDelay();

Python

# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

def insert_singers(transaction):
    row_ct = transaction.execute_update(
        "INSERT Singers (SingerId, FirstName, LastName) "
        " VALUES (111, 'Grace', 'Bennis')"
    )

    print("{} record(s) inserted.".format(row_ct))

database.run_in_transaction(
    insert_singers, max_commit_delay=datetime.timedelta(milliseconds=100)
)

Ruby

require "google/cloud/spanner"

##
# This is a snippet for showcasing how to pass max_commit_delay in  commit_options.
#
# @param project_id  [String] The ID of the Google Cloud project.
# @param instance_id [String] The ID of the spanner instance.
# @param database_id [String] The ID of the database.
#
def spanner_set_max_commit_delay project_id:, instance_id:, database_id:
  # Instantiates a client
  spanner = Google::Cloud::Spanner.new project: project_id
  client  = spanner.client instance_id, database_id

  records = [
    { SingerId: 1, AlbumId: 1, MarketingBudget: 200_000 },
    { SingerId: 2, AlbumId: 2, MarketingBudget: 400_000 }
  ]
  # max_commit_delay is the amount of latency in millisecond, this request
  # is willing to incur in order to improve throughput.
  # The commit delay must be at least 0ms and at most 500ms.
  # Default value is nil.
  commit_options = {
    return_commit_stats: true,
    max_commit_delay: 100
  }
  resp = client.upsert "Albums", records, commit_options: commit_options
  puts "Updated data with #{resp.stats.mutation_count} mutations."
end

监控写入请求延迟时间

您可以使用 Google Cloud 控制台监控 Spanner 的 CPU 利用率和延迟时间。为写入请求设置较长的延迟时间后, CPU 利用率有可能 而延迟时间则会增加如需了解 Spanner 请求中的延迟时间,请参阅捕获和直观呈现 Spanner API 请求延迟时间