事务

事务是指对一个或多个实体执行的一组操作。每个事务一定是原子性的,因此事务决不会只应用一部分。事务中的所有操作要么全都应用,要么一个也不应用。

使用事务

事务会在 270 秒后或在闲置 60 秒后过期。

以下情况下操作可能失败:

  • 尝试对同一个实体进行太多并发修改。
  • 事务超出资源限制。
  • Datastore 模式数据库遇到内部错误。

在上述所有情况下,Datastore API 都会返回错误。

事务是一项可选功能。执行数据库操作并非必须使用事务。

应用可以在单个事务中执行一组语句和操作,因此,如果任何语句或操作引发异常,系统将不会应用该组中的任何数据库操作。应用会定义要在事务中执行的操作。

以下代码段展示了如何执行事务。该代码段会将资金从一个账户转移到另一个账户。

C#

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore C# API 参考文档

private void TransferFunds(Key fromKey, Key toKey, long amount)
{
    using (var transaction = _db.BeginTransaction())
    {
        var entities = transaction.Lookup(fromKey, toKey);
        entities[0]["balance"].IntegerValue -= amount;
        entities[1]["balance"].IntegerValue += amount;
        transaction.Update(entities);
        transaction.Commit();
    }
}

Go

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Go API 参考文档

type BankAccount struct {
	Balance int
}

const amount = 50
keys := []*datastore.Key{to, from}
tx, err := client.NewTransaction(ctx)
if err != nil {
	log.Fatalf("client.NewTransaction: %v", err)
}
accs := make([]BankAccount, 2)
if err := tx.GetMulti(keys, accs); err != nil {
	tx.Rollback()
	log.Fatalf("tx.GetMulti: %v", err)
}
accs[0].Balance += amount
accs[1].Balance -= amount
if _, err := tx.PutMulti(keys, accs); err != nil {
	tx.Rollback()
	log.Fatalf("tx.PutMulti: %v", err)
}
if _, err = tx.Commit(); err != nil {
	log.Fatalf("tx.Commit: %v", err)
}

Java

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Java API 参考文档

void transferFunds(Key fromKey, Key toKey, long amount) {
  Transaction txn = datastore.newTransaction();
  try {
    List<Entity> entities = txn.fetch(fromKey, toKey);
    Entity from = entities.get(0);
    Entity updatedFrom =
        Entity.newBuilder(from).set("balance", from.getLong("balance") - amount).build();
    Entity to = entities.get(1);
    Entity updatedTo = Entity.newBuilder(to).set("balance", to.getLong("balance") + amount)
        .build();
    txn.put(updatedFrom, updatedTo);
    txn.commit();
  } finally {
    if (txn.isActive()) {
      txn.rollback();
    }
  }
}

Node.js

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Node.js API 参考文档

async function transferFunds(fromKey, toKey, amount) {
  const transaction = datastore.transaction();
  await transaction.run();
  const results = await Promise.all([
    transaction.get(fromKey),
    transaction.get(toKey),
  ]);
  const accounts = results.map(result => result[0]);

  accounts[0].balance -= amount;
  accounts[1].balance += amount;

  transaction.save([
    {
      key: fromKey,
      data: accounts[0],
    },
    {
      key: toKey,
      data: accounts[1],
    },
  ]);

  return await transaction.commit();
}

PHP

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore PHP API 参考文档

/**
 * Update two entities in a transaction.
 *
 * @param DatastoreClient $datastore
 * @param Key $fromKey
 * @param Key $toKey
 * @param $amount
 */
function transfer_funds(
    DatastoreClient $datastore,
    Key $fromKey,
    Key $toKey,
    $amount
) {
    $transaction = $datastore->transaction();
    // The option 'sort' is important here, otherwise the order of the result
    // might be different from the order of the keys.
    $result = $transaction->lookupBatch([$fromKey, $toKey], ['sort' => true]);
    if (count($result['found']) != 2) {
        $transaction->rollback();
    }
    $fromAccount = $result['found'][0];
    $toAccount = $result['found'][1];
    $fromAccount['balance'] -= $amount;
    $toAccount['balance'] += $amount;
    $transaction->updateBatch([$fromAccount, $toAccount]);
    $transaction->commit();
}

Python

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Python API 参考文档

