建立發送工作

本頁說明如何建立工作並加入發送佇列。當您要處理工作時,必須先建立新的工作物件並加入佇列。您可以明確指定用於處理工作的服務和處理常式,然後選擇將工作特定資料傳送給處理常式;也可以微調工作的設定,例如安排未來的執行時間,或限制工作失敗時的重試次數。

建立新工作

如要建立工作並加入佇列,請呼叫 taskqueue 函式。下列指令碼會建立一個將名為 worker 的服務做為目標的工作,並將網址設為 /update-counter 以叫用處理常式:

class EnqueueTaskHandler(webapp2.RequestHandler):
    def post(self):
        amount = int(self.request.get('amount'))

        task = taskqueue.add(
            url='/update_counter',
            target='worker',
            params={'amount': amount})

        self.response.write(
            'Task {} enqueued, ETA {}.'.format(task.name, task.eta))

您也可以建立工作物件並呼叫物件的 add() 方法。

指定工作站服務

工作離開佇列時,工作佇列服務會將工作傳送至工作站服務。每個工作都有「目標」和「網址」,用於判定最後執行工作的服務和處理常式。

target

目標會指定用於接收 HTTP 要求以執行工作的服務。這是字串,會以任何一種標準格式來指定服務/版本/執行個體。最常使用的格式如下:

    service
    version.service
    instance.version.service

目標字串會加在應用程式網域名稱的前面。有三種設定工作目標的方式:

  • 在建構工作時宣告目標。您可以使用 taskqueue.add() 函式的 target 參數明確設定目標,請參閱上方範例。

  • queue.yaml 中定義佇列時,加入 target 指令,就和在 queue-blue定義中一樣。所有使用 target 加入佇列的工作都會使用這個目標,即使建構工作時指派了不同的目標也是如此。

  • 如果並未以上述任一方法指定目標,則工作的目標即是將工作加入佇列的服務版本。請注意,如果您以此方式使用預設的服務和版本,將工作加入佇列,而預設版本在工作執行之前有所變更,則工作會以新的預設版本執行。

url

url 會在目標服務中選取一個處理常式,由該處理常式執行工作。

url 應會符合目標服務中的其中一個處理常式網址模式。如果工作中指定的方法是 GETPULL,則 url 可以包含查詢參數。如果並未指定任何 url,則會使用預設的網址 /_ah/queue/[QUEUE_NAME],其中 [QUEUE_NAME] 是工作佇列的名稱。

將資料傳送至處理常式

可以將資料做為工作網址中的查詢參數,將資料傳送至處理常式,但只有在工作中指定的方法為 GETPULL 時,才能使用這種方式。

您也可以使用 payloadparams 欄位,將資料新增到工作中。

以下三種呼叫方式具相同效果:

taskqueue.add(method=GET, url='/update-counter?key=blue', target='worker')
taskqueue.add(url='/update-counter', params={'key': 'blue'}, target='worker')
taskqueue.add(url='/update-counter', payload="{'key': 'blue'}", target='worker')

注意:

  • 酬載會於 HTTP 要求的內文中提交。
  • 如果使用含酬載的 HTTP POST 方法,或使用 HTTP GET 方法且加入了查詢參數的網址,請勿指定參數。

命名工作

建立新工作時,根據預設,App Engine 會指派一個不重複的名稱給工作。但可以使用 name 參數自行指派工作名稱。指派自己的工作名稱有個優點:名稱相同的工作會遭到排除,也就是可以使用工作名稱來確保每項工作僅會加入一次。排除重複作業會在工作完成或刪除後持續 9 天。

請注意,排除重複邏輯會對效能造成大量負擔,加重延遲情形,且可能會提高命名工作相關的錯誤率。如果工作名稱採循序方式 (例如附有時間戳記),則這些成本還可能會大幅增加。所以,如果您自行指派名稱,建議您在工作名稱中使用分佈均勻的字首,例如內容的雜湊。

如果您要自行指派工作名稱,請注意,名稱的長度上限為 500 個字元,名稱可使用大小寫字母、數字、底線和連字號。

taskqueue.add(url='/url/path', name='first-try')

非同步新增工作

預設情況下,新增工作至佇列的呼叫會同步執行。在大多數情況下,同步呼叫皆能正常執行。新增工作至佇列的作業通常很快速。少數新增工作的作業可能需要較長的時間,但新增工作所需時間的中位數少於 5 毫秒。

由於無法批次新增任務至不同佇列,因而工作佇列 API 也提供非同步呼叫,讓使用者得以平行新增工作,進而盡可能縮短延遲時間。如果您建構的應用程式對延遲極其敏感,且需要同時對不同佇列執行新增工作的作業,就很適合。

如果想對同一工作佇列執行非同步呼叫,請使用佇列類別和 RPC 物件提供的非同步方法。請在傳回的 RPC 物件上呼叫 get_result(),以強制完成要求。在同一交易中非同步新增工作時,可在執行交易前對 RPC 物件呼叫 get_result(),以確保要求已完成。

在 Cloud Datastore 交易中將工作排入佇列

您可以將工作排入佇列,做為 Cloud Datastore 交易的一部分,如此只會在交易成功獲得認可時,才將工作排入佇列,並確保工作順利新增。加入到交易中的工作會被視為交易的一部分,並且具有相同層級的隔離與一致性

應用程式無法在單一交易期間將超過五個的交易工作插入到工作佇列。交易工作不能有使用者指定的名稱。

以下程式碼範例示範如何將交易工作插入到發送佇列中,成為 Cloud Datastore 交易的一部分:

from google.appengine.api import taskqueue
from google.appengine.ext import ndb

@ndb.transactional
def do_something_in_transaction():
  taskqueue.add(url='/path/to/my/worker', transactional=True)
  #...

do_something_in_transaction()

使用延遲工作程式庫而非工作站服務

要為每個不同的工作設定處理常式 (如前幾節所述) 可能相當麻煩,因為可能將工作的複雜引數序列化或取消序列化,特別是當有各式各樣的小工作要在佇列中執行時,更是個大工程。Python SDK 內有 google.appengine.ext.deferred 資料庫,提供了一個簡單的函式,可略過所有設定專用工作處理常式以即將參數序列化和取消序列化的工作。

要使用此程式庫,必須將 deferred 內建項目新增至 app.yaml。如需更多資訊,請參閱 app.yaml 參考資料的內建處理常式一節

如果要使用 deferred 程式庫,只需將函式及引數傳遞至 deferred.defer() 即可:

import logging

from google.appengine.ext import deferred

def do_something_expensive(a, b, c=None):
    logging.info("Doing something expensive!")
    # Do your work here

# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, True)

deferred 程式庫會將函式呼叫及呼叫的引數封裝起來,然後新增至工作佇列。執行工作時,deferred 程式庫會執行 do_something_expensive("Hello, world!", 42, True)

如需 Python 中 deferred 程式庫使用上的更多資訊,請參閱 使用延遲程式庫的背景工作

在多租戶應用程式中處理工作

預設情況下,發送佇列會使用建立工作時在命名空間管理員中設定的最新命名空間。如果您的應用程式使用多租戶架構,請參閱命名空間 Python API 一文。

後續步驟

本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁
App Engine standard environment for Python 2