Throughput optimized writes

This page describes how to configure the maximum commit (write) delay time to optimize write throughput in Spanner.

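Overview

To ensure data consistency, Spanner sends write requests to all voting replicas in the database. This replication process can add computational overhead. For more information, see Replication.

Throughput optimized writes let you amortize these computation costs by executing a group of writes together. To do this, Spanner introduces a small delay and collects the writes that need to be sent to the same voting participants. Executing writes this way can substantially improve throughput, at the cost of slightly higher latency.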
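
To make the trade-off concrete, here is a toy model of the amortization. It is not Spanner's actual batching algorithm: it assumes a fixed replication cost per round (`round_cost_ms`, a hypothetical number), a steady arrival rate, and that every write arriving within the delay window shares one round.

```python
def amortized_cost_per_write(
    round_cost_ms: float, writes_per_second: float, delay_ms: float
) -> float:
    """Toy model: replication cost per write when all writes that arrive
    within delay_ms are sent to the voting replicas in a single round."""
    # The write that opens the window is always in the batch, so the
    # batch never has fewer than one write.
    batch_size = max(1.0, writes_per_second * delay_ms / 1000.0)
    return round_cost_ms / batch_size


if __name__ == "__main__":
    for delay_ms in (0, 20, 100):
        cost = amortized_cost_per_write(
            round_cost_ms=5.0, writes_per_second=200.0, delay_ms=delay_ms
        )
        print(f"delay={delay_ms:>3} ms  cost per write={cost:.2f} ms")
```

In this model the per-write cost falls as the window grows, while each write waits up to `delay_ms` longer, which is the throughput versus latency trade described above.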

Default behavior

If you don't set a commit delay time, Spanner might set a small delay for you to amortize the cost of your writes.

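Common use cases

You can manually set the delay time of your write requests based on your application's needs. For applications that are highly latency sensitive, you can also disable commit delays by setting the maximum commit delay time to 0 ms.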

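If you have a latency-tolerant application and want to maximize throughput, setting a longer commit delay time significantly improves throughput, but each write takes longer. For example, if you're bulk loading a large amount of data and the application doesn't care how quickly Spanner writes any individual piece of data, you can set the commit delay time to a longer value such as 100 ms. We recommend that you start at 100 ms and then adjust up and down until the trade-off between latency and throughput meets your needs. For most applications, a value between 20 ms and 100 ms works best.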
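
The "start at 100 ms, then adjust" advice can be turned into a small measurement loop. The sketch below is only a harness: `run_batch` is a hypothetical callback that you supply, which commits one batch of writes with the given maximum commit delay (for example through one of the client library samples later on this page) and returns the number of rows written.

```python
import statistics
import time
from typing import Callable, Dict, Iterable, List


def sweep_commit_delays(
    run_batch: Callable[[int], int],
    delays_ms: Iterable[int],
    rounds: int = 5,
) -> List[Dict[str, float]]:
    """Run the same workload at several commit delay values.

    run_batch(delay_ms) is a caller-supplied (hypothetical) function that
    commits one batch with that max commit delay and returns rows written.
    """
    results = []
    for delay_ms in delays_ms:
        latencies = []
        rows = 0
        for _ in range(rounds):
            start = time.perf_counter()
            rows += run_batch(delay_ms)
            latencies.append(time.perf_counter() - start)
        total = sum(latencies)
        results.append({
            "delay_ms": delay_ms,
            # Rows written per second of wall-clock time spent committing.
            "rows_per_second": rows / total if total > 0 else float("inf"),
            "median_latency_s": statistics.median(latencies),
        })
    return results


if __name__ == "__main__":
    # Stand-in workload so the harness runs without a database.
    def fake_batch(delay_ms: int) -> int:
        time.sleep(delay_ms / 1000.0)
        return 100

    for row in sweep_commit_delays(fake_batch, [0, 20, 100], rounds=3):
        print(row)
```

Comparing `rows_per_second` against `median_latency_s` for each candidate value shows where the trade-off stops being worthwhile for a given workload.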

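If you have a latency-sensitive application, Spanner is also latency sensitive by default. If your workload is spiky, Spanner might set a small delay. You can experiment with a value of 0 ms to determine whether lower latency at the cost of reduced throughput is reasonable for your application.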

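Set mixed commit delay times

You can configure different maximum commit delay times on subsets of your writes. If you do this, Spanner uses the shortest delay time configured for the set of writes. However, we recommend choosing a single value for most use cases, because it results in more predictable behavior.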
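
As a minimal illustration of the rule above, the following sketch computes the delay that would apply to a group of writes that carry different settings. It models only the stated "shortest delay wins" rule; how Spanner decides which writes belong to the same group is not described here, and the function name is hypothetical.

```python
import datetime
from typing import Iterable


def effective_commit_delay(
    delays: Iterable[datetime.timedelta],
) -> datetime.timedelta:
    """Return the delay applied to a group of writes with mixed settings.

    Per the rule above, the shortest configured delay in the group wins.
    """
    return min(delays)


if __name__ == "__main__":
    group = [
        datetime.timedelta(milliseconds=100),
        datetime.timedelta(milliseconds=20),
        datetime.timedelta(milliseconds=50),
    ]
    print(effective_commit_delay(group))
```

One consequence is that a single write configured with 0 ms can remove the delay for every write grouped with it, which is part of why a single value is easier to reason about.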

Limitations

You can set a commit delay time between 0 and 500 ms. Setting a commit delay time higher than 500 ms results in an error.
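
Because an out-of-range value is rejected only when the request reaches Spanner, it can be convenient to check the value on the client first. The helper below is a hypothetical sketch, not part of any Spanner client library; it only encodes the 0 to 500 ms range stated above.

```python
import datetime

MIN_COMMIT_DELAY = datetime.timedelta(0)
MAX_COMMIT_DELAY = datetime.timedelta(milliseconds=500)


def checked_commit_delay(delay: datetime.timedelta) -> datetime.timedelta:
    """Return delay unchanged if it is within 0..500 ms, else raise ValueError."""
    if not MIN_COMMIT_DELAY <= delay <= MAX_COMMIT_DELAY:
        raise ValueError(
            f"max commit delay must be between 0 and 500 ms, got {delay}"
        )
    return delay


if __name__ == "__main__":
    print(checked_commit_delay(datetime.timedelta(milliseconds=100)))
    try:
        checked_commit_delay(datetime.timedelta(milliseconds=501))
    except ValueError as err:
        print("rejected:", err)
```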

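Set max commit delay on commit requests

The maximum commit delay parameter is part of CommitRequest, the request message of the Commit method. You can set it through the RPC API, the REST API, or a Cloud Spanner client library.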
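
For the REST API, the delay travels in the JSON body of the commit request as a protobuf Duration, which the JSON encoding writes as a number of seconds followed by `s`. The sketch below builds only that fragment of the body; the helper names are hypothetical, and the transaction selector and mutations that a real commit request also needs are deliberately left out.

```python
import datetime
import json
from typing import Any, Dict


def to_json_duration(delay: datetime.timedelta) -> str:
    """Format a timedelta as a protobuf JSON Duration string, e.g. '0.100s'."""
    total_ms = delay // datetime.timedelta(milliseconds=1)
    return f"{total_ms / 1000:.3f}s"


def commit_body_fragment(delay: datetime.timedelta) -> Dict[str, Any]:
    """Build the delay-related part of a commit request body.

    Incomplete on purpose: a real request also carries a transaction
    (or single-use transaction) and the mutations to apply.
    """
    return {
        "maxCommitDelay": to_json_duration(delay),
        "returnCommitStats": True,
    }


if __name__ == "__main__":
    body = commit_body_fragment(datetime.timedelta(milliseconds=100))
    print(json.dumps(body, indent=2))
```

The client library samples that follow set the same value through their own options types instead of raw JSON.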

C#


using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;

public class CommitDelayAsyncSample
{
    public async Task<int> CommitDelayAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        return await connection.RunWithRetriableTransactionAsync(async transaction =>
        {
            transaction.TransactionOptions.MaxCommitDelay = TimeSpan.FromMilliseconds(100);

            using var insertSingerCmd = connection.CreateInsertCommand("Singers",
                new SpannerParameterCollection
                {
                    { "SingerId", SpannerDbType.Int64, 1 },
                    { "FirstName", SpannerDbType.String, "Marc" },
                    { "LastName", SpannerDbType.String, "Richards" }
                });
            insertSingerCmd.Transaction = transaction;
            int rowsInserted = await insertSingerCmd.ExecuteNonQueryAsync();

            using var insertAlbumCmd = connection.CreateInsertCommand("Albums",
                new SpannerParameterCollection
                {
                    { "SingerId", SpannerDbType.Int64, 1 },
                    { "AlbumId", SpannerDbType.Int64, 2 },
                    { "AlbumTitle", SpannerDbType.String, "Go, Go, Go" }
                });
            insertAlbumCmd.Transaction = transaction;
            rowsInserted += await insertAlbumCmd.ExecuteNonQueryAsync();

            return rowsInserted;
        });
    }
}

Go


import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/spanner"
)

func setMaxCommitDelay(w io.Writer, db string) error {
	// db is the fully-qualified database name of the form `projects/<project>/instances/<instance-id>/databases/<database-id>`
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return fmt.Errorf("setMaxCommitDelay.NewClient: %w", err)
	}
	defer client.Close()

	commitDelay := 100 * time.Millisecond
	resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		stmt := spanner.Statement{
			SQL: `INSERT Singers (SingerId, FirstName, LastName)
					VALUES (111, 'Virginia', 'Watson')`,
		}
		rowCount, err := txn.Update(ctx, stmt)
		if err != nil {
			return err
		}
		fmt.Fprintf(w, "%d record(s) inserted.\n", rowCount)
		return nil
	}, spanner.TransactionOptions{CommitOptions: spanner.CommitOptions{MaxCommitDelay: &commitDelay, ReturnCommitStats: true}})
	if err != nil {
		return fmt.Errorf("setMaxCommitDelay.ReadWriteTransactionWithOptions: %w", err)
	}
	fmt.Fprintf(w, "%d mutations in transaction\n", resp.CommitStats.MutationCount)
	return nil
}

Java


import com.google.cloud.spanner.CommitResponse;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import java.time.Duration;
import java.util.Arrays;

public class SetMaxCommitDelaySample {

  static void setMaxCommitDelay() {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "my-project";
    final String instanceId = "my-instance";
    final String databaseId = "my-database";

    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
      final DatabaseClient databaseClient = spanner
          .getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
      setMaxCommitDelay(databaseClient);
    }
  }

  static void setMaxCommitDelay(DatabaseClient databaseClient) {
    // SingerId, AlbumId, and MarketingBudget are written as INT64 values,
    // matching the types the other samples on this page use for these columns.
    final CommitResponse commitResponse = databaseClient.writeWithOptions(Arrays.asList(
        Mutation.newInsertOrUpdateBuilder("Albums")
            .set("SingerId")
            .to(1L)
            .set("AlbumId")
            .to(1L)
            .set("MarketingBudget")
            .to(200000L)
            .build(),
        Mutation.newInsertOrUpdateBuilder("Albums")
            .set("SingerId")
            .to(2L)
            .set("AlbumId")
            .to(2L)
            .set("MarketingBudget")
            .to(400000L)
            .build()
    ), Options.maxCommitDelay(Duration.ofMillis(100)));

    System.out.println(
        "Updated data with timestamp " + commitResponse.getCommitTimestamp() + ".");
  }
}

Node.js

const {Spanner, protos} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client.
const spanner = new Spanner({
  projectId: projectId,
});

async function setMaxCommitDelay() {
  const instance = spanner.instance(instanceId);
  const database = instance.database(databaseId);

  database.runTransaction(async (err, transaction) => {
    if (err) {
      console.error(err);
      return;
    }
    try {
      const [rowCount] = await transaction.runUpdate({
        sql: 'INSERT Singers (SingerId, FirstName, LastName) VALUES (111, @firstName, @lastName)',
        params: {
          firstName: 'Virginia',
          lastName: 'Watson',
        },
      });

      console.log(
        `Successfully inserted ${rowCount} record into the Singers table.`,
      );

      await transaction.commit({
        maxCommitDelay: new protos.google.protobuf.Duration({
          seconds: 0, // 0 seconds
          nanos: 100000000, // 100 milliseconds
        }),
      });
    } catch (err) {
      console.error('ERROR:', err);
    } finally {
      // Close the database when finished.
      database.close();
    }
  });
}
setMaxCommitDelay();

Python

import datetime

from google.cloud import spanner

# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

def insert_singers(transaction):
    row_ct = transaction.execute_update(
        "INSERT Singers (SingerId, FirstName, LastName) "
        " VALUES (111, 'Grace', 'Bennis')"
    )

    print("{} record(s) inserted.".format(row_ct))

database.run_in_transaction(
    insert_singers, max_commit_delay=datetime.timedelta(milliseconds=100)
)

Ruby

require "google/cloud/spanner"

##
# This snippet shows how to pass max_commit_delay in commit_options.
#
# @param project_id  [String] The ID of the Google Cloud project.
# @param instance_id [String] The ID of the spanner instance.
# @param database_id [String] The ID of the database.
#
def spanner_set_max_commit_delay project_id:, instance_id:, database_id:
  # Instantiates a client
  spanner = Google::Cloud::Spanner.new project: project_id
  client  = spanner.client instance_id, database_id

  records = [
    { SingerId: 1, AlbumId: 1, MarketingBudget: 200_000 },
    { SingerId: 2, AlbumId: 2, MarketingBudget: 400_000 }
  ]
  # max_commit_delay is the amount of latency, in milliseconds, that this
  # request is willing to incur in order to improve throughput.
  # The commit delay must be at least 0ms and at most 500ms.
  # Default value is nil.
  commit_options = {
    return_commit_stats: true,
    max_commit_delay: 100
  }
  resp = client.upsert "Albums", records, commit_options: commit_options
  puts "Updated data with #{resp.stats.mutation_count} mutations."
end

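Monitor write request latency

You can monitor Spanner CPU utilization and latency in the Google Cloud console. When you set a longer delay time for your write requests, expect CPU utilization to potentially go down while latency increases. To learn about latency in Spanner requests, see Capture and visualize Spanner API request latency.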