def transfer_funds(client, from_key, to_key, amount):
    with client.transaction():
        from_account = client.get(from_key)
        to_account = client.get(to_key)

        from_account['balance'] -= amount
        to_account['balance'] += amount

        client.put_multi([from_account, to_account])

Ruby

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Ruby API 参考文档

def transfer_funds from_key, to_key, amount
  datastore.transaction do |tx|
    from = tx.find from_key
    from["balance"] -= amount
    to = tx.find to_key
    to["balance"] += amount
    tx.save from, to
  end
end

注意,为了使示例更加简洁,有时我们会在事务失败时省略 rollback。在用于生产环境的代码中,请务必确保明确提交或回滚每个事务。

可以在事务中执行的操作

事务可以查询或查找任意数量的实体。在每个事务中,您最多可以创建、更新或删除 500 个实体。事务的最大大小为 10 MiB。您可以使用读写事务或只读事务。

隔离和一致性

Datastore 模式数据库会强制执行可序列化隔离。不能同时修改某个事务所读取或修改的数据。

一个事务中的各查询和查找操作会看到一致的数据库状态快照。此快照一定会包含在该事务开始之前完成的所有事务和写入操作的结果。

在事务内,写入后发生的读取操作也会看到同样的快照视图。与大多数数据库不同,Datastore 模式事务中的查询和查找操作不会看到该事务中先前写入操作的结果。具体而言,如果某个实体在事务中被修改或删除,则查询或查找操作会返回该实体在事务开始时的原始版本;如果当时并不存在该实体,则系统不会返回任何内容

事务外部的查询和查找操作也采用可序列化隔离。

事务锁定

读写事务使用读取者/写入者锁定来强制执行隔离和可序列化。如果两个并发读写事务读取或写入相同数据,则一个事务所持有的锁会延迟另一个事务。

如果您的事务不需要任何写入,您可以使用只读事务来提高性能并避免与其他事务发生争用。

与 Cloud Datastore 的区别

在 Cloud Datastore 中,旧版 Datastore 模式 Firestore 事务使用了乐观并发控制。事务并未延迟其他事务,但更有可能在提交时报告争用。如需详细了解旧版中的事务,请参阅 Cloud Datastore 事务

事务的用途

事务的一种用途是使用新的属性值(相对于当前值)更新实体。上面的 transferFunds 示例通过从一个账户中提取资金并将资金转移到另一个账户来针对两个实体实现此过程。Datastore API 不会自动重试事务,但您可以添加自己的逻辑来进行重试,例如,用来处理因其他请求同时更新同一实体而引发的冲突。

C#

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore C# API 参考文档

/// <summary>
/// Retry the action when a Grpc.Core.RpcException is thrown.
/// </summary>
private T RetryRpc<T>(Func<T> action)
{
    List<Grpc.Core.RpcException> exceptions = null;
    var delayMs = _retryDelayMs;
    for (int tryCount = 0; tryCount < _retryCount; ++tryCount)
    {
        try
        {
            return action();
        }
        catch (Grpc.Core.RpcException e)
        {
            if (exceptions == null)
                exceptions = new List<Grpc.Core.RpcException>();
            exceptions.Add(e);
        }
        System.Threading.Thread.Sleep(delayMs);
        delayMs *= 2;  // Exponential back-off.
    }
    throw new AggregateException(exceptions);
}

private void RetryRpc(Action action)
{
    RetryRpc(() => { action(); return 0; });
}

[Fact]
public void TestTransactionalRetry()
{
    int tryCount = 0;
    var keys = UpsertBalances();
    RetryRpc(() =>
    {
        using (var transaction = _db.BeginTransaction())
        {
            TransferFunds(keys[0], keys[1], 10, transaction);
            // Insert a conflicting transaction on the first try.
            if (tryCount++ == 0)
                TransferFunds(keys[1], keys[0], 5);
            transaction.Commit();
        }
    });
    Assert.Equal(2, tryCount);
}

Go

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Go API 参考文档

type BankAccount struct {
	Balance int
}

const amount = 50
_, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
	keys := []*datastore.Key{to, from}
	accs := make([]BankAccount, 2)
	if err := tx.GetMulti(keys, accs); err != nil {
		return err
	}
	accs[0].Balance += amount
	accs[1].Balance -= amount
	_, err := tx.PutMulti(keys, accs)
	return err
})

