创建推送任务

本页面介绍如何创建任务并将其放入推送队列。如果您要处理任务,必须创建一个新的任务对象,并将其放到队列中。您可以明确指定用于处理任务的服务和处理程序,并且可以选择将特定于任务的数据传递给处理程序。您还可以微调任务的配置,例如,安排将来执行任务的时间,或者限制失败时希望重试任务的次数。

新建任务

要创建任务并将其加入队列,请调用 taskqueue.add() 函数。 以下代码创建一个以名为 worker 的服务为目标的任务,并通过设置网址 /update-counter 来调用其处理程序:

class EnqueueTaskHandler(webapp2.RequestHandler):
    def post(self):
        amount = int(self.request.get('amount'))

        task = taskqueue.add(
            url='/update_counter',
            target='worker',
            params={'amount': amount})

        self.response.write(
            'Task {} enqueued, ETA {}.'.format(task.name, task.eta))

您也可以创建一个 Task 对象并调用其 add() 方法。

指定工作器服务

当任务从队列中弹出时,任务队列服务会将该任务发送到工作器服务。每个任务都包含一个目标和一个网址,用于确定最终执行任务的服务和处理程序。

target

目标用于指定将接收 HTTP 请求来执行任务的服务。目标是一个字符串,用于以任意一种规范化格式指定服务/版本/实例。最常用的规范格式有:

    service
    version.service
    instance.version.service

目标字符串将添加到应用的域名前面。

  • 在构造任务时声明目标。 您可以使用 taskqueue.add() 函数中的 target 参数明确设置目标。请参阅上文中的示例。

  • queue.yaml 中定义队列时加入 target 指令,如 queue-blue定义所述。添加到具有 target 的队列的所有任务都将使用该目标,即便在构建任务时为任务分配了其他目标也是如此。

  • 如果没有采用前面两种方法中的任何一种指定目标,则任务的目标便是将任务加入队列的服务版本。请注意,如果以这种方式从默认服务和版本中将任务加入队列,而在任务执行之前默认版本发生了更改,则该任务将在新的默认版本中运行。

url

url 用来选择目标服务中的一个处理程序来执行任务。

url 应与目标服务中的一种处理程序网址格式相匹配。如果任务中指定的方法是 GETPULL,则 url 可以包含查询参数。如果未指定 url,则会使用默认网址 /_ah/queue/[QUEUE_NAME],其中 [QUEUE_NAME] 是任务队列的名称。

将数据传递给处理程序

您可以将数据作为任务网址中的查询参数传递给处理程序,前提是任务中指定的方法为 GETPULL

您还可以使用以下任何字段向任务中添加数据:

  • payload,该字段在 HTTP 请求的正文中传递任务数据。
  • params

以下三个调用是等价的:

taskqueue.add(method=GET, url='/update-counter?key=blue', target='worker')
taskqueue.add(url='/update-counter', params={'key': 'blue'}, target='worker')
taskqueue.add(url='/update-counter', payload="{'key': 'blue'}", target='worker')

命名任务

默认情况下,在您创建新任务时,App Engine 会为该任务分配一个唯一的名称。不过,您可以使用 name 参数自行为任务命名。自行为任务命名的好处是,系统会对命名的任务进行去重处理,这意味着您可以使用任务名称来保证任务只会被添加一次。系统会在任务被完成或删除后的 9 天之内继续进行去重处理。

请注意,去重逻辑会产生很大的性能开销,从而导致延迟时间增加,并可能导致与命名任务相关的错误率增加。如果任务名称是按顺序排列的(例如使用时间戳),则这些开销可能会明显增加。因此,如果您自行命名任务,我们建议您为任务名称使用分布合理的前缀,例如内容的哈希值。

如果您自行为任务命名,那么请注意,名称长度不能超过 500 个字符,名称可以包含大写和小写字母、数字、下划线和连字符。

taskqueue.add(url='/url/path', name='first-try')

异步添加任务

默认情况下,向队列添加任务的调用是同步的。对于大多数场景,同步调用都是适用的。将任务添加到队列通常是一项快速操作。一小部分的添加任务操作可能需要比其他操作长很多的时间,但添加任务所需时间的中间值不到 5 毫秒。

不能对不同队列的添加任务操作进行批处理,因此 Task Queue API 还提供了异步调用,使您能够并行添加这些任务,从而进一步缩短此延迟时间。如果要构建一个需要同时向不同队列添加多个任务、对延迟时间极其敏感的应用,异步调用会很有用。

如果要对任务队列进行异步调用,请使用 Queue 类和远程过程调用 (RPC) 对象提供的异步方法。对返回的 RPC 对象调用 get_result() 会强制完成请求。在事务中异步添加任务时,您应在提交事务之前先对 RPC 对象调用 get_result(),以确保请求已完成。

在 Cloud Datastore 事务中将任务加入队列

您可以将任务作为 Datastore 事务的一部分加入队列,以便只有在成功提交事务时才会将任务加入队列,并且在这种情况下保证任务入队。事务中添加的任务被视为事务的组成部分,且具有相同级别的隔离和一致性

在单个事务期间,应用插入任务队列的事务性任务不能超过五个。事务性任务不能具有用户指定的名称。

以下代码示例显示如何将事务性任务作为 Datastore 事务的一部分插入到推送队列中。

from google.appengine.api import taskqueue
from google.appengine.ext import ndb

@ndb.transactional
def do_something_in_transaction():
  taskqueue.add(url='/path/to/my/worker', transactional=True)
  #...

do_something_in_transaction()

使用延迟任务库而不是工作器服务

为每个不同的任务设置处理程序(如前面部分所述)可能会很麻烦,为任务序列化和反序列化复杂的参数可能同样麻烦 - 特别是如果您要在队列上运行许多不同但很小的任务时。Python SDK 包括的库 (google.appengine.ext.deferred) 提供了一个简单的函数,使用它您可绕过设置专用的任务处理程序、序列化和反序列化参数等所有作业。

如需使用此库,您需要将 deferred 内置素添加到 app.yaml。如需了解详情,请参阅 app.yaml 参考文档的内置处理程序部分

如需使用 deferred 库,只需将函数及其参数传递给 deferred.defer()

import logging

from google.appengine.ext import deferred

def do_something_expensive(a, b, c=None):
    logging.info("Doing something expensive!")
    # Do your work here

# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, True)

deferred 库会打包您的函数调用及其参数,然后将其添加到任务队列。执行任务时,deferred 库执行 do_something_expensive("Hello, world!", 42, True)

在多租户应用中处理任务

默认情况下,推送队列使用创建任务时在命名空间管理器中设置的当前命名空间。如果应用使用多租户,请参阅 Namespaces Python API

后续步骤