创建任务处理程序

本页面介绍了如何创建任务处理程序,即处理推送任务的代码。您必须提供请求处理程序才能处理任务。与任何其他请求处理程序一样,从请求网址到相应处理程序的映射是在服务的 app.yaml 中声明的。由于您可以控制如何将任务请求映射到处理程序,因此可自由地组织您的任务处理程序。如果您的应用要处理许多不同种类的任务,那么您可以将所有处理程序添加到一项服务,也可以将它们分布在多项服务中。

编写推送任务请求处理程序

在队列中,任务队列服务会创建一个 HTTP 标头,然后将其发送到任务目标指定的工作器服务的实例。任务队列请求从 IP 地址 0.1.0.2 发送。

如果处理程序在单独的服务中,则编写处理程序所用的语言不必与创建任务以及将任务排入队列时使用的语言相同。

在编写处理程序时,请遵循以下准则:

  • 代码必须返回介于 200-299 范围之内的 HTTP 状态代码以指示成功。任何其他代码均表示任务失败。

  • 推送任务具有固定的完成截止时间,具体取决于运行它们的服务的扩缩类型。 自动扩缩服务必须在 10 分钟之内完成。手动和基本扩缩服务最长可以运行 24 小时。如果您的处理程序错过了截止时间,则任务队列服务会假定任务失败并重试。

    如果任务的执行时间临近截止时间,App Engine 会在到达截止时间之前引发 DeadlineExceededError(从模块 google.appengine.runtime),以便您保存工作或记录当前进度。

  • 处理程序必须具有幂等性。App Engine 的 Task Queue API 旨在提供“至少一次”传递;也就是说,如果成功添加了某个任务,App Engine 会至少将其传递给处理程序一次。请注意,在极少数情况下,系统可能会多次执行同一任务,因此您的代码必须确保重复执行不会产生任何不好的副作用。

以下示例演示如何从请求中检索整数值,并将该值添加到 Cloud Datastore 中的计数器:


from google.appengine.ext import ndb
import webapp2

COUNTER_KEY = 'default counter'

class Counter(ndb.Model):
    count = ndb.IntegerProperty(indexed=False)

class UpdateCounterHandler(webapp2.RequestHandler):
    def post(self):
        amount = int(self.request.get('amount'))

        # This task should run at most once per second because of the datastore
        # transaction write throughput.
        @ndb.transactional
        def update_counter():
            counter = Counter.get_or_insert(COUNTER_KEY, count=0)
            counter.count += amount
            counter.put()

        update_counter()

app = webapp2.WSGIApplication([
    ('/update_counter', UpdateCounterHandler)
], debug=True)

从任务的网址 /update-counter 到类 UpdateCounterHandler 的映射在 WSGIApplication 内部完成。

worker.yaml 文件将创建名为“worker”的服务,然后将工作器代码添加到该服务中。请注意,由于处理程序的网址指定 login:admin,因此该网址是安全的

runtime: python27
api_version: 1
threadsafe: true
service: worker

handlers:
- url: /.*
  script: worker.app
  login: admin

任务队列使用处理程序响应中的 HTTP 代码来确定任务是否成功。只有任务队列服务可以看到来自处理程序的响应,该响应仅用于确定任务是否成功。队列会忽略响应中的所有其他字段。然后,服务会舍弃响应。源应用绝不接收任何数据。如果某个任务失败,则任务队列服务将通过发送另一个请求来重试该任务。

用户提供的数据可以作为查询字符串或请求正文中的负载传递给请求中的处理程序。创建任务中介绍了插入用户数据的方式。如果请求包含数据,则处理程序必须了解它是如何被插入请求的。用于从请求中提取数据的具体代码取决于您使用的特定 Web 框架。

要测试某个任务处理程序,请以管理员身份登录并在浏览器中访问该处理程序的网址。

读取请求标头

推送任务 HTTP 请求具有由 App Engine 设置的特殊标头,其中包含处理程序可以使用的特定于任务的信息。

如果这些标头出现在应用的外部用户请求中,则会被删除和替换掉。唯一的例外情况是来自应用的已登录管理员的请求,管理员可以出于测试目的而设置标头。另一方面,当您的应用在开发服务器中运行时,标头不会被删除。

来自任务队列的请求将始终包含以下标头:

标头 说明
X-Appengine-QueueName 队列的名称(对于默认推送队列,该名称可能是“default”)。
X-Appengine-TaskName 任务的名称,如果未指定名称,则为系统生成的唯一 ID。
X-Appengine-TaskRetryCount 此任务已经重试的次数。对于第一次尝试,该值为 0。此数字包括由于缺少可用实例导致任务失败而从未到达执行阶段的尝试次数。
X-Appengine-TaskExecutionCount 此任务以前在执行阶段期间失败的次数。此数字不包括由于缺少可用实例而导致的失败次数。
X-Appengine-TaskETA 任务的目标执行时间,自 1970 年 1 月 1 日开始算起,以秒为单位。

如果您的请求处理程序找到上面列出的任何标头,便可以信任该请求是任务队列请求。

此外,来自任务队列的请求可以包含以下标头:

标头 说明
X-Appengine-TaskPreviousResponse 来自上一次重试的 HTTP 响应代码。
X-Appengine-TaskRetryReason 重试任务的原因。
X-Appengine-FailFast 表示任务在现有实例不可用时立即运行失败

保护任务处理程序网址

如果任务执行敏感操作(例如修改数据),您可能希望保护处理程序网址的安全,以防止恶意外部用户直接调用它。您可以通过将访问权限限制在 App Engine 管理员来防止用户访问任务网址。任务请求本身由 App Engine 发布,始终可以定位受限制的网址。

您可以通过向您的 app.yaml 文件中的处理程序配置添加 login: admin 元素来限制网址。

例如:

handlers:
- url: /your-task
  script: worker.app
  login: admin

后续步骤