建立發送工作

本頁說明如何建立工作並加入發送佇列。當您要處理工作時,必須先建立新的工作物件並加入佇列。您可以明確指定用於處理工作的服務和處理常式,然後選擇將工作特定資料傳送給處理常式;也可以微調工作的設定,例如安排未來的執行時間,或限制工作失敗時的重試次數。

建立新工作

如要建立工作並將工作加入佇列,請使用 QueueFactory 取得 Queue,並呼叫其 add() 方法。您可以使用原廠的 getQueue() 方法取得 queue.xml 檔案中指定的具名佇列,或使用 getDefaultQueue() 取得預設佇列。您可以使用由 TaskOptions.Builder 產生的 TaskOptions 執行個體呼叫 Queueadd() 方法,也可以呼叫此方法而不搭配使用引數,使用預設選項為佇列建立工作。

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
Queue queue = QueueFactory.getDefaultQueue();
queue.add(TaskOptions.Builder.withUrl("/worker").param("key", key));

指定工作站服務

工作離開佇列時,工作佇列服務會將工作傳送至工作站服務。每個工作都有「目標」和「網址」,用於判斷最後執行工作的服務和處理常式。

target

目標會指定用於接收 HTTP 要求以執行工作的服務。這是字串,會以任何一種標準格式來指定服務/版本/執行個體。最常使用的格式如下:

    service
    version.service
    instance.version.service

系統會在應用程式的網域名稱前面添加目標字串。為工作設定目標的方法有三種:

  • 建構工作時宣告目標。 您可以在建立工作時明確設定目標,方法是使用 TaskOptions 設定 Host 標頭:

    taskOptions.header("Host", versionHostname)
    

  • queue.xml 中定義佇列時加入 target 指令,如 queue-blue定義所示。所有使用 target 加入佇列的工作都會使用這個目標,即使建構工作時指派了不同的目標也是如此。

  • 如果並未以上述任一方法指定目標,則工作的目標即是將工作加入佇列的服務版本。 請注意,如果您以此方式使用預設的服務和版本,將工作加入佇列,而預設版本在工作執行之前有所變更,則工作會以新的預設版本執行。

url

url 會選取目標服務的其中一個處理常式來執行工作。

url 必須與目標服務的其中一個處理常式網址格式相符。如果工作中指定的方法為 GETPULL,則 url 可以包含查詢參數。如果並未指定任何 url,則會使用預設網址 /_ah/queue/[QUEUE_NAME],其中 [QUEUE_NAME] 是工作佇列的名稱。

將資料傳送至處理常式

您可以透過工作網址中的查詢參數,將資料傳送至處理常式,但只有在工作中指定的方法為 GETPULL,才能使用這種方式。

TaskOptions.Builder 建構函式提供多種方法,可將資料新增為 HTTP 要求的酬載,以及將資料新增為參數,進而以查詢參數形式加入網址中。

params
如果使用 POST 方法搭配酬載,或使用 GET 方法並且在網址中加入查詢參數時,請勿指定參數。

命名工作

根據預設,在您建立新工作時,App Engine 會為工作指派不重複的名稱。不過,您可以使用 name 參數自行為工作指派名稱。指派自己的工作名稱有個優點:名稱相同的工作會遭到排除,也就是可以使用工作名稱來確保每項工作僅會加入一次。排除重複作業會在工作完成或刪除後持續 9 天。

請注意,排除重複邏輯會對效能造成大量負擔,加重延遲情形,且可能會提高命名工作相關的錯誤率。如果工作名稱採循序方式 (例如附有時間戳記),則這些成本還可能會大幅增加。因此,如果您自行指派名稱,建議您在工作名稱中使用分佈均勻的字首,例如內容的雜湊。

如果您要自行指派工作名稱,請注意,名稱的長度上限為 500 個字元,名稱可使用大小寫字母、數字、底線和連字號。

非同步新增工作

預設情況下,新增工作至佇列的呼叫會同步執行。在大多數情況下,同步呼叫皆能正常執行。新增工作至佇列的作業通常很快速。少數新增工作的作業可能需要較長的時間,但新增工作所需時間的中位數少於 5 毫秒。

由於無法批次新增任務至不同佇列,因而 Task Queue API 也提供非同步呼叫,讓使用者得以平行新增工作,進而盡可能縮短延遲時間。如果您建構的應用程式對延遲極其敏感,且需要同時對不同佇列執行新增工作的作業,此方法會相當實用。

如果想對同一工作佇列執行非同步呼叫,請使用佇列類別所提供的非同步方法。請在傳回的 Future 物件上呼叫 get,以強制完成要求。在同一交易中非同步新增工作時,可在修訂交易前在 Future 上呼叫 get(),以確保要求已完成。

在 Cloud Datastore 交易中將工作排入佇列

您可以將工作排入佇列做為 Datastore 交易的一部分,以便只在成功修訂交易時才將工作排入佇列,並確保工作已順利新增。加入到交易中的工作會視為交易的一部分,並且具有相同層級的隔離與一致性

應用程式無法在單一交易期間將超過五個的交易工作插入到工作佇列。交易工作不能有使用者指定的名稱。

以下程式碼範例示範如何將交易工作插入到發送佇列中,成為 Datastore 交易的一部分:

DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
Queue queue = QueueFactory.getDefaultQueue();
try {
    Transaction txn = ds.beginTransaction();

    // ...

    queue.add(TaskOptions.Builder.withUrl("/path/to/my/worker"));

    // ...
    txn.commit();
} catch (DatastoreFailureException e) {
}

使用 DeferredTask 而非工作站服務

要為每個不同的工作設定處理常式 (如前幾節所述) 可能相當麻煩,因為這可能會將工作的複雜引數序列化或取消序列化,特別是當有各種不同的小工作要在佇列中執行時,更是一大工程。Java SDK 包含一個名為 DeferredTask 的介面,可讓您將工作定義為單一方法。這個介面使用 Java 序列化作業,將某個單位的作業封裝成工作佇列。只要從該方法返回,就會視為執行成功;從該方法擲回任何例外狀況,則視為執行失敗。


/** A hypothetical expensive operation we want to defer on a background task. */
public static class ExpensiveOperation implements DeferredTask {

  @Override
  public void run() {
    System.out.println("Doing an expensive operation...");
    // expensive operation to be backgrounded goes here
  }
}

/**
 * Basic demonstration of adding a deferred task.
 *
 * @param request servlet request
 * @param resp servlet response
 */
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse resp)
    throws IOException {
  // Add the task to the default queue.
  Queue queue = QueueFactory.getDefaultQueue();

  // Wait 5 seconds to run for demonstration purposes
  queue.add(
      TaskOptions.Builder.withPayload(new ExpensiveOperation())
          .etaMillis(System.currentTimeMillis() + DELAY_MS));

  resp.setContentType("text/plain");
  resp.getWriter().println("Task is backgrounded on queue!");
}

在多租戶應用程式中處理工作

根據預設,發送佇列會使用工作建立時在命名空間管理員中設定的目前命名空間。如果您的應用程式使用多租戶架構,請參閱 命名空間 Java API

後續步驟