创建推送任务

本页面介绍如何创建任务并将其放入推送队列。如果您要处理任务,必须创建一个新的任务对象,并将其放到队列中。您可以明确指定用于处理任务的服务和处理程序,并且可以选择将特定于任务的数据传递给处理程序。您还可以微调任务的配置,例如,安排将来执行任务的时间,或者限制失败时希望重试任务的次数。

新建任务

如需创建任务并将其加入队列,请使用 QueueFactory 获取 Queue,并调用其 add() 方法。您可以使用工厂的 getQueue() 方法获取 queue.xml 文件中指定的命名队列,也可以使用 getDefaultQueue() 获取默认队列。您可以使用 TaskOptions 实例(由 TaskOptions.Builder 生成)调用 Queueadd() 方法,也可以不带任何参数调用该方法,以便利用队列的默认选项创建任务。

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
Queue queue = QueueFactory.getDefaultQueue();
queue.add(TaskOptions.Builder.withUrl("/worker").param("key", key));

指定工作器服务

当任务从队列中弹出时,任务队列服务会将该任务发送到工作器服务。每个任务都包含一个目标和一个网址,用于确定最终执行任务的服务和处理程序。

target

目标用于指定将接收 HTTP 请求来执行任务的服务。目标是一个字符串,用于以任意一种规范化格式指定服务/版本/实例。最常用的规范格式有:

    service
    version.service
    instance.version.service

目标字符串将添加到应用的域名前面。

  • 在构建任务时声明目标。通过使用 TaskOptions 设置 Host 标头,可以在创建任务时明确设置目标:

    taskOptions.header("Host", versionHostname)
    

  • queue.xml 中定义队列时加入 target 指令,如 queue-blue定义所述。添加到具有 target 的队列的所有任务都将使用该目标,即便在构建任务时为任务分配了其他目标也是如此。

  • 如果没有采用前面两种方法中的任何一种指定目标,则任务的目标便是将任务加入队列的服务版本。请注意,如果以这种方式从默认服务和版本中将任务加入队列,而在任务执行之前默认版本发生了更改,则该任务将在新的默认版本中运行。

url

url 用来选择目标服务中的一个处理程序来执行任务。

url 应与目标服务中的一种处理程序网址格式相匹配。如果任务中指定的方法是 GETPULL,则 url 可以包含查询参数。如果未指定 url,则会使用默认网址 /_ah/queue/[QUEUE_NAME],其中 [QUEUE_NAME] 是任务队列的名称。

将数据传递给处理程序

您可以将数据作为任务网址中的查询参数传递给处理程序,前提是任务中指定的方法为 GETPULL

TaskOptions.Builder 构造函数包含多种方法,可添加数据作为 HTTP 请求的负载,以及添加数据作为参数(以查询参数的形式添加到网址)。

params
如果您使用 POST 方法以及负载,或者如果您使用 GET 方法并且已经包含带有查询参数的网址,请不要指定参数。

命名任务

默认情况下,在您创建新任务时,App Engine 会为该任务分配一个唯一的名称。不过,您可以使用 name 参数自行为任务命名。自行为任务命名的好处是,系统会对命名的任务进行去重处理,这意味着您可以使用任务名称来保证任务只会被添加一次。系统会在任务被完成或删除后的 9 天之内继续进行去重处理。

请注意,去重逻辑会产生很大的性能开销,从而导致延迟时间增加,并可能导致与命名任务相关的错误率增加。如果任务名称是按顺序排列的(例如使用时间戳),则这些开销可能会明显增加。因此,如果您自行命名任务,我们建议您为任务名称使用分布合理的前缀,例如内容的哈希值。

如果您自行为任务命名,那么请注意,名称长度不能超过 500 个字符,名称可以包含大写和小写字母、数字、下划线和连字符。

异步添加任务

默认情况下,向队列添加任务的调用是同步的。对于大多数场景,同步调用都是适用的。将任务添加到队列通常是一项快速操作。一小部分的添加任务操作可能需要比其他操作长很多的时间,但添加任务所需时间的中间值不到 5 毫秒。

不能对不同队列的添加任务操作进行批处理,因此 Task Queue API 还提供了异步调用,使您能够并行添加这些任务,从而进一步缩短此延迟时间。如果要构建一个需要同时向不同队列添加多个任务、对延迟时间极其敏感的应用,异步调用会很有用。

如果要对任务队列进行异步调用,请使用 Queue 类提供的异步方法。对返回的 Future 对象调用 get 会强制完成请求。在事务中异步添加任务时,您应在提交事务之前先对 Future 调用 get(),以确保请求已完成。

在 Cloud Datastore 事务中将任务加入队列

您可以将任务作为 Datastore 事务的一部分加入队列,以便只有在成功提交事务时才会将任务加入队列,并且在这种情况下保证任务入队。事务中添加的任务被视为事务的组成部分,且具有相同级别的隔离和一致性

在单个事务期间,应用插入任务队列的事务性任务不能超过五个。事务性任务不能具有用户指定的名称。

以下代码示例显示如何将事务性任务作为数据存储区事务的一部分插入到推送队列中。

DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
Queue queue = QueueFactory.getDefaultQueue();
try {
    Transaction txn = ds.beginTransaction();

    // ...

    queue.add(TaskOptions.Builder.withUrl("/path/to/my/worker"));

    // ...
    txn.commit();
} catch (DatastoreFailureException e) {
}

使用 DeferredTask 而不是工作器服务

为每个不同的任务设置处理程序(如前面部分所述)可能会很麻烦,为任务序列化和反序列化复杂的参数可能同样麻烦 - 特别是如果您要在队列上运行许多不同但很小的任务时。Java SDK 包含一个名为 DeferredTask 的接口。使用此接口,您可以将任务定义为单个方法。此接口使用 Java 序列化将一组工作打包至一个任务队列。该方法如果正常返回,则视为成功。如果该方法抛出任何异常,则视为失败。


/** A hypothetical expensive operation we want to defer on a background task. */
public static class ExpensiveOperation implements DeferredTask {

  @Override
  public void run() {
    System.out.println("Doing an expensive operation...");
    // expensive operation to be backgrounded goes here
  }
}

/**
 * Basic demonstration of adding a deferred task.
 *
 * @param request servlet request
 * @param resp servlet response
 */
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse resp)
    throws IOException {
  // Add the task to the default queue.
  Queue queue = QueueFactory.getDefaultQueue();

  // Wait 5 seconds to run for demonstration purposes
  queue.add(
      TaskOptions.Builder.withPayload(new ExpensiveOperation())
          .etaMillis(System.currentTimeMillis() + DELAY_MS));

  resp.setContentType("text/plain");
  resp.getWriter().println("Task is backgrounded on queue!");
}

在多租户应用中处理任务

默认情况下,推送队列使用创建任务时在命名空间管理器中设置的当前命名空间。如果应用使用多租户,请参阅 Namespaces Java 8 API

后续步骤