タスクが pull キューに入ると、ワーカーはそれらをリースできます。タスクが処理されたら、ワーカーはそのタスクを削除する必要があります。
始める前に
注意事項
- このメソッドは、スタンダード環境のサービス内で実行されているワーカーにのみ適用されます。
- pull キューを使用する場合は、処理量に基づいてワーカーの規模を調整する必要があります。
タスクのリース
タスクがキューに入った後、ワーカーは lease_tasks()
メソッドを使用して、1 つまたは複数のタスクをリースできます。add()
を使用して最近追加したタスクが lease_tasks()
で利用可能になるまでに少し時間がかかることがあります。
リースをリクエストするときは、リースするタスクの数(最大 1,000 個)とリースの期間(秒単位で最大 1 週間)を指定します。最も時間のかかるタスクがリースの期限までに確実に終了するように、リース期間を十分長くする必要があります。タスクのリースは modify_task_lease()
を使用して変更できます。
タスクをリースすると、そのタスクはリースの期限まで別のワーカーが処理できなくなります。
lease_tasks()
メソッドは、キューからリースされたタスクのリストを含む Task オブジェクトを返します。
pull-queue
から 1 時間リースします。
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
タスクタグによる一括処理
タスクはすべて同様とは限りません。コードでタスクにタグ付けし、リースするタスクをタグで選択できます。タグがフィルタの役割を果たします。
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
ポーリング レートの規制
タスクをリースするためにキューをポーリングするワーカーは、そのキューからタスクをリースするよりも早くリースしようとしているかどうかを検出する必要があります。この障害が発生した場合に、lease_tasks()
で次の例外が生成されることがあります。
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
これらの例外はコードでキャッチし、lease_tasks()
の呼び出しを取り止めて後で再試行する必要があります。この問題を回避するには、lease_tasks() を呼び出すときに、リモート プロシージャ コールの期限を長く設定することを検討します。リース リクエストによってタスクの空のリストが返された場合も、バックオフする必要があります。
1 つのキューにつき 1 秒あたりに 10 個以上の LeaseTasks リクエストを生成した場合、最初の 10 個のリクエストのみ結果が返されます。リクエストがこの上限を超えている場合は、OK
が返され、結果は返されません。
Google Cloud コンソールでのタスクのモニタリング
アプリケーション内のすべてのタスクおよびキューに関する情報を表示するには、次の手順を行います。
Google Cloud コンソールで [Cloud Tasks] ページを開き、[種類] 列の [pull] の値を確認します。
目的のキューの名前をクリックして、[キューの詳細] ページを開きます。選択したキュー内のすべてのタスクが表示されます。
タスクの削除
ワーカーはタスクを完了したら、そのタスクをキューから削除する必要があります。ワーカーが処理を終了した後にキューにタスクが残っている場合は、ワーカーが失敗したことが考えられます。その場合、そのタスクは別のワーカーによって処理されます。
タスクのリスト(lease_task()
から返されたリストなど)を削除するには、リストを delete_tasks()
に渡します。
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
pull キューのエンドツーエンドの例
Python で pull キューを使用するエンドツーエンドの例については、appengine-pullqueue-counter をご覧ください。