Java

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Java API 参考文档

int retries = 5;
while (true) {
  try {
    transferFunds(fromKey, toKey, 10);
    break;
  } catch (DatastoreException e) {
    if (retries == 0) {
      throw e;
    }
    --retries;
  }
}
// Retry handling can also be configured and automatically applied using google-cloud-java.

Node.js

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Node.js API 参考文档

async function transferFundsWithRetry() {
  const maxTries = 5;

  async function tryRequest(currentAttempt, delay) {
    try {
      await transferFunds(fromKey, toKey, 10);
    } catch (err) {
      if (currentAttempt <= maxTries) {
        // Use exponential backoff
        setTimeout(async () => {
          await tryRequest(currentAttempt + 1, delay * 2);
        }, delay);
      }
      throw err;
    }
  }

  await tryRequest(1, 100);
}

PHP

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore PHP API 参考文档

$retries = 5;
for ($i = 0; $i < $retries; $i++) {
    try {
        transfer_funds($datastore, $fromKey, $toKey, 10);
    } catch (Google\Cloud\Exception\ConflictException $e) {
        // if $i >= $retries, the failure is final
        continue;
    }
    // Succeeded!
    break;
}

Python

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Python API 参考文档

for _ in range(5):
    try:
        transfer_funds(client, account1.key, account2.key, 50)
        break
    except google.cloud.exceptions.Conflict:
        continue
else:
    print('Transaction failed.')

Ruby

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Ruby API 参考文档

(1..5).each do |i|
  begin
    return transfer_funds from_key, to_key, amount
  rescue Google::Cloud::Error => e
    raise e if i == 5
  end
end

这需要使用事务,因为在此代码提取对象之后到保存修改后的对象之前的这段时段内,实体中 balance 的值可能会被其他用户更新。如果不使用事务,则用户的请求将使用另一用户进行更新前的 balance 值,且保存会覆盖新值。如果使用了事务,应用会被告知其他用户在进行更新。

事务的另一个常见用途是获取具有命名键的实体,或者如果实体尚不存在,则创建该实体(本示例基于创建实体中的 TaskList 示例):

C#

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore C# API 参考文档

Entity task;
using (var transaction = _db.BeginTransaction())
{
    task = transaction.Lookup(_sampleTask.Key);
    if (task == null)
    {
        transaction.Insert(_sampleTask);
        transaction.Commit();
    }
}

Go

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Go API 参考文档

_, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
	var task Task
	if err := tx.Get(key, &task); err != datastore.ErrNoSuchEntity {
		return err
	}
	_, err := tx.Put(key, &Task{
		Category:    "Personal",
		Done:        false,
		Priority:    4,
		Description: "Learn Cloud Datastore",
	})
	return err
})

Java

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Java API 参考文档

Entity task;
Transaction txn = datastore.newTransaction();
try {
  task = txn.get(taskKey);
  if (task == null) {
    task = Entity.newBuilder(taskKey).build();
    txn.put(task);
    txn.commit();
  }
} finally {
  if (txn.isActive()) {
    txn.rollback();
  }
}

Node.js

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Node.js API 参考文档

async function getOrCreate(taskKey, taskData) {
  const taskEntity = {
    key: taskKey,
    data: taskData,
  };
  const transaction = datastore.transaction();

  try {
    await transaction.run();
    const [task] = await transaction.get(taskKey);
    if (task) {
      // The task entity already exists.
      await transaction.rollback();
    } else {
      // Create the task entity.
      transaction.save(taskEntity);
      await transaction.commit();
    }
    return taskEntity;
  } catch (err) {
    await transaction.rollback();
  }
}

PHP

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore PHP API 参考文档

$transaction = $datastore->transaction();
$existed = $transaction->lookup($task->key());
if ($existed === null) {
    $transaction->insert($task);
    $transaction->commit();
}

Python

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Python API 参考文档

with client.transaction():
    key = client.key('Task', datetime.datetime.utcnow().isoformat())

    task = client.get(key)

    if not task:
        task = datastore.Entity(key)
        task.update({
            'description': 'Example task'
        })
        client.put(task)

    return task

Ruby

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Ruby API 参考文档

