Cloud Datastore 事务

事务是指对最多 25 个实体组中的一个或多个实体执行的一组 Datastore 操作。每个事务一定是原子性的,因此事务决不会只应用一部分。事务中的所有操作要么都应用,要么都不应用。

使用事务

事务的最大时长为 60 秒,在 30 秒后有 10 秒的空闲到期时间。

出现以下情况时,操作可能会失败:

  • 尝试对同一个实体组进行太多并发修改。
  • 事务超出资源限制。
  • Datastore 遇到内部错误。

在上述所有情况下,Datastore API 都会返回错误。

事务是 Datastore 的可选功能;执行 Datastore 操作并非必须使用事务。

应用可以在单个事务中执行一组语句和 Datastore 操作,因此,如果任何语句或操作引发异常,系统将不会应用该组中的任何 Datastore 操作。应用会定义要在事务中执行的操作。

以下代码段展示了如何使用 Datastore API 执行事务。该代码段会将资金从一个账户转移到另一个账户。

C#

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore C# API 参考文档

private void TransferFunds(Key fromKey, Key toKey, long amount)
    {
        using (var transaction = _db.BeginTransaction())
        {
            var entities = transaction.Lookup(fromKey, toKey);
            entities[0]["balance"].IntegerValue -= amount;
            entities[1]["balance"].IntegerValue += amount;
            transaction.Update(entities);
            transaction.Commit();
        }
    }

Go

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Go API 参考文档

type BankAccount struct {
    	Balance int
    }

    const amount = 50
    keys := []*datastore.Key{to, from}
    tx, err := client.NewTransaction(ctx)
    if err != nil {
    	log.Fatalf("client.NewTransaction: %v", err)
    }
    accs := make([]BankAccount, 2)
    if err := tx.GetMulti(keys, accs); err != nil {
    	tx.Rollback()
    	log.Fatalf("tx.GetMulti: %v", err)
    }
    accs[0].Balance += amount
    accs[1].Balance -= amount
    if _, err := tx.PutMulti(keys, accs); err != nil {
    	tx.Rollback()
    	log.Fatalf("tx.PutMulti: %v", err)
    }
    if _, err = tx.Commit(); err != nil {
    	log.Fatalf("tx.Commit: %v", err)
    }

Java

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Java API 参考文档

void transferFunds(Key fromKey, Key toKey, long amount) {
      Transaction txn = datastore.newTransaction();
      try {
        List<Entity> entities = txn.fetch(fromKey, toKey);
        Entity from = entities.get(0);
        Entity updatedFrom =
            Entity.newBuilder(from).set("balance", from.getLong("balance") - amount).build();
        Entity to = entities.get(1);
        Entity updatedTo = Entity.newBuilder(to).set("balance", to.getLong("balance") + amount)
            .build();
        txn.put(updatedFrom, updatedTo);
        txn.commit();
      } finally {
        if (txn.isActive()) {
          txn.rollback();
        }
      }
    }

Node.js

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Node.js API 参考文档

async function transferFunds(fromKey, toKey, amount) {
      const transaction = datastore.transaction();
      await transaction.run();
      const results = await Promise.all([
        transaction.get(fromKey),
        transaction.get(toKey),
      ]);
      const accounts = results.map(result => result[0]);

      accounts[0].balance -= amount;
      accounts[1].balance += amount;

      transaction.save([
        {
          key: fromKey,
          data: accounts[0],
        },
        {
          key: toKey,
          data: accounts[1],
        },
      ]);

      return transaction.commit();
    }

PHP

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore PHP API 参考文档

/**
     * Update two entities in a transaction.
     *
     * @param DatastoreClient $datastore
     * @param Key $fromKey
     * @param Key $toKey
     * @param $amount
     */
    function transfer_funds(
        DatastoreClient $datastore,
        Key $fromKey,
        Key $toKey,
        $amount
    ) {
        $transaction = $datastore->transaction();
        // The option 'sort' is important here, otherwise the order of the result
        // might be different from the order of the keys.
        $result = $transaction->lookupBatch([$fromKey, $toKey], ['sort' => true]);
        if (count($result['found']) != 2) {
            $transaction->rollback();
        }
        $fromAccount = $result['found'][0];
        $toAccount = $result['found'][1];
        $fromAccount['balance'] -= $amount;
        $toAccount['balance'] += $amount;
        $transaction->updateBatch([$fromAccount, $toAccount]);
        $transaction->commit();
    }

Python

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Python API 参考文档

def transfer_funds(client, from_key, to_key, amount):
        with client.transaction():
            from_account = client.get(from_key)
            to_account = client.get(to_key)

            from_account['balance'] -= amount
            to_account['balance'] += amount

            client.put_multi([from_account, to_account])

Ruby

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Ruby API 参考文档

def transfer_funds from_key, to_key, amount
      datastore.transaction do |tx|
        from = tx.find from_key
        from["balance"] -= amount
        to = tx.find to_key
        to["balance"] += amount
        tx.save from, to
      end
    end

请注意,为了使示例更加简洁,有时我们会在事务失败时省略 rollback。在用于生产环境的代码中,请务必确保明确提交或回滚每个事务。

可以在事务中执行的操作

事务中的所有 Datastore 操作最多可以在二十五个实体组上执行。这包括通过祖先查询实体、通过键检索实体、更新实体以及删除实体。

当两个或多个事务同时尝试修改一个或多个公共实体组中的实体时,只有第一个提交其更改的事务会成功;所有其他事务将提交失败。由于这种设计,使用实体组限制了您可以对组中的任何实体执行的并发写入数量。事务开始时,Datastore 通过检查事务中使用的实体组的上次更新时间来使用开放式并发控制。在为实体组提交事务后,Datastore 会再次检查事务中使用的实体组的上次更新时间。如果该时间自初次检查后发生了变化,将返回错误。如需了解实体组的说明,请参阅祖先路径

隔离和一致性

在事务之外,Datastore 的隔离级别最接近提交的读取操作。在事务之内,系统将采用可序列化隔离。 这意味着另一个事务不能并发修改由此事务读取或修改的数据。 如需详细了解隔离级别,请参阅可序列化隔离 Wiki 和事务隔离一文。

在事务中,所有读取都反映事务开始时 Datastore 的当前一致状态。事务内部的查询和查找可以保证在事务开始时看到单个一致的 Datastore 快照。事务的实体组中的实体和索引行将完全更新,以便查询返回一组完整、正确的结果实体,而不会出现事务隔离中所描述的可能发生在事务外部查询中的假正例或假负例。

在事务内,写入后发生的读取操作也会看到同样的快照视图。与大多数数据库不同,Datastore 事务内部的查询和获取不会看到该事务中先前写入的结果。具体而言,如果某个实体在事务中被修改或删除,则查询或查找操作会返回该实体在事务开始时的原始版本;如果当时并不存在该实体,则系统不会返回任何内容

事务的用途

事务的一种用途是使用相对于其当前值的新属性值更新实体。上面的 transferFunds 示例通过从一个账户中提取资金并将资金转移到另一个账户来针对两个实体实现此过程。Datastore API 不会自动重试事务,但您可以添加自己的逻辑来进行重试,例如,用来处理因其他请求同时更新同一实体而引发的冲突。

C#

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore C# API 参考文档

/// <summary>
    /// Retry the action when a Grpc.Core.RpcException is thrown.
    /// </summary>
    private T RetryRpc<T>(Func<T> action)
    {
        List<Grpc.Core.RpcException> exceptions = null;
        var delayMs = _retryDelayMs;
        for (int tryCount = 0; tryCount < _retryCount; ++tryCount)
        {
            try
            {
                return action();
            }
            catch (Grpc.Core.RpcException e)
            {
                if (exceptions == null)
                    exceptions = new List<Grpc.Core.RpcException>();
                exceptions.Add(e);
            }
            System.Threading.Thread.Sleep(delayMs);
            delayMs *= 2;  // Exponential back-off.
        }
        throw new AggregateException(exceptions);
    }

    private void RetryRpc(Action action)
    {
        RetryRpc(() => { action(); return 0; });
    }

    [Fact]
    public void TestTransactionalRetry()
    {
        int tryCount = 0;
        var keys = UpsertBalances();
        RetryRpc(() =>
        {
            using (var transaction = _db.BeginTransaction())
            {
                TransferFunds(keys[0], keys[1], 10, transaction);
                // Insert a conflicting transaction on the first try.
                if (tryCount++ == 0)
                    TransferFunds(keys[1], keys[0], 5);
                transaction.Commit();
            }
        });
        Assert.Equal(2, tryCount);
    }

Go

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Go API 参考文档

type BankAccount struct {
    	Balance int
    }

    const amount = 50
    _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
    	keys := []*datastore.Key{to, from}
    	accs := make([]BankAccount, 2)
    	if err := tx.GetMulti(keys, accs); err != nil {
    		return err
    	}
    	accs[0].Balance += amount
    	accs[1].Balance -= amount
    	_, err := tx.PutMulti(keys, accs)
    	return err
    })

Java

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Java API 参考文档

int retries = 5;
    while (true) {
      try {
        transferFunds(fromKey, toKey, 10);
        break;
      } catch (DatastoreException e) {
        if (retries == 0) {
          throw e;
        }
        --retries;
      }
    }
    // Retry handling can also be configured and automatically applied using google-cloud-java.

Node.js

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Node.js API 参考文档

async function transferFundsWithRetry() {
      const maxTries = 5;

      async function tryRequest(currentAttempt, delay) {
        try {
          await transferFunds(fromKey, toKey, 10);
        } catch (err) {
          if (currentAttempt <= maxTries) {
            // Use exponential backoff
            setTimeout(async () => {
              await tryRequest(currentAttempt + 1, delay * 2);
            }, delay);
          }
          throw err;
        }
      }

      await tryRequest(1, 100);
    }

PHP

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore PHP API 参考文档

$retries = 5;
    for ($i = 0; $i < $retries; $i++) {
        try {
            transfer_funds($datastore, $fromKey, $toKey, 10);
        } catch (Google\Cloud\Exception\ConflictException $e) {
            // if $i >= $retries, the failure is final
            continue;
        }
        // Succeeded!
        break;
    }

Python

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Python API 参考文档

for _ in range(5):
        try:
            transfer_funds(client, account1.key, account2.key, 50)
            break
        except google.cloud.exceptions.Conflict:
            continue
    else:
        print('Transaction failed.')

Ruby

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Ruby API 参考文档

(1..5).each do |i|
      begin
        return transfer_funds from_key, to_key, amount
      rescue Google::Cloud::Error => e
        raise e if i == 5
      end
    end

这需要使用事务,因为在此代码提取对象之后到保存修改后的对象之前的这段时段内,实体中 balance 的值可能会被其他用户更新。如果不使用事务,则用户的请求将使用另一用户进行更新前的 balance 值,且保存会覆盖新值。如果使用了事务,应用会被告知其他用户在进行更新。

事务的另一个常见用途是获取具有命名键的实体,或者如果实体尚不存在,则创建该实体(本示例基于创建实体中的 TaskList 示例):

C#

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore C# API 参考文档

Entity task;
    using (var transaction = _db.BeginTransaction())
    {
        task = transaction.Lookup(_sampleTask.Key);
        if (task == null)
        {
            transaction.Insert(_sampleTask);
            transaction.Commit();
        }
    }

Go

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Go API 参考文档

_, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
    	var task Task
    	if err := tx.Get(key, &task); err != datastore.ErrNoSuchEntity {
    		return err
    	}
    	_, err := tx.Put(key, &Task{
    		Category:    "Personal",
    		Done:        false,
    		Priority:    4,
    		Description: "Learn Cloud Datastore",
    	})
    	return err
    })

Java

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Java API 参考文档

Entity task;
    Transaction txn = datastore.newTransaction();
    try {
      task = txn.get(taskKey);
      if (task == null) {
        task = Entity.newBuilder(taskKey).build();
        txn.put(task);
        txn.commit();
      }
    } finally {
      if (txn.isActive()) {
        txn.rollback();
      }
    }

Node.js

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Node.js API 参考文档

async function getOrCreate(taskKey, taskData) {
      const taskEntity = {
        key: taskKey,
        data: taskData,
      };
      const transaction = datastore.transaction();

      try {
        await transaction.run();
        const [task] = await transaction.get(taskKey);
        if (task) {
          // The task entity already exists.
          transaction.rollback();
        } else {
          // Create the task entity.
          transaction.save(taskEntity);
          transaction.commit();
        }
        return taskEntity;
      } catch (err) {
        transaction.rollback();
      }
    }

PHP

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore PHP API 参考文档

$transaction = $datastore->transaction();
    $existed = $transaction->lookup($task->key());
    if ($existed === null) {
        $transaction->insert($task);
        $transaction->commit();
    }

Python

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Python API 参考文档

with client.transaction():
        key = client.key('Task', datetime.datetime.utcnow().isoformat())

        task = client.get(key)

        if not task:
            task = datastore.Entity(key)
            task.update({
                'description': 'Example task'
            })
            client.put(task)

        return task

Ruby

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Ruby API 参考文档

task = nil
    datastore.transaction do |tx|
      task = tx.find task_key
      if task.nil?
        task = datastore.entity task_key do |t|
          t["category"] = "Personal"
          t["done"] = false
          t["priority"] = 4
          t["description"] = "Learn Cloud Datastore"
        end
        tx.save task
      end
    end

如上文所述,对于有另一个用户尝试创建或更新具有相同字符串 ID 的实体的情况,需要使用事务来进行处理。如果不使用事务,当有两个用户尝试创建同一个不存在的实体时,第二个用户的操作会覆盖第一个用户所创建的实体,而他们对此却毫不知情。

当事务失败时,您可以让应用重试事务直到成功,或者可以将错误传递到应用的界面层,让用户处理该错误。不必对每个事务都创建重试循环。

最后,您可以使用事务来读取一致的 Datastore 快照。这在需要进行多次读取来呈现页面或导出必须一致的数据时非常有用。这些种类的事务通常称为“只读”事务,因为它们不执行写入操作。只读单组事务始终不会由于并发修改而失败,所以您不必执行失败重试。但是,多实体组事务可能由于并发修改而失败,因此这些事务应当需要重试。

C#

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore C# API 参考文档

Entity taskList;
    IReadOnlyList<Entity> tasks;
    using (var transaction = _db.BeginTransaction(TransactionOptions.CreateReadOnly()))
    {
        taskList = transaction.Lookup(taskListKey);
        var query = new Query("Task")
        {
            Filter = Filter.HasAncestor(taskListKey)
        };
        tasks = transaction.RunQuery(query).Entities;
        transaction.Commit();
    }

Go

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Go API 参考文档

tx, err := client.NewTransaction(ctx, datastore.ReadOnly)
    if err != nil {
    	log.Fatalf("client.NewTransaction: %v", err)
    }
    defer tx.Rollback() // Transaction only used for read.

    ancestor := datastore.NameKey("TaskList", "default", nil)
    query := datastore.NewQuery("Task").Ancestor(ancestor).Transaction(tx)
    var tasks []Task
    _, err = client.GetAll(ctx, query, &tasks)

Java

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Java API 参考文档

Entity taskList;
    QueryResults<Entity> tasks;
    Transaction txn = datastore.newTransaction(
        TransactionOptions.newBuilder()
            .setReadOnly(ReadOnly.newBuilder().build())
            .build()
    );
    try {
      taskList = txn.get(taskListKey);
      Query<Entity> query = Query.newEntityQueryBuilder()
          .setKind("Task")
          .setFilter(PropertyFilter.hasAncestor(taskListKey))
          .build();
      tasks = txn.run(query);
      txn.commit();
    } finally {
      if (txn.isActive()) {
        txn.rollback();
      }
    }

Node.js

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Node.js API 参考文档

async function getTaskListEntities() {
      const transaction = datastore.transaction({readOnly: true});
      try {
        const taskListKey = datastore.key(['TaskList', 'default']);

        await transaction.run();
        const [taskList] = await transaction.get(taskListKey);
        const query = datastore.createQuery('Task').hasAncestor(taskListKey);
        const [taskListEntities] = await transaction.runQuery(query);
        await transaction.commit();
        return [taskList, taskListEntities];
      } catch (err) {
        await transaction.rollback();
      }
    }

PHP

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore PHP API 参考文档

$transaction = $datastore->readOnlyTransaction();
    $taskListKey = $datastore->key('TaskList', 'default');
    $query = $datastore->query()
        ->kind('Task')
        ->hasAncestor($taskListKey);
    $result = $transaction->runQuery($query);
    $taskListEntities = [];
    /* @var Entity $task */
    foreach ($result as $task) {
        $taskListEntities[] = $task;
    }

Python

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Python API 参考文档

with client.transaction(read_only=True):
        task_list_key = client.key('TaskList', 'default')

        task_list = client.get(task_list_key)

        query = client.query(kind='Task', ancestor=task_list_key)
        tasks_in_list = list(query.fetch())

        return task_list, tasks_in_list

Ruby

如需了解如何安装和使用 Cloud Datastore 客户端库,请参阅 Cloud Datastore 客户端库。如需了解详情,请参阅 Cloud Datastore Ruby API 参考文档

task_list_key = datastore.key "TaskList", "default"
    datastore.read_only_transaction do |tx|
      task_list = tx.find task_list_key
      query = datastore.query("Task").ancestor(task_list)
      tasks_in_list = tx.run query
    end

事务和实体组

实体组是一组通过祖先实体连接到共同根元素的实体。将数据整理成实体组可以限制可执行的事务:

  • 一个事务所访问的所有数据都必须包含在最多 25 个实体组中。
  • 如果想在事务内使用查询,必须将数据整理成实体组,如此用户才可以指定与正确数据匹配的祖先过滤器。
  • 单个实体组的写入吞吐量限制为每秒约一个事务。存在此限制是因为 Datastore 会在大范围的地理区域内,对每个实体组执行无主同步复制,以提供高可靠性和容错性。

在许多应用中,可在广泛查看互不相关数据时使用最终一致性(即跨多个实体组进行非祖先查询,有时可能会返回稍过时的数据),然后在查看或修改一组高度相关的数据时,使用高度一致性(祖先查询,或使用 lookup 方法查询单个实体)。在此类应用中,通常适合为每组高度相关的数据使用独立的实体组。 如需了解详情,请参阅数据一致性

后续步骤