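This page describes how to write a commit timestamp for each insert and update operation that you perform with Spanner.
Commit timestamp overview
The commit timestamp, based on TrueTime technology, is the time at which a transaction is committed in the database. You can atomically store the commit timestamp of a transaction in a column. Using the commit timestamps stored in tables, you can determine the exact ordering of mutations and build features such as change logs.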
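To insert commit timestamps in your database, complete the following steps:
Create a column of type SPANNER.COMMIT_TIMESTAMP. For example: CREATE TABLE Performances ( ... LastUpdateTime SPANNER.COMMIT_TIMESTAMP NOT NULL, ... PRIMARY KEY (...) ) ;
If you perform inserts or updates with DML, use the SPANNER.PENDING_COMMIT_TIMESTAMP() function to write the commit timestamp. If you perform inserts or updates with prepared statements or mutations, use the placeholder string spanner.commit_timestamp() for the commit timestamp column. You can also use the commit timestamp constant provided by the client library. For example, this constant in the Java client is Value.COMMIT_TIMESTAMP.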
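When Spanner commits the transaction using these placeholders as column values, the actual commit timestamp is written to the specified column. You can then use this column value to create a history of updates to the table.
Commit timestamp values are not guaranteed to be unique. Transactions that write to non-overlapping sets of fields might have the same timestamp. Transactions that write to overlapping sets of fields have unique timestamps.
Spanner commit timestamps have microsecond granularity, and they are converted to nanoseconds when stored in SPANNER.COMMIT_TIMESTAMP columns.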
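As a rough illustration of the granularity statement above, the following sketch formats a microsecond-precision timestamp with nine fractional digits. The helper name `to_nanosecond_string` is hypothetical and is not part of any Spanner client library; it only shows that a microsecond value expressed in nanoseconds always ends in three zeros.

```python
from datetime import datetime, timezone


def to_nanosecond_string(ts: datetime) -> str:
    """Format a timezone-aware datetime with nine fractional digits (UTC)."""
    utc = ts.astimezone(timezone.utc)
    # datetime stores microseconds; padding with "000" yields nanosecond digits.
    return utc.strftime("%Y-%m-%dT%H:%M:%S") + f".{utc.microsecond:06d}000Z"


example = datetime(2022, 1, 1, 12, 30, 45, 123456, tzinfo=timezone.utc)
print(to_nanosecond_string(example))  # 2022-01-01T12:30:45.123456000Z
```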
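Keys and indexes
You can use a commit timestamp column as a primary key column or as a non-key column. Primary keys can be defined as ASC or DESC.
ASC (default): Ascending keys are ideal for answering queries from a specific time forward.
DESC: Descending keys keep the latest rows at the top of the table and provide quick access to the most recent records.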
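Avoid hotspots
Using commit timestamps in the following scenarios creates hotspots, which reduce data performance:
The commit timestamp column is the first part of the primary key of a table.
CREATE TABLE Users ( LastAccess SPANNER.COMMIT_TIMESTAMP NOT NULL, UserId INT64 NOT NULL, ... PRIMARY KEY (LastAccess, UserId) ) ;
The commit timestamp primary key column is the first part of a secondary index.
CREATE INDEX UsersByLastAccess ON Users(LastAccess)
or
CREATE INDEX UsersByLastAccessAndName ON Users(LastAccess, FirstName)
Hotspots reduce data performance, even with low write rates. There is no performance overhead if commit timestamps are enabled on non-key columns that aren't indexed.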
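The hotspot effect described above can be pictured with a toy model. The sketch below is not how Spanner assigns data to servers; it only assumes, for illustration, that rows are placed into key ranges by their first key column, with made-up split boundaries. Because commit timestamps only increase, every new write lands in the last range, while a key that starts with `UserId` spreads writes across ranges.

```python
from bisect import bisect_right
from collections import Counter


def split_for(key, split_points):
    """Return the index of the key range that would hold `key`."""
    return bisect_right(split_points, key)


# Hypothetical split boundaries on the first key column.
timestamp_splits = [1000, 2000, 3000]  # table keyed by (LastAccess, UserId)
user_id_splits = [250, 500, 750]       # table keyed by (UserId, LastAccess)

# 100 new writes as (timestamp, user_id): timestamps keep growing,
# user IDs are scattered across the ID space.
writes = [(4000 + i, (i * 37) % 1000) for i in range(100)]

by_timestamp = Counter(split_for(ts, timestamp_splits) for ts, _ in writes)
by_user_id = Counter(split_for(uid, user_id_splits) for _, uid in writes)

print(by_timestamp)  # Counter({3: 100}): every write hits the last range
print(by_user_id)    # writes are spread over all four ranges
```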
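Add a commit timestamp column to an existing table
To add a commit timestamp column to an existing table, use the ALTER TABLE statement. For example, to add a LastUpdateTime column to the Performances table, use the following statement: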
ALTER TABLE Performances ADD COLUMN LastUpdateTime SPANNER.COMMIT_TIMESTAMP
NOT NULL;
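Write a commit timestamp using a DML statement
Use the SPANNER.PENDING_COMMIT_TIMESTAMP() function to write the commit timestamp in a DML statement. Spanner selects the commit timestamp when the transaction commits.
The following DML statement updates the LastUpdateTime column in the Performances table with the commit timestamp: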
UPDATE Performances SET LastUpdateTime = SPANNER.PENDING_COMMIT_TIMESTAMP()
WHERE SingerId=1 AND VenueId=2 AND EventDate='2015-10-21'
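Insert a row using a mutation
When you insert a row, Spanner writes the commit timestamp value only if you include the column in the column list and pass the spanner.commit_timestamp() placeholder string (or the client library constant) as its value. For example: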
C++
void InsertDataWithTimestamp(google::cloud::spanner::Client client) {
namespace spanner = ::google::cloud::spanner;
auto commit_result = client.Commit(spanner::Mutations{
spanner::InsertOrUpdateMutationBuilder(
"Performances",
{"SingerId", "VenueId", "EventDate", "Revenue", "LastUpdateTime"})
.EmplaceRow(1, 4, absl::CivilDay(2017, 10, 5), 11000,
spanner::CommitTimestamp{})
.EmplaceRow(1, 19, absl::CivilDay(2017, 11, 2), 15000,
spanner::CommitTimestamp{})
.EmplaceRow(2, 42, absl::CivilDay(2017, 12, 23), 7000,
spanner::CommitTimestamp{})
.Build()});
if (!commit_result) throw std::move(commit_result).status();
std::cout
<< "Update was successful [spanner_insert_data_with_timestamp_column]\n";
}
C#
using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
public class WriteDataWithTimestampAsyncSample
{
public class Performance
{
public int SingerId { get; set; }
public int VenueId { get; set; }
public DateTime EventDate { get; set; }
public long Revenue { get; set; }
}
public async Task<int> WriteDataWithTimestampAsync(string projectId, string instanceId, string databaseId)
{
string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
List<Performance> performances = new List<Performance>
{
new Performance { SingerId = 1, VenueId = 4, EventDate = DateTime.Parse("2017-10-05"), Revenue = 11000 },
new Performance { SingerId = 1, VenueId = 19, EventDate = DateTime.Parse("2017-11-02"), Revenue = 15000 },
new Performance { SingerId = 2, VenueId = 42, EventDate = DateTime.Parse("2017-12-23"), Revenue = 7000 },
};
// Create connection to Cloud Spanner.
using var connection = new SpannerConnection(connectionString);
await connection.OpenAsync();
// Insert rows into the Performances table.
var rowCountArray = await Task.WhenAll(performances.Select(performance =>
{
var cmd = connection.CreateInsertCommand("Performances", new SpannerParameterCollection
{
{ "SingerId", SpannerDbType.Int64, performance.SingerId },
{ "VenueId", SpannerDbType.Int64, performance.VenueId },
{ "EventDate", SpannerDbType.Date, performance.EventDate },
{ "Revenue", SpannerDbType.Int64, performance.Revenue },
{ "LastUpdateTime", SpannerDbType.Timestamp, SpannerParameter.CommitTimestamp },
});
return cmd.ExecuteNonQueryAsync();
}));
return rowCountArray.Sum();
}
}
Go
import (
"context"
"cloud.google.com/go/spanner"
)
func writeWithTimestamp(db string) error {
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
performanceColumns := []string{"SingerId", "VenueId", "EventDate", "Revenue", "LastUpdateTime"}
m := []*spanner.Mutation{
spanner.InsertOrUpdate("Performances", performanceColumns, []interface{}{1, 4, "2017-10-05", 11000, spanner.CommitTimestamp}),
spanner.InsertOrUpdate("Performances", performanceColumns, []interface{}{1, 19, "2017-11-02", 15000, spanner.CommitTimestamp}),
spanner.InsertOrUpdate("Performances", performanceColumns, []interface{}{2, 42, "2017-12-23", 7000, spanner.CommitTimestamp}),
}
_, err = client.Apply(ctx, m)
return err
}
Java
static final List<Performance> PERFORMANCES =
Arrays.asList(
new Performance(1, 4, "2017-10-05", 11000),
new Performance(1, 19, "2017-11-02", 15000),
new Performance(2, 42, "2017-12-23", 7000));
static void writeExampleDataWithTimestamp(DatabaseClient dbClient) {
List<Mutation> mutations = new ArrayList<>();
for (Performance performance : PERFORMANCES) {
mutations.add(
Mutation.newInsertBuilder("Performances")
.set("SingerId")
.to(performance.singerId)
.set("VenueId")
.to(performance.venueId)
.set("EventDate")
.to(performance.eventDate)
.set("Revenue")
.to(performance.revenue)
.set("LastUpdateTime")
.to(Value.COMMIT_TIMESTAMP)
.build());
}
dbClient.write(mutations);
}
Node.js
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// Creates a client
const spanner = new Spanner({
projectId: projectId,
});
// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
// Instantiate Spanner table objects
const performancesTable = database.table('Performances');
const data = [
{
SingerId: '1',
VenueId: '4',
EventDate: '2017-10-05',
Revenue: '11000',
LastUpdateTime: 'spanner.commit_timestamp()',
},
{
SingerId: '1',
VenueId: '19',
EventDate: '2017-11-02',
Revenue: '15000',
LastUpdateTime: 'spanner.commit_timestamp()',
},
{
SingerId: '2',
VenueId: '42',
EventDate: '2017-12-23',
Revenue: '7000',
LastUpdateTime: 'spanner.commit_timestamp()',
},
];
// Inserts rows into the Performances table
// Note: Cloud Spanner interprets Node.js numbers as FLOAT64s, so
// they must be converted to strings before being inserted as INT64s
try {
await performancesTable.insert(data);
console.log('Inserted data.');
} catch (err) {
console.error('ERROR:', err);
} finally {
// Close the database when finished
database.close();
}
PHP
use Google\Cloud\Spanner\SpannerClient;
/**
* Inserts sample data into a table with a commit timestamp column.
*
* The database and table must already exist and can be created using
* `create_table_with_timestamp_column`.
* Example:
* ```
* insert_data_with_timestamp_column($instanceId, $databaseId);
* ```
*
* @param string $instanceId The Spanner instance ID.
* @param string $databaseId The Spanner database ID.
*/
function insert_data_with_timestamp_column(string $instanceId, string $databaseId): void
{
$spanner = new SpannerClient();
$instance = $spanner->instance($instanceId);
$database = $instance->database($databaseId);
$operation = $database->transaction(['singleUse' => true])
->insertBatch('Performances', [
['SingerId' => 1, 'VenueId' => 4, 'EventDate' => '2017-10-05', 'Revenue' => 11000, 'LastUpdateTime' => $spanner->commitTimestamp()],
['SingerId' => 1, 'VenueId' => 19, 'EventDate' => '2017-11-02', 'Revenue' => 15000, 'LastUpdateTime' => $spanner->commitTimestamp()],
['SingerId' => 2, 'VenueId' => 42, 'EventDate' => '2017-12-23', 'Revenue' => 7000, 'LastUpdateTime' => $spanner->commitTimestamp()],
])
->commit();
print('Inserted data.' . PHP_EOL);
}
Python
from google.cloud import spanner


def insert_data_with_timestamp(instance_id, database_id):
    """Inserts data with a COMMIT_TIMESTAMP field into a table."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    # spanner.COMMIT_TIMESTAMP is the client library constant for the
    # commit timestamp placeholder.
    with database.batch() as batch:
        batch.insert(
            table="Performances",
            columns=("SingerId", "VenueId", "EventDate", "Revenue", "LastUpdateTime"),
            values=[
                (1, 4, "2017-10-05", 11000, spanner.COMMIT_TIMESTAMP),
                (1, 19, "2017-11-02", 15000, spanner.COMMIT_TIMESTAMP),
                (2, 42, "2017-12-23", 7000, spanner.COMMIT_TIMESTAMP),
            ],
        )

    print("Inserted data.")
Ruby
# project_id = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"
spanner = Google::Cloud::Spanner.new project: project_id
client = spanner.client instance_id, database_id
# Get commit_timestamp
commit_timestamp = client.commit_timestamp
client.commit do |c|
c.insert "Performances", [
{ SingerId: 1, VenueId: 4, EventDate: "2017-10-05", Revenue: 11_000, LastUpdateTime: commit_timestamp },
{ SingerId: 1, VenueId: 19, EventDate: "2017-11-02", Revenue: 15_000, LastUpdateTime: commit_timestamp },
{ SingerId: 2, VenueId: 42, EventDate: "2017-12-23", Revenue: 7000, LastUpdateTime: commit_timestamp }
]
end
puts "Inserted data"
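If you have mutations on rows in multiple tables, you must specify spanner.commit_timestamp() (or the client library constant) for the commit timestamp column in each table.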
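The multi-table rule above can be sketched without a database connection. The example below only prepares row data as plain dictionaries; the `TIMESTAMP_COLUMNS` mapping and the `stamp_rows` helper are hypothetical names introduced for illustration, and the sketch assumes that both tables have a commit timestamp column named `LastUpdateTime`. Passing the resulting rows to a client library is left out.

```python
# Placeholder string named in the text above; client libraries expose it
# as a constant (for example, spanner.COMMIT_TIMESTAMP in Python).
COMMIT_TIMESTAMP_PLACEHOLDER = "spanner.commit_timestamp()"

# Hypothetical mapping of each table to its commit timestamp column.
TIMESTAMP_COLUMNS = {"Performances": "LastUpdateTime", "Albums": "LastUpdateTime"}


def stamp_rows(rows_by_table):
    """Return copies of the rows with each table's timestamp column set."""
    stamped = {}
    for table, rows in rows_by_table.items():
        column = TIMESTAMP_COLUMNS[table]
        stamped[table] = [
            {**row, column: COMMIT_TIMESTAMP_PLACEHOLDER} for row in rows
        ]
    return stamped


pending = {
    "Performances": [
        {"SingerId": 1, "VenueId": 4, "EventDate": "2017-10-05", "Revenue": 11000}
    ],
    "Albums": [{"SingerId": 1, "AlbumId": 1, "MarketingBudget": 1000000}],
}

for table, rows in stamp_rows(pending).items():
    print(table, rows)
```

Keeping the placeholder in one helper makes it harder to forget the column for one of the tables in a multi-table commit.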
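Update a row using a mutation
When you update a row, Spanner writes the commit timestamp value only if you include the column in the column list and pass the spanner.commit_timestamp() placeholder string (or the client library constant) as its value. You can't update the primary key of a row. To update the primary key, delete the existing row and create a new row.
For example, to update a commit timestamp column named LastUpdateTime: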
C++
void UpdateDataWithTimestamp(google::cloud::spanner::Client client) {
namespace spanner = ::google::cloud::spanner;
auto commit_result = client.Commit(spanner::Mutations{
spanner::UpdateMutationBuilder(
"Albums",
{"SingerId", "AlbumId", "MarketingBudget", "LastUpdateTime"})
.EmplaceRow(1, 1, 1000000, spanner::CommitTimestamp{})
.EmplaceRow(2, 2, 750000, spanner::CommitTimestamp{})
.Build()});
if (!commit_result) throw std::move(commit_result).status();
std::cout
<< "Update was successful [spanner_update_data_with_timestamp_column]\n";
}
C#
using Google.Cloud.Spanner.Data;
using System;
using System.Threading.Tasks;
public class UpdateDataWithTimestampColumnAsyncSample
{
public async Task<int> UpdateDataWithTimestampColumnAsync(string projectId, string instanceId, string databaseId)
{
string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
using var connection = new SpannerConnection(connectionString);
var rowCount = 0;
using var updateCmd1 = connection.CreateUpdateCommand("Albums", new SpannerParameterCollection
{
{ "SingerId", SpannerDbType.Int64, 1 },
{ "AlbumId", SpannerDbType.Int64, 1 },
{ "MarketingBudget", SpannerDbType.Int64, 1000000 },
{ "LastUpdateTime", SpannerDbType.Timestamp, SpannerParameter.CommitTimestamp },
});
rowCount += await updateCmd1.ExecuteNonQueryAsync();
using var updateCmd2 = connection.CreateUpdateCommand("Albums", new SpannerParameterCollection
{
{ "SingerId", SpannerDbType.Int64, 2 },
{ "AlbumId", SpannerDbType.Int64, 2 },
{ "MarketingBudget", SpannerDbType.Int64, 750000 },
{ "LastUpdateTime", SpannerDbType.Timestamp, SpannerParameter.CommitTimestamp },
});
rowCount += await updateCmd2.ExecuteNonQueryAsync();
Console.WriteLine("Updated data.");
return rowCount;
}
}
Go
import (
"context"
"io"
"cloud.google.com/go/spanner"
)
func updateWithTimestamp(w io.Writer, db string) error {
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
cols := []string{"SingerId", "AlbumId", "MarketingBudget", "LastUpdateTime"}
_, err = client.Apply(ctx, []*spanner.Mutation{
spanner.Update("Albums", cols, []interface{}{1, 1, 1000000, spanner.CommitTimestamp}),
spanner.Update("Albums", cols, []interface{}{2, 2, 750000, spanner.CommitTimestamp}),
})
return err
}
Java
static void updateWithTimestamp(DatabaseClient dbClient) {
// Mutation can be used to update/insert/delete a single row in a table. Here we use
// newUpdateBuilder to create update mutations.
List<Mutation> mutations =
Arrays.asList(
Mutation.newUpdateBuilder("Albums")
.set("SingerId")
.to(1)
.set("AlbumId")
.to(1)
.set("MarketingBudget")
.to(1000000)
.set("LastUpdateTime")
.to(Value.COMMIT_TIMESTAMP)
.build(),
Mutation.newUpdateBuilder("Albums")
.set("SingerId")
.to(2)
.set("AlbumId")
.to(2)
.set("MarketingBudget")
.to(750000)
.set("LastUpdateTime")
.to(Value.COMMIT_TIMESTAMP)
.build());
// This writes all the mutations to Cloud Spanner atomically.
dbClient.write(mutations);
}
Node.js
// ...
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// Creates a client
const spanner = new Spanner({
projectId: projectId,
});
// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
// Update a row in the Albums table
// Note: Cloud Spanner interprets Node.js numbers as FLOAT64s, so they
// must be converted to strings before being inserted as INT64s
const albumsTable = database.table('Albums');
const data = [
{
SingerId: '1',
AlbumId: '1',
MarketingBudget: '1000000',
LastUpdateTime: 'spanner.commit_timestamp()',
},
{
SingerId: '2',
AlbumId: '2',
MarketingBudget: '750000',
LastUpdateTime: 'spanner.commit_timestamp()',
},
];
try {
await albumsTable.update(data);
console.log('Updated data.');
} catch (err) {
console.error('ERROR:', err);
} finally {
// Close the database when finished
database.close();
}
PHP
use Google\Cloud\Spanner\SpannerClient;
/**
* Updates sample data in a table with a commit timestamp column.
*
* Before executing this method, a new column MarketingBudget has to be added to the Albums
* table by applying the DDL statement "ALTER TABLE Albums ADD COLUMN MarketingBudget INT64".
*
* In addition this update expects the LastUpdateTime column added by applying the DDL statement
* "ALTER TABLE Albums ADD COLUMN LastUpdateTime TIMESTAMP OPTIONS (allow_commit_timestamp=true)"
*
* Example:
* ```
* update_data_with_timestamp_column($instanceId, $databaseId);
* ```
*
* @param string $instanceId The Spanner instance ID.
* @param string $databaseId The Spanner database ID.
*/
function update_data_with_timestamp_column(string $instanceId, string $databaseId): void
{
$spanner = new SpannerClient();
$instance = $spanner->instance($instanceId);
$database = $instance->database($databaseId);
$operation = $database->transaction(['singleUse' => true])
->updateBatch('Albums', [
['SingerId' => 1, 'AlbumId' => 1, 'MarketingBudget' => 1000000, 'LastUpdateTime' => $spanner->commitTimestamp()],
['SingerId' => 2, 'AlbumId' => 2, 'MarketingBudget' => 750000, 'LastUpdateTime' => $spanner->commitTimestamp()],
])
->commit();
print('Updated data.' . PHP_EOL);
}
Python
from google.cloud import spanner


def update_data_with_timestamp(instance_id, database_id):
    """Updates the Albums table in the database with the COMMIT_TIMESTAMP
    column.

    This updates the `MarketingBudget` column which must be created before
    running this sample. You can add the column by running the `add_column`
    sample or by running this DDL statement against your database:

        ALTER TABLE Albums ADD COLUMN MarketingBudget INT64

    In addition this update expects the LastUpdateTime column added by
    applying this DDL statement against your database:

        ALTER TABLE Albums ADD COLUMN LastUpdateTime TIMESTAMP
        OPTIONS(allow_commit_timestamp=true)
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.batch() as batch:
        batch.update(
            table="Albums",
            columns=("SingerId", "AlbumId", "MarketingBudget", "LastUpdateTime"),
            values=[
                (1, 1, 1000000, spanner.COMMIT_TIMESTAMP),
                (2, 2, 750000, spanner.COMMIT_TIMESTAMP),
            ],
        )

    print("Updated data.")
Ruby
# project_id = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"
spanner = Google::Cloud::Spanner.new project: project_id
client = spanner.client instance_id, database_id
commit_timestamp = client.commit_timestamp
client.commit do |c|
c.update "Albums", [
{ SingerId: 1, AlbumId: 1, MarketingBudget: 1_000_000, LastUpdateTime: commit_timestamp },
{ SingerId: 2, AlbumId: 2, MarketingBudget: 750_000, LastUpdateTime: commit_timestamp }
]
end
puts "Updated data"
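If you have mutations on rows in multiple tables, you must specify spanner.commit_timestamp() (or the client library constant) for the commit timestamp column in each table.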
Query a commit timestamp column
The following examples show how to query the commit timestamp column of a table.
C++
void QueryDataWithTimestamp(google::cloud::spanner::Client client) {
namespace spanner = ::google::cloud::spanner;
spanner::SqlStatement select(
"SELECT SingerId, AlbumId, MarketingBudget, LastUpdateTime"
" FROM Albums"
" ORDER BY LastUpdateTime DESC");
using RowType =
std::tuple<std::int64_t, std::int64_t, absl::optional<std::int64_t>,
absl::optional<spanner::Timestamp>>;
auto rows = client.ExecuteQuery(std::move(select));
for (auto& row : spanner::StreamOf<RowType>(rows)) {
if (!row) throw std::move(row).status();
std::cout << std::get<0>(*row) << " " << std::get<1>(*row);
auto marketing_budget = std::get<2>(*row);
if (!marketing_budget) {
std::cout << " NULL";
} else {
std::cout << ' ' << *marketing_budget;
}
auto last_update_time = std::get<3>(*row);
if (!last_update_time) {
std::cout << " NULL";
} else {
std::cout << ' ' << *last_update_time;
}
std::cout << "\n";
}
}
C#
using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
public class QueryDataWithTimestampColumnAsyncSample
{
public class Album
{
public int SingerId { get; set; }
public int AlbumId { get; set; }
public DateTime? LastUpdateTime { get; set; }
public long? MarketingBudget { get; set; }
}
public async Task<List<Album>> QueryDataWithTimestampColumnAsync(string projectId, string instanceId, string databaseId)
{
string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
using var connection = new SpannerConnection(connectionString);
using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, MarketingBudget, LastUpdateTime FROM Albums ORDER BY LastUpdateTime DESC");
var albums = new List<Album>();
using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
albums.Add(new Album
{
SingerId = reader.GetFieldValue<int>("SingerId"),
AlbumId = reader.GetFieldValue<int>("AlbumId"),
LastUpdateTime = reader.IsDBNull(reader.GetOrdinal("LastUpdateTime")) ? (DateTime?)null : reader.GetFieldValue<DateTime>("LastUpdateTime"),
MarketingBudget = reader.IsDBNull(reader.GetOrdinal("MarketingBudget")) ? 0 : reader.GetFieldValue<long>("MarketingBudget")
});
}
return albums;
}
}
Go
import (
"context"
"fmt"
"io"
"strconv"
"cloud.google.com/go/spanner"
"google.golang.org/api/iterator"
)
func queryWithTimestamp(w io.Writer, db string) error {
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
stmt := spanner.Statement{
SQL: `SELECT SingerId, AlbumId, MarketingBudget, LastUpdateTime
FROM Albums ORDER BY LastUpdateTime DESC`}
iter := client.Single().Query(ctx, stmt)
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
var singerID, albumID int64
var marketingBudget spanner.NullInt64
var lastUpdateTime spanner.NullTime
if err := row.ColumnByName("SingerId", &singerID); err != nil {
return err
}
if err := row.ColumnByName("AlbumId", &albumID); err != nil {
return err
}
if err := row.ColumnByName("MarketingBudget", &marketingBudget); err != nil {
return err
}
budget := "NULL"
if marketingBudget.Valid {
budget = strconv.FormatInt(marketingBudget.Int64, 10)
}
if err := row.ColumnByName("LastUpdateTime", &lastUpdateTime); err != nil {
return err
}
timestamp := "NULL"
if lastUpdateTime.Valid {
timestamp = lastUpdateTime.String()
}
fmt.Fprintf(w, "%d %d %s %s\n", singerID, albumID, budget, timestamp)
}
}
Java
static void queryMarketingBudgetWithTimestamp(DatabaseClient dbClient) {
// Rows without an explicit value for MarketingBudget will have a MarketingBudget equal to
// null. A try-with-resource block is used to automatically release resources held by
// ResultSet.
try (ResultSet resultSet =
dbClient
.singleUse()
.executeQuery(
Statement.of(
"SELECT SingerId, AlbumId, MarketingBudget, LastUpdateTime FROM Albums"
+ " ORDER BY LastUpdateTime DESC"))) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s %s\n",
resultSet.getLong("SingerId"),
resultSet.getLong("AlbumId"),
// We check that the value is non null. ResultSet getters can only be used to retrieve
// non null values.
resultSet.isNull("MarketingBudget") ? "NULL" : resultSet.getLong("MarketingBudget"),
resultSet.isNull("LastUpdateTime") ? "NULL" : resultSet.getTimestamp("LastUpdateTime"));
}
}
}
Node.js
// ...
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// Creates a client
const spanner = new Spanner({
projectId: projectId,
});
// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const query = {
sql: `SELECT SingerId, AlbumId, MarketingBudget, LastUpdateTime
FROM Albums ORDER BY LastUpdateTime DESC`,
};
// Queries rows from the Albums table
try {
const [rows] = await database.run(query);
rows.forEach(row => {
const json = row.toJSON();
console.log(
`SingerId: ${json.SingerId}, AlbumId: ${
json.AlbumId
}, MarketingBudget: ${
json.MarketingBudget ? json.MarketingBudget : null
}, LastUpdateTime: ${json.LastUpdateTime}`
);
});
} catch (err) {
console.error('ERROR:', err);
} finally {
// Close the database when finished
database.close();
}
PHP
use Google\Cloud\Spanner\SpannerClient;
/**
* Queries sample data from a database with a commit timestamp column.
*
* This sample uses the `MarketingBudget` column. You can add the column
* by running the `add_column` sample or by running this DDL statement against
* your database:
*
* ALTER TABLE Albums ADD COLUMN MarketingBudget INT64
*
* This sample also uses the 'LastUpdateTime' commit timestamp column. You can
* add the column by running the `add_timestamp_column` sample or by running
* this DDL statement against your database:
*
* ALTER TABLE Albums ADD COLUMN LastUpdateTime TIMESTAMP OPTIONS (allow_commit_timestamp=true)
*
* Example:
* ```
* query_data_with_timestamp_column($instanceId, $databaseId);
* ```
*
* @param string $instanceId The Spanner instance ID.
* @param string $databaseId The Spanner database ID.
*/
function query_data_with_timestamp_column(string $instanceId, string $databaseId): void
{
$spanner = new SpannerClient();
$instance = $spanner->instance($instanceId);
$database = $instance->database($databaseId);
$results = $database->execute(
'SELECT SingerId, AlbumId, MarketingBudget, LastUpdateTime ' .
' FROM Albums ORDER BY LastUpdateTime DESC'
);
foreach ($results as $row) {
if ($row['MarketingBudget'] == null) {
$row['MarketingBudget'] = 'NULL';
}
if ($row['LastUpdateTime'] == null) {
$row['LastUpdateTime'] = 'NULL';
}
printf('SingerId: %s, AlbumId: %s, MarketingBudget: %s, LastUpdateTime: %s' . PHP_EOL,
$row['SingerId'], $row['AlbumId'], $row['MarketingBudget'], $row['LastUpdateTime']);
}
}
Python
from google.cloud import spanner


def query_data_with_timestamp(instance_id, database_id):
    """Queries sample data from the database using SQL.

    This sample orders results by the `LastUpdateTime` column, which must be
    created before running this sample. You can add the column by running the
    `add_timestamp_column` sample or by running this DDL statement
    against your database:

        ALTER TABLE Albums ADD COLUMN LastUpdateTime TIMESTAMP
        OPTIONS (allow_commit_timestamp=true)
    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            "SELECT SingerId, AlbumId, MarketingBudget FROM Albums "
            "ORDER BY LastUpdateTime DESC"
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, MarketingBudget: {}".format(*row))
Ruby
# project_id = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"
spanner = Google::Cloud::Spanner.new project: project_id
client = spanner.client instance_id, database_id
client.execute("SELECT SingerId, AlbumId, MarketingBudget, LastUpdateTime
FROM Albums ORDER BY LastUpdateTime DESC").rows.each do |row|
puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:MarketingBudget]} #{row[:LastUpdateTime]}"
end
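Provide your own value for the commit timestamp column
In your code, you can provide your own value for the commit timestamp column instead of passing spanner.commit_timestamp() (or the available client library constant) as the column value. The value must be a timestamp in the past. This restriction ensures that writing timestamps is an inexpensive and fast operation. A simple way to confirm that a value is in the past is to compare it with the return value of the CURRENT_TIMESTAMP SQL function. If you specify a timestamp in the future, the server returns a FailedPrecondition error.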
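A client-side pre-check along these lines might look like the following minimal sketch. The function name `is_past_timestamp` is hypothetical, and the check uses the local clock rather than the server's CURRENT_TIMESTAMP, so it is only an approximation: the server's own validation remains authoritative, and a value that passes here could still be rejected if the local clock runs ahead.

```python
from datetime import datetime, timedelta, timezone


def is_past_timestamp(candidate, now=None):
    """Return True if `candidate` is strictly earlier than `now`.

    `now` defaults to the current UTC time and can be injected for testing.
    """
    if candidate.tzinfo is None:
        raise ValueError("candidate must be timezone-aware")
    reference = now if now is not None else datetime.now(timezone.utc)
    return candidate < reference


fixed_now = datetime(2022, 1, 1, tzinfo=timezone.utc)
print(is_past_timestamp(fixed_now - timedelta(seconds=5), now=fixed_now))  # True
print(is_past_timestamp(fixed_now + timedelta(seconds=5), now=fixed_now))  # False
```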
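Create a change log
Suppose that you want to create a change log of every mutation that happens to a table and then use that change log for auditing. An example is a table that stores the history of changes to word processing documents. The commit timestamp makes creating the change log easier, because the timestamps can enforce ordering of the change log entries. You can build a change log that stores the history of changes to a given document by using a schema like the following example: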
CREATE TABLE Documents (
UserId int8 NOT NULL,
DocumentId int8 NOT NULL,
Contents text NOT NULL,
PRIMARY KEY (UserId, DocumentId)
);
CREATE TABLE DocumentHistory (
UserId int8 NOT NULL,
DocumentId int8 NOT NULL,
Ts SPANNER.COMMIT_TIMESTAMP NOT NULL,
Delta text,
PRIMARY KEY (UserId, DocumentId, Ts)
) INTERLEAVE IN PARENT Documents;
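To create a change log, insert a new row in DocumentHistory in the same transaction in which you insert or update a row in Documents. When you insert the new row in DocumentHistory, use the placeholder spanner.commit_timestamp() (or the client library constant) to tell Spanner to write the commit timestamp into the Ts column. Interleaving the DocumentHistory table with the Documents table provides data locality and more efficient inserts and updates. However, it also adds the constraint that the parent and child rows must be deleted together. To keep the rows in DocumentHistory after rows in Documents are deleted, don't interleave the tables.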
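The same-transaction pattern can be sketched as a pair of statements that belong together. This example uses the DML route with SPANNER.PENDING_COMMIT_TIMESTAMP() rather than mutations, and it only builds the statements; running them inside one read-write transaction is left to the client library. The helper name `build_document_update` is hypothetical, and the `p1`, `p2`, `p3` parameter names assume the Python client's convention for PostgreSQL-dialect `$1`, `$2`, `$3` placeholders.

```python
def build_document_update(user_id, document_id, new_contents, delta):
    """Return the (sql, params) pairs to run inside ONE transaction."""
    update_document = (
        "UPDATE Documents SET Contents = $1 "
        "WHERE UserId = $2 AND DocumentId = $3",
        {"p1": new_contents, "p2": user_id, "p3": document_id},
    )
    # The history row gets the transaction's commit timestamp in Ts.
    append_history = (
        "INSERT INTO DocumentHistory (UserId, DocumentId, Ts, Delta) "
        "VALUES ($1, $2, SPANNER.PENDING_COMMIT_TIMESTAMP(), $3)",
        {"p1": user_id, "p2": document_id, "p3": delta},
    )
    return [update_document, append_history]


for sql, params in build_document_update(7, 42, "Hello, world", "+world"):
    print(sql, params)
```

Returning both statements from one function keeps the document update and its history entry from being issued in separate transactions by accident.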
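Optimize recent-data queries with commit timestamps
Commit timestamps enable a Spanner optimization that can reduce query I/O when retrieving data written after a particular time.
To activate this optimization, a query's WHERE clause must include a comparison between a table's commit timestamp column and a specific time that you provide, with the following attributes:
Provide the specific time as a constant expression: a literal, a parameter, or a function whose own arguments evaluate to constants.
Compare whether the commit timestamp is more recent than the given time, by using either the > or >= operator.
Optionally, add further restrictions to the WHERE clause with AND. Extending the clause with OR disqualifies the query from this optimization.
For example, consider the following Performances table, which includes a commit timestamp column: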
CREATE TABLE Performances (
SingerId bigint NOT NULL,
VenueId bigint NOT NULL,
EventDate timestamp with time zone NOT NULL,
Revenue bigint,
LastUpdateTime spanner.commit_timestamp,
PRIMARY KEY(SingerId, VenueId, EventDate)
);
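This query benefits from the commit timestamp optimization described earlier, because it has a greater-than-or-equal-to comparison between the table's commit timestamp column and a constant expression (in this case, a literal):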
SELECT * FROM Performances WHERE LastUpdateTime >= '2022-01-01';
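Because a parameter also counts as a constant expression, the same query can be written with a bound value instead of a literal. The sketch below only builds the statement and its parameters; the helper name `build_recent_performances_query` is hypothetical, and the `p1` name assumes the Python client's convention for the PostgreSQL-dialect `$1` placeholder.

```python
from datetime import datetime, timedelta, timezone


def build_recent_performances_query(since):
    """Return (sql, params) selecting rows written at or after `since`."""
    # A single >= comparison against a parameter, with no OR branches,
    # matches the conditions listed for the optimization.
    sql = "SELECT * FROM Performances WHERE LastUpdateTime >= $1"
    return sql, {"p1": since}


one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
sql, params = build_recent_performances_query(one_hour_ago)
print(sql)
print(params)
```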