task = nil
datastore.transaction do |tx|
  task = tx.find task_key
  if task.nil?
    task = datastore.entity task_key do |t|
      t["category"] = "Personal"
      t["done"] = false
      t["priority"] = 4
      t["description"] = "Learn Cloud Datastore"
    end
    tx.save task
  end
end

如上文所述,对于处理另一个用户尝试创建或更新具有相同字符串 ID 的实体的情况,事务是必要的。在没有事务时,如果实体不存在并且有两个用户尝试创建,则第二个创建者会在不知情的情况下覆盖第一个创建的实体。

当事务失败时,您可以让应用重试事务直到成功,或者可以将错误传递到应用的界面层,让用户处理该错误。您无需对每个事务都创建重试循环。

只读事务

最后,您可以使用事务来读取一致的数据库快照。如果您需要多次执行读取操作来呈现页面或导出必须保持一致的数据,这种方法非常实用。在这些情况下,您可以创建只读事务。

只读事务不能修改实体,因此,此类事务不会与其他任何事务发生争用,也不需要重试。如果您在某个常规的读写事务中仅执行读取操作,则该事务可能会与修改相同数据的事务之间发生争用。

C#

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore C# API 参考文档

Entity taskList;
IReadOnlyList<Entity> tasks;
using (var transaction = _db.BeginTransaction(TransactionOptions.CreateReadOnly()))
{
    taskList = transaction.Lookup(taskListKey);
    var query = new Query("Task")
    {
        Filter = Filter.HasAncestor(taskListKey)
    };
    tasks = transaction.RunQuery(query).Entities;
    transaction.Commit();
}

Go

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Go API 参考文档

tx, err := client.NewTransaction(ctx, datastore.ReadOnly)
if err != nil {
	log.Fatalf("client.NewTransaction: %v", err)
}
defer tx.Rollback() // Transaction only used for read.

ancestor := datastore.NameKey("TaskList", "default", nil)
query := datastore.NewQuery("Task").Ancestor(ancestor).Transaction(tx)
var tasks []Task
_, err = client.GetAll(ctx, query, &tasks)

Java

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Java API 参考文档

Entity taskList;
QueryResults<Entity> tasks;
Transaction txn = datastore.newTransaction(
    TransactionOptions.newBuilder()
        .setReadOnly(ReadOnly.newBuilder().build())
        .build()
);
try {
  taskList = txn.get(taskListKey);
  Query<Entity> query = Query.newEntityQueryBuilder()
      .setKind("Task")
      .setFilter(PropertyFilter.hasAncestor(taskListKey))
      .build();
  tasks = txn.run(query);
  txn.commit();
} finally {
  if (txn.isActive()) {
    txn.rollback();
  }
}

Node.js

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Node.js API 参考文档

async function getTaskListEntities() {
  const transaction = datastore.transaction({readOnly: true});
  try {
    const taskListKey = datastore.key(['TaskList', 'default']);

    await transaction.run();
    const [taskList] = await transaction.get(taskListKey);
    const query = datastore.createQuery('Task').hasAncestor(taskListKey);
    const [taskListEntities] = await transaction.runQuery(query);
    await transaction.commit();
    return [taskList, taskListEntities];
  } catch (err) {
    await transaction.rollback();
  }
}

PHP

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore PHP API 参考文档

$transaction = $datastore->readOnlyTransaction();
$taskListKey = $datastore->key('TaskList', 'default');
$query = $datastore->query()
    ->kind('Task')
    ->hasAncestor($taskListKey);
$result = $transaction->runQuery($query);
$taskListEntities = [];
/* @var Entity $task */
foreach ($result as $task) {
    $taskListEntities[] = $task;
}

Python

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Python API 参考文档

with client.transaction(read_only=True):
    task_list_key = client.key('TaskList', 'default')

    task_list = client.get(task_list_key)

    query = client.query(kind='Task', ancestor=task_list_key)
    tasks_in_list = list(query.fetch())

    return task_list, tasks_in_list

Ruby

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。 如需了解详情,请参阅 Cloud Datastore Ruby API 参考文档

# task_list_name = "default"
task_list_key = datastore.key "TaskList", task_list_name
datastore.read_only_transaction do |tx|
  task_list = tx.find task_list_key
  query = datastore.query("Task").ancestor(task_list)
  tasks_in_list = tx.run query
end

后续步骤