租用提取工作

一旦工作位於提取佇列中,工作站就可以租用這些工作。工作處理完畢後,工作站必須刪除這些工作。

事前準備

須知

  • 此方法只適用在標準環境的服務中執行的工作站。如果想使用標準環境以外的工作站服務來處理工作,請參閱 Cloud Tasks API 的文件
  • 在使用提取佇列時,需由您負責按處理量調度工作站的資源。

租用工作

工作排入佇列後,工作站就可使用 lease_tasks() 方法,租用一個或多個工作。在最近使用 add() 新增的工作透過 lease_tasks() 變為可用狀態之前,可能會出現短暫延遲。

當您要求租用時,需指定租用的工作數 (上限為 1,000 項工作),以及以秒為單位的租用期間 (時間上限為一週)。租用期間必須夠長,才能確保最慢的工作在租期結束前也有時間完成。您可以使用 modify_task_lease() 修改工作租用。

租用工作時,直到租期結束為止,其他工作站皆無法使用該工作。

lease_tasks() 方法會傳回 Task 物件,內有佇列租用工作的清單。

下列程式碼範例會從佇列 pull-queue 中租用 100 項工作一小時。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

使用工作標記進行批次處理

並非所有的工作都相同,您的程式碼可以「標記」工作,然後依照標記來選擇要租用的工作。標記的用途和篩選器相同。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
                                # Tag is that of "oldest" task by eta.

管理輪詢率

會輪詢工作佇列來租用工作的工作站,應能偵測本身嘗試租用工作的速度是否高於佇列所能提供的速度。如果未做到,lease_tasks() 會產生下列幾種例外狀況:

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


您的程式碼必須能掌握這些例外狀況,停止呼叫 lease_tasks(),並稍後再嘗試一次。為了避免出現這種問題,在呼叫 lease_tasks() 時,請考慮將 RPC 期限設高一點。當租用要求傳回空白工作清單時,建議也跟著停止作業。

如果產生出每佇列每秒大於 10 個 LeaseTasks 要求,則只有前 10 個要求會傳回結果。如果要求超過此上限,OK 傳回的結果為零。

在 GCP 主控台上監控工作

如何查看應用程式中所有工作和佇列的相關資訊:

  1. 開啟 GCP 主控台中的「工作佇列」頁面,在頁面頂端的選單列中選取 [提取佇列] 分頁標籤。

    前往 [提取佇列] 分頁標籤

  2. 按一下您需要的佇列名稱,開啟該佇列的詳細資料頁面。該頁面會隨即顯示所選佇列內的全部工作。

刪除工作

工作站完成任務後,即需從佇列中刪除該項工作。如果在工作站完成工作後,仍可在佇列中看見這些工作,則可能是工作站作業失敗,在這種情況下,該工作會交由其他工作站處理。

只需將工作清單傳送到 delete_tasks(),即可將清單刪除,例如 lease_task() 傳回的工作清單。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

提取佇列的端對端範例

如需在 Python 中使用提取佇列的端對端完整簡明範例,請參閱 appengine-pullqueue-counter

本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁
App Engine standard environment for Python 2