このページでは、タスクを作成して push キューに配置する方法を説明します。タスクを処理する場合は、新しいタスク オブジェクトを作成してキューに配置する必要があります。タスクを処理するサービスとハンドラを明示的に指定し、必要に応じてタスク固有のデータをハンドラに渡すこともできます。また、タスクを実行するタイミングのスケジューリング、タスクが失敗した場合の再試行回数の制限など、タスクの構成を微調整することもできます。
新しいタスクを作成する
タスクを作成し、キューに入れるには、taskqueue.add() 関数を呼び出します。次のコードでは worker
という名前のサービスを対象とするタスクを作成し、URL /update-counter
を設定してハンドラを呼び出します。
または、Task オブジェクトを作成して、そのオブジェクトの add() メソッドを呼び出すこともできます。
ワーカー サービスを指定する
タスクがキューから取り出される(ポップ)と、タスクはタスクキュー サービスによってワーカー サービスに送信されます。いずれのタスクにもターゲットと url があり、どのサービスとハンドラがそのタスクを実行するかがこれらによって決まります。
target
ターゲットは、どのサービスがタスク実行の HTTP リクエストを受け取るかを指定します。これは文字列であり、サービス / バージョン / インスタンスを正規形式のいずれかで指定します。よく使用される形式は次のとおりです。
service
version.service
instance.version.service
ターゲット文字列の前には、アプリのドメイン名が付加されます。タスクのターゲットを設定するには、次の 3 つの方法があります。
タスクを作成するときにターゲットを宣言します。taskqueue.add() 関数の
target
パラメータを使用して、ターゲットを明示的に設定できます。上記の例をご覧ください。queue.yaml
でキューを定義するときに、target
ディレクティブを指定します(queue-blue
の定義をご覧ください)。target
が指定されているキューに追加されるタスクはすべて、そのターゲットを使用します。タスクの作成時に別のターゲットが割り当てられていても無視されます。上記の 2 つの方法のいずれかでターゲットが指定されていない場合、タスクをキューに追加したサービスのバージョンが、そのタスクのターゲットになります。この方法でタスクをデフォルトのサービスとバージョンからキューに追加した場合、タスクの実行前にデフォルトのバージョンが変更されると、タスクは変更後のデフォルトのバージョンで実行されます。
url
url
に基づいて、ターゲット サービスのハンドラのうち 1 つが選択され、そのハンドラによってタスクが実行されます。
url
は、ターゲット サービス内のハンドラ URL パターンのうち 1 つに一致する必要があります。タスクに指定されたメソッドが GET
または PULL
である場合は、url
にクエリ パラメータを含めることができます。url
が指定されていない場合は、デフォルトの URL /_ah/queue/[QUEUE_NAME]
が使用されます。ここで [QUEUE_NAME]
はタスクのキューの名前です。
データをハンドラに渡す
データをハンドラに渡す場合、タスクの URL 内のクエリ パラメータとして渡す方法がありますが、この方法が可能なのはタスクで指定されているメソッドが GET
か PULL
の場合のみです。
payload
。HTTP リクエストの本文でタスクデータが配信されます。params
次の 3 つの呼び出しは同等です。
taskqueue.add(method=GET, url='/update-counter?key=blue', target='worker')
taskqueue.add(url='/update-counter', params={'key': 'blue'}, target='worker')
taskqueue.add(url='/update-counter', payload="{'key': 'blue'}", target='worker')
タスクに名前を付ける
新しいタスクを作成すると、デフォルトではタスクに一意の名前が割り当てられます。ただし、name
パラメータを使用すると、タスクに独自の名前を割り当てることができます。独自のタスク名を割り当てることの利点は、名前付きのタスクでは重複が除外されることです。つまり、タスク名を使用すると、タスクが 1 回のみ追加されることを保証できます。重複の除外は、タスクが完了するか、削除されてから 9 日間続きます。
重複除外ロジックはパフォーマンスのオーバーヘッドを大幅に増加させるため、レイテンシが増加し、場合によっては名前付きタスクに伴うエラー率が高まるので注意してください。このようなコストは、タイムスタンプのように連続したタスク名が付けられている場合、大幅に増大する可能性があります。そのため、独自の名前を割り当てる場合は、コンテンツのハッシュなどの適度に分散された接頭辞をタスク名に使用することをおすすめします。
独自の名前をタスクに割り当てる場合、名前の最大長が 500 文字で、名前には英字の大文字と小文字、数字、アンダースコア、ハイフンを使用できることに注意してください。
taskqueue.add(url='/url/path', name='first-try')
タスクを非同期で追加する
デフォルトでは、タスクをキューに追加する呼び出しは同期的です。ほとんどのシナリオでは、同期呼び出しで問題ありません。タスク 1 個のキューへの追加は、通常は短時間で終わるオペレーションです。かなり長い時間を要するタスク追加オペレーションもごくわずかにありますが、タスク 1 個の追加に要する時間の中央値は 5 ミリ秒未満です。
異なるキューに対するタスク追加オペレーションはバッチ処理できないため、Task Queue API にはそのようなタスクを並行して追加できる非同期呼び出しが用意されており、このレイテンシをさらに短縮できます。これは、レイテンシの影響を受けやすく、異なるキューに対するタスク追加オペレーションを同時に複数個実行する必要があるアプリケーションを作成する場合に便利です。
タスクキューに対して非同期呼び出しを行う場合は、Queue クラスと RPC オブジェクトの非同期メソッドを使用します。返された RPC
に対して get_result()
を呼び出すと、リクエストを強制的に完了させることができます。トランザクションの中で非同期でタスクを追加するときは、リクエストを確実に完了させるために、get_result()
を RPC
オブジェクトに対して呼び出してからトランザクションを commit してください。
Cloud Datastore トランザクションの中でタスクをキューに追加する
タスクを Datastore トランザクションの一部としてキューに登録できます。これにより、トランザクションが正常に commit された場合にのみ、タスクがキューに登録されるようになります(キューへの登録が保証されます)。トランザクションの中で追加されたタスクは、そのトランザクションの一部とみなされ、同じレベルの分離と整合性を持つことになります。
1 つのトランザクションの中でタスクキューに挿入できるトランザクション タスクは 5 個までです。トランザクション タスクの名前をユーザーが指定することはできません。
次のコードサンプルでは、Datastore トランザクションの一部としてトランザクション タスクを push キューに挿入する方法を示します。
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
@ndb.transactional
def do_something_in_transaction():
taskqueue.add(url='/path/to/my/worker', transactional=True)
#...
do_something_in_transaction()
ワーカー サービスの代わりに遅延タスク ライブラリを使用する
上記のセクションで説明したように、実行するタスクごとに別個にハンドラを設定すると、タスクの複雑な引数のシリアル化 / シリアル化解除と同様、煩雑になる場合があります。特に、キューで実行するタスクが小さく、多種多様で数も多い場合には、厄介です。Python SDK に含まれるライブラリ(google.appengine.ext.deferred
)を使用すると、単純な関数をエクスポーズするだけで、専用のタスクハンドラの設定やパラメータのシリアル化/シリアル化解除の作業をすべて回避できます。
このライブラリを使用するには、deferred
ビルトインを app.yaml
に追加する必要があります。詳細については、app.yaml
リファレンスの組み込みハンドラのセクションをご覧ください。
deferred
ライブラリを使用するには、関数とその引数を deferred.defer()
に渡すだけです。
import logging
from google.appengine.ext import deferred
def do_something_expensive(a, b, c=None):
logging.info("Doing something expensive!")
# Do your work here
# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, True)
deferred
ライブラリは、関数呼び出しとその引数をパッケージ化して、タスクキューに追加します。タスクの実行時に、deferred
ライブラリが do_something_expensive("Hello, world!", 42, True)
を実行します。
マルチテナント アプリケーションでタスクを扱う
push キューでは、タスクの作成時に名前空間マネージャーで設定された名前空間がデフォルトで使用されます。アプリケーションでマルチテナンシーを使用する場合は、Python 2 の Namespaces API をご覧ください。
次のステップ
- タスクハンドラを作成する方法を学習する。