pull タスクをリースする

タスクが pull キューに入ると、ワーカーはそれらをリースすることができます。タスクが処理されたら、ワーカーはそのタスクを削除する必要があります。

始める前に

理解しておくべきこと

  • このメソッドは、スタンダード環境のサービス内で実行されているワーカーにのみ適用されます。スタンダード環境にないワーカー サービスを使用してタスクを処理する場合は、Cloud Tasks API のドキュメントをご覧ください。
  • pull キューを使用する場合は、処理量に基づいてワーカーの規模を調整する必要があります。

タスクをリースする

タスクがキューに入った後、ワーカーは lease_tasks() メソッドを使用して、それらの 1 つまたは複数をリースできます。add() を使用して追加されたばかりのタスクが lease_tasks() で利用できるようになるまでに、短時間の遅延が生じる場合があります。

リースをリクエストするときは、リースするタスクの数(最大 1,000 個)とリースの期間(秒単位で最大 1 週間)を指定します。最も時間のかかるタスクがリースの期限までに確実に終了するように、リースの期間は十分長くする必要があります。タスクのリースは modify_task_lease() を使用して変更できます。

タスクをリースすると、そのタスクはリースの期限まで別のワーカーが処理できなくなります。

lease_tasks() メソッドは、キューからリースされたタスクのリスクを含む Task オブジェクトを返します。

次のコードサンプルでは、100 個のタスクをキュー pull-queue から 1 時間リースします。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

タスクタグによる一括処理

タスクはすべて同様とは限りません。コードでタスクにタグ付けし、リースするタスクをタグで選択できます。タグがフィルタの役割を果たします。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
                                # Tag is that of "oldest" task by eta.

ポーリング レートの規制

タスクをリースするためにキューをポーリングするワーカーは、そのキューからタスクをリースするよりも早くリースできるかどうかを検出する必要があります。この障害が発生した場合に、lease_tasks() で次の例外が生成されることがあります。

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


ユーザーのコードでは、これらの例外をキャッチし、lease_tasks() 呼び出しをバックオフして後で再試行する必要があります。この問題を回避するには、lease_tasks() を呼び出すときに、リモート プロシージャ コールの期限を長く設定することを検討します。リース リクエストによってタスクの空のリストが返された場合にも、バックオフが必要です。

1 つのキューにつき 1 秒あたりに 10 個以上の LeaseTasks リクエストを生成した場合、最初の 10 個のリクエストのみ結果が返されます。リクエストがこの制限を超えている場合は、OK が返され、結果は 0 です。

GCP Console でのタスクのモニタリング

アプリケーション内のすべてのタスクおよびキューに関する情報を表示するには、次の手順を実行します。

  1. GCP Console で [タスクキュー] ページを開き、ページの上部にあるメニューバーで [pull キュー] タブを選択します。

    [pull キュー] タブに移動する

  2. 目的のキューの名前をクリックして、キューの詳細ページを開きます。選択したキュー内のすべてのタスクが表示されます。

タスクを削除する

ワーカーはタスクを完了したら、そのタスクをキューから削除する必要があります。ワーカーが処理を終了した後にキューにタスクが残っている場合は、ワーカーが失敗したことが考えられます。その場合、そのタスクは別のワーカーによって処理されます。

タスクのリスト(lease_task() から返されたリストなど)を削除するには、リストを delete_tasks() に渡します。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

pull キューのエンドツーエンドの例

Python で pull キューを使用するエンドツーエンドの例については、appengine-pullqueue-counter をご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Python の App Engine スタンダード環境