注意:强烈建议构建新应用的开发者使用 NDB 客户端库,它与 DB 客户端库相比具有多项优势,例如可通过 Memcache API 进行自动实体缓存。如果您当前使用的是较早的 DB 客户端库,请参阅 DB 到 NDB 的迁移指南
Datastore 支持事务。事务是一个或一组原子操作:事务中的所有操作要么都发生,要么都不发生。应用可以在单个事务中执行多个操作和计算。
使用事务
事务是对一个或多个实体执行的一组 Datastore 操作。每个事务都必须具有原子性,因此事务决不会只应用一部分。事务中的所有操作要么都应用,要么都不应用。事务的最长持续时间为 60 秒,在 30 秒后有 10 秒的空闲到期时间。
出现以下情况时,操作可能会执行失败:
- 尝试对同一实体组进行太多并发修改。
- 事务超出资源限制。
- Datastore 遇到内部错误。
在以上所有情况下,Datastore API 都会引发异常。
事务是 Datastore 的可选功能;执行 Datastore 操作并非必须使用事务。
应用可以在单个事务中执行一组语句和 Datastore 操作,因此,如果任何语句或操作引发异常,系统将不会应用该组中的任何 Datastore 操作。该应用使用 Python 函数定义了要在事务中执行的操作。应用使用 run_in_transaction
方法之一启动事务,具体取决于事务是访问单个实体组中的实体,还是事务是跨组事务。
若需了解仅在事务中使用的函数的常见用例,请使用 @db.transactional
修饰器:
from google.appengine.ext import db
class Accumulator(db.Model):
counter = db.IntegerProperty(default=0)
@db.transactional
def increment_counter(key, amount):
obj = db.get(key)
obj.counter += amount
obj.put()
q = db.GqlQuery("SELECT * FROM Accumulator")
acc = q.get()
increment_counter(acc.key(), 5)
如果有时在没有事务的情况下调用函数,无需对其进行修饰,而是调用 db.run_in_transaction()
并使用该函数作为参数:
from google.appengine.ext import db
class Accumulator(db.Model):
counter = db.IntegerProperty(default=0)
def increment_counter(key, amount):
obj = db.get(key)
obj.counter += amount
obj.put()
q = db.GqlQuery("SELECT * FROM Accumulator")
acc = q.get()
db.run_in_transaction(increment_counter, acc.key(), 5)
db.run_in_transaction()
使用函数对象以及要传递给该函数的位置和关键字参数。如果该函数返回一个值,则 db.run_in_transaction()
将返回该值。
如果该函数返回,则提交事务,并应用数据存储区操作的所有作用。如果该函数引发异常,则事务将“回滚”,并且不会应用效果。请参阅上面有关异常的说明。
当从一个事务中调用另一个事务函数时,@db.transactional
和 db.run_in_transaction()
的默认行为不同。@db.transactional
将允许存在这种情况,并且内部事务变为与外部事务相同的事务。调用 db.run_in_transaction()
会尝试在现有事务中“嵌套”另一个事务;但是系统尚不支持这种行为,且会抛出 db.BadRequestError
您可以指定其他行为;如需了解详情,请参阅事务选项上的函数引用。
使用跨组 (XG) 事务
跨组事务跨多个实体组运行,其行为类似于单组事务,但是,如果代码尝试更新多个实体组中的实体,则跨组事务不会失败。如需调用跨组事务,请使用事务选项。
使用 @db.transactional
:
from google.appengine.ext import db
@db.transactional(xg=True)
def make_things():
thing1 = Thing(a=3)
thing1.put()
thing2 = Thing(a=7)
thing2.put()
make_things()
使用 db.run_in_transaction_options
:
from google.appengine.ext import db
xg_on = db.create_transaction_options(xg=True)
def my_txn():
x = MyModel(a=3)
x.put()
y = MyModel(a=7)
y.put()
db.run_in_transaction_options(xg_on, my_txn)
可以在事务中执行的操作
Datastore 对可以在单个事务中执行的操作设定了限制。
如果事务属于单组事务,则事务中的所有 Datastore 操作都必须针对同一实体组中的实体;如果事务属于跨组事务,则这些操作最多可以针对 25 个实体组中的实体。这包括通过祖先实体查询实体、通过键检索实体、更新实体以及删除实体。请注意,每个根实体都属于单独的实体组,因此,除非是跨组事务,否则单个事务无法创建或操作多个根实体。
当两个或多个事务同时尝试修改一个或多个公共实体组中的实体时,只有第一个提交其更改的事务可以应用成功;所有其他事务均会提交失败。由于上述机制,使用实体组会限制您可以对组中任意实体执行的并发写入数量。事务开始时,Datastore 会通过检查事务中使用的实体组的上次更新时间来使用乐观并发控制。在为实体组提交事务后,Datastore 会再次检查事务中使用的实体组的上次更新时间。如果该时间自初次检查后发生了变化,则会引发异常。
应用可以在事务期间执行查询,但前提是查询包含祖先实体过滤条件。应用还可以在事务期间通过键获取 Datastore 实体。您可以在事务之前准备键,也可以在事务内通过键名称或 ID 来构建键。
事务函数中允许使用其他所有 Python 代码。您可以使用 db.is_in_transaction()
确定当前范围是否嵌套在事务函数中。除 Datastore 操作外,事务函数不应产生副作用。如果由于另一个用户同时更新实体组中的实体而导致 Datastore 操作失败,则该事务函数可以多次调用。如果发生这种情况,Datastore API 会重试该事务固定次数。如果尝试都失败了,则 db.run_in_transaction()
会抛出 TransactionFailedError
。您可以使用 db.run_in_transaction_custom_retries()
而不是 db.run_in_transaction() 来调整重试事务的次数。
同样,事务函数不应产生取决于该事务成功与否的副作用,除非调用此事务函数的代码明确如何撤消这些作用。例如,如果事务存储一个新 Datastore 实体并保存该创建的实体的 ID 以供以后使用,然后该事务失败,则保存的 ID 无法引用预期的实体,因为该实体创建回滚。在这种情况下,调用代码一定要谨慎,切勿使用已保存的 ID。
隔离和一致性
在事务之外,Datastore 的隔离级别最接近提交的读取操作。在事务之内,系统将采用可序列化隔离。这意味着另一个事务不能并发修改由此事务读取或修改的数据。
在事务中,所有读取都反映事务开始时 Datastore 的当前一致状态。事务内部的查询和获取操作可以保证在事务开始时看到单个一致的 Datastore 快照。事务的实体组中的实体和索引行将完全更新,以便查询操作返回一组完整且正确的结果实体,而不会出现可能在事务外部的查询操作中发生的假正例或假负例。
在事务内,此一致的快照视图还延伸到写入后的读取。与大多数数据库不同,Datastore 事务内部的查询和获取操作不会看到该事务中先前写入的结果。具体而言,如果某个实体在事务中被修改或删除,则查询或获取操作会返回该实体在事务开始时的原始版本;如果当时并不存在该实体,则系统不会返回任何内容。
事务的用途
此示例说明了事务的一个用途:使用相对于属性当前值的新属性值来更新实体。
def increment_counter(key, amount):
obj = db.get(key)
obj.counter += amount
obj.put()
这需要使用事务,因为在此代码提取对象之后到保存修改后的对象之前的这段时间内,其他用户可能会更新该值。如果不使用事务,则用户的请求将使用另一用户进行更新前的 count
值,且保存操作会覆盖新值。如果使用了事务,应用会被告知其他用户在进行更新。如果在事务期间实体发生更新,则系统将重试事务,直到所有步骤都无中断地完成。
事务的另一个常见用途是提取具有指定键的实体;如果实体尚不存在,则创建该实体:
class SalesAccount(db.Model):
address = db.PostalAddressProperty()
phone_number = db.PhoneNumberProperty()
def get_or_create(parent_key, account_id, address, phone_number):
obj = db.get(db.Key.from_path("SalesAccount", account_id, parent=parent_key))
if not obj:
obj = SalesAccount(key_name=account_id,
parent=parent_key,
address=address,
phone_number=phone_number)
obj.put()
else:
obj.address = address
obj.phone_number = phone_number
与之前一样,如果另一用户试图创建或更新具有同一字符串 ID 的实体,则需要使用事务来处理这种情况。不使用事务时,如果实体不存在且两位用户都试图创建它,则第二位用户会覆盖第一位用户的操作,而不知道实体已被创建。使用事务时,第二次重新尝试时会注意到该实体已存在,改为更新该实体。
当事务失败时,您可以让应用重试事务直到成功,或者可以将错误传递到应用的界面层,让用户处理该错误。您无需对每个事务都创建重试循环。
“获取或创建”(Get-or-create) 非常有用,可通过一个内置方法来实现:Model.get_or_insert()
接受一个键名、一个可选的父键,以及传递给模型构造函数的参数(如果不存在使用该名称和路径的实体)。获取尝试和创建发生在一个事务内,因此(如果事务成功)该方法总是返回一个代表实际实体的模型实例。
最后,您可以使用事务来读取一致的 Datastore 快照。在需要进行多次读取来呈现页面或导出完全一致的数据时,这非常有用。此类事务通常称为“只读”事务,因为它们不执行写入操作。只读单组事务始终不会由于并发修改而执行失败,所以您无需执行失败重试。但是,跨组事务可能由于并发修改而执行失败,因此这些事务应当执行重试。提交和回滚只读事务都无需运维。
class Customer(db.Model):
user = db.StringProperty()
class Account(db.Model):
"""An Account has a Customer as its parent."""
address = db.PostalAddressProperty()
balance = db.FloatProperty()
def get_all_accounts():
"""Returns a consistent view of the current user's accounts."""
accounts = []
for customer in Customer.all().filter('user =', users.get_current_user().user_id()):
accounts.extend(Account.all().ancestor(customer))
return accounts
将事务性任务加入队列
您可以将任务作为 Datastore 事务的一部分加入队列,使任务仅在事务成功提交的情况下才会加入队列。如果事务未提交,则任务不会入队。如果确实提交了事务,则任务会入队。一旦入队,任务将不会立即执行,因此该事务的任务没有原子化。不过,任务加入队列后会立即重试,直至执行成功。这一点适用于在 run_in_transaction()
函数执行期间入队的任何任务。
事务性任务非常有用,因为它们可让您将多个非 Datastore 操作组合为一个事务,进而以该事务的成功为决定因素(例如发送电子邮件以确认购买)。也可以将 Datastore 操作绑定到事务,例如,当且仅当事务成功时才提交在事务外部对实体组所做的更改。
在单个事务期间,应用插入任务队列的事务性任务不能超过五个。事务性任务不能采用用户指定的名称。
def do_something_in_transaction(...)
taskqueue.add(url='/path/to/my/worker', transactional=True)
...
db.run_in_transaction(do_something_in_transaction, ....)