本页面介绍如何创建任务并将其放入推送队列。如果您要处理任务,必须创建一个新的任务对象,并将其放到队列中。您可以明确指定用于处理任务的服务和处理程序,并且可以选择将特定于任务的数据传递给处理程序。您还可以微调任务的配置,例如,安排将来执行任务的时间,或者限制失败时希望重试任务的次数。
新建任务
要创建任务并将其加入队列,请调用 taskqueue.add() 函数。
以下代码创建一个以名为 worker
的服务为目标的任务,并通过设置网址 /update-counter
来调用其处理程序:
您也可以创建一个 Task 对象并调用其 add() 方法。
指定工作器服务
当任务从队列中弹出时,任务队列服务会将该任务发送到工作器服务。每个任务都包含一个目标和一个网址,用于确定最终执行任务的服务和处理程序。
target
目标用于指定将接收 HTTP 请求来执行任务的服务。目标是一个字符串,用于以任意一种规范化格式指定服务/版本/实例。最常用的规范格式有:
service
version.service
instance.version.service
目标字符串将添加到应用的域名前面。
在构造任务时声明目标。 您可以使用 taskqueue.add() 函数中的
target
参数明确设置目标。请参阅上文中的示例。在
queue.yaml
中定义队列时加入target
指令,如queue-blue
的定义所述。添加到具有target
的队列的所有任务都将使用该目标,即便在构建任务时为任务分配了其他目标也是如此。如果没有采用前面两种方法中的任何一种指定目标,则任务的目标便是将任务加入队列的服务版本。请注意,如果以这种方式从默认服务和版本中将任务加入队列,而在任务执行之前默认版本发生了更改,则该任务将在新的默认版本中运行。
url
url
用来选择目标服务中的一个处理程序来执行任务。
此 url
应与目标服务中的一种处理程序网址格式相匹配。如果任务中指定的方法是 GET
或 PULL
,则 url
可以包含查询参数。如果未指定 url
,则会使用默认网址 /_ah/queue/[QUEUE_NAME]
,其中 [QUEUE_NAME]
是任务队列的名称。
将数据传递给处理程序
您可以将数据作为任务网址中的查询参数传递给处理程序,前提是任务中指定的方法为 GET
或 PULL
。
payload
,该字段在 HTTP 请求的正文中传递任务数据。params
以下三个调用是等价的:
taskqueue.add(method=GET, url='/update-counter?key=blue', target='worker')
taskqueue.add(url='/update-counter', params={'key': 'blue'}, target='worker')
taskqueue.add(url='/update-counter', payload="{'key': 'blue'}", target='worker')
命名任务
默认情况下,在您创建新任务时,App Engine 会为该任务分配一个唯一的名称。不过,您可以使用 name
参数自行为任务命名。自行为任务命名的好处是,系统会对命名的任务进行去重处理,这意味着您可以使用任务名称来保证任务只会被添加一次。系统会在任务被完成或删除后的 9 天之内继续进行去重处理。
请注意,去重逻辑会产生很大的性能开销,从而导致延迟时间增加,并可能导致与命名任务相关的错误率增加。如果任务名称是按顺序排列的(例如使用时间戳),则这些开销可能会明显增加。因此,如果您自行命名任务,我们建议您为任务名称使用分布合理的前缀,例如内容的哈希值。
如果您自行为任务命名,那么请注意,名称长度不能超过 500 个字符,名称可以包含大写和小写字母、数字、下划线和连字符。
taskqueue.add(url='/url/path', name='first-try')
异步添加任务
默认情况下,向队列添加任务的调用是同步的。对于大多数场景,同步调用都是适用的。将任务添加到队列通常是一项快速操作。一小部分的添加任务操作可能需要比其他操作长很多的时间,但添加任务所需时间的中间值不到 5 毫秒。
不能对不同队列的添加任务操作进行批处理,因此 Task Queue API 还提供了异步调用,使您能够并行添加这些任务,从而进一步缩短此延迟时间。如果要构建一个对延迟时间极其敏感的应用,该应用需要同时对不同队列执行多个添加任务操作,则异步调用会很有用。
如果要对任务队列进行异步调用,请使用 Queue 类和RPC 对象提供的异步方法。对返回的 RPC
对象调用 get_result()
会强制完成请求。在事务中异步添加任务时,您应在提交事务之前先对 RPC
对象调用 get_result()
,以确保请求已完成。
在 Cloud Datastore 事务中将任务加入队列
您可以将任务作为 Datastore 事务的一部分加入队列,以便只有在成功提交事务时才会将任务加入队列,并且在这种情况下保证任务入队。事务中添加的任务被视为事务的组成部分,且具有相同级别的隔离和一致性。
在单个事务期间,应用插入任务队列的事务性任务不能超过五个。事务性任务不能具有用户指定的名称。
以下代码示例显示如何将事务性任务作为 Datastore 事务的一部分插入到推送队列中。
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
@ndb.transactional
def do_something_in_transaction():
taskqueue.add(url='/path/to/my/worker', transactional=True)
#...
do_something_in_transaction()
使用延迟任务库而不是工作器服务
为每个不同的任务设置处理程序(如前面部分所述)可能会很麻烦,为任务序列化和反序列化复杂的参数可能同样麻烦 - 特别是如果您要在队列上运行许多不同但很小的任务时。Python SDK 包括的库 (google.appengine.ext.deferred
) 提供了一个简单的函数,使用它您可绕过设置专用的任务处理程序、序列化和反序列化参数等所有作业。
如需使用此库,您需要将 deferred
内置素添加到 app.yaml
。如需了解详情,请参阅 app.yaml
参考文档的内置处理程序部分。
如需使用 deferred
库,只需将函数及其参数传递给 deferred.defer()
:
import logging
from google.appengine.ext import deferred
def do_something_expensive(a, b, c=None):
logging.info("Doing something expensive!")
# Do your work here
# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, True)
deferred
库会打包您的函数调用及其参数,然后将其添加到任务队列。执行任务时,deferred
库执行 do_something_expensive("Hello, world!", 42, True)
。
在多租户应用中处理任务
默认情况下,推送队列使用创建任务时在命名空间管理器中设置的当前命名空间。如果应用使用多租户,请参阅 Namespaces Python API。
后续步骤
- 了解如何创建任务处理程序。