Pull タスクをリースする

タスクが pull キューに入ると、ワーカーはそれらをリースできます。タスクが処理されたら、ワーカーはそのタスクを削除する必要があります。

始める前に

注意事項

  • このメソッドは、スタンダード環境のサービス内で実行されているワーカーにのみ適用されます。
  • pull キューを使用する場合は、処理量に基づいてワーカーの規模を調整する必要があります。

タスクのリース

タスクがキューに入った後、ワーカーは lease_tasks() メソッドを使用して、1 つまたは複数のタスクをリースできます。add() を使用して最近追加したタスクが lease_tasks() で利用可能になるまでに少し時間がかかることがあります。

リースをリクエストするときは、リースするタスクの数(最大 1,000 個)とリースの期間(秒単位で最大 1 週間)を指定します。最も時間のかかるタスクがリースの期限までに確実に終了するように、リース期間を十分長くする必要があります。タスクのリースは modify_task_lease() を使用して変更できます。

タスクをリースすると、そのタスクはリースの期限まで別のワーカーが処理できなくなります。

lease_tasks() メソッドは、キューからリースされたタスクのリストを含む Task オブジェクトを返します。

次のコードサンプルでは、100 個のタスクをキュー pull-queue から 1 時間リースします。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

タスクタグによる一括処理

タスクはすべて同様とは限りません。コードでタスクにタグ付けし、リースするタスクをタグで選択できます。タグがフィルタの役割を果たします。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.

ポーリング レートの規制

タスクをリースするためにキューをポーリングするワーカーは、そのキューからタスクをリースするよりも早くリースしようとしているかどうかを検出する必要があります。この障害が発生した場合に、lease_tasks() で次の例外が生成されることがあります。

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


これらの例外はコードでキャッチし、lease_tasks() の呼び出しを取り止めて後で再試行する必要があります。この問題を回避するには、lease_tasks() を呼び出すときに、リモート プロシージャ コールの期限を長く設定することを検討します。リース リクエストによってタスクの空のリストが返された場合も、バックオフする必要があります。

1 つのキューにつき 1 秒あたりに 10 個以上の LeaseTasks リクエストを生成した場合、最初の 10 個のリクエストのみ結果が返されます。リクエストがこの上限を超えている場合は、OK が返され、結果は返されません。

Google Cloud コンソールでのタスクのモニタリング

アプリケーション内のすべてのタスクおよびキューに関する情報を表示するには、次の手順を行います。

  1. Google Cloud コンソールで [Cloud Tasks] ページを開き、[種類] 列の [pull] の値を確認します。

    [Cloud Tasks] に移動

  2. 目的のキューの名前をクリックして、[キューの詳細] ページを開きます。選択したキュー内のすべてのタスクが表示されます。

タスクの削除

ワーカーはタスクを完了したら、そのタスクをキューから削除する必要があります。ワーカーが処理を終了した後にキューにタスクが残っている場合は、ワーカーが失敗したことが考えられます。その場合、そのタスクは別のワーカーによって処理されます。

タスクのリスト(lease_task() から返されたリストなど)を削除するには、リストを delete_tasks() に渡します。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

pull キューのエンドツーエンドの例

Python で pull キューを使用するエンドツーエンドの例については、appengine-pullqueue-counter をご覧ください。