push タスクの作成

このページでは、タスクを作成して push キューに配置する方法を説明します。タスクを処理する場合は、新しいタスク オブジェクトを作成してキューに配置する必要があります。タスクを処理するサービスとハンドラを明示的に指定し、必要に応じてタスク固有のデータをハンドラに渡すこともできます。また、タスクを実行するタイミングのスケジューリング、タスクが失敗した場合の再試行回数の制限など、タスクの構成を微調整することもできます。

新しいタスクを作成する

タスクを作成し、キューに入れるには、taskqueue.add() 関数を呼び出します。次のコードでは worker という名前のサービスを対象とするタスクを作成し、URL /update-counter を設定してハンドラを呼び出します。

class EnqueueTaskHandler(webapp2.RequestHandler):
    def post(self):
        amount = int(self.request.get('amount'))

        task = taskqueue.add(
            url='/update_counter',
            target='worker',
            params={'amount': amount})

        self.response.write(
            'Task {} enqueued, ETA {}.'.format(task.name, task.eta))

または、Task オブジェクトを作成して、そのオブジェクトの add() メソッドを呼び出すこともできます。

ワーカー サービスを指定する

タスクがキューから取り出される(ポップ)と、タスクはタスクキュー サービスによってワーカー サービスに送信されます。いずれのタスクにもターゲットと url があり、どのサービスとハンドラがそのタスクを実行するかがこれらによって決まります。

target

ターゲットは、どのサービスがタスク実行の HTTP リクエストを受け取るかを指定します。これは文字列であり、サービス / バージョン / インスタンスを正規形式のいずれかで指定します。よく使用される形式は次のとおりです。

    service
    version.service
    instance.version.service

ターゲット文字列の前には、アプリのドメイン名が付加されます。タスクのターゲットを設定するには、次の 3 つの方法があります。

  • タスクを作成するときにターゲットを宣言します。taskqueue.add() 関数の target パラメータを使用して、ターゲットを明示的に設定できます。上記の例をご覧ください。

  • queue.yaml でキューを定義するときに、target ディレクティブを指定します(queue-blue定義をご覧ください)。target が指定されているキューに追加されるタスクはすべて、そのターゲットを使用します。タスクの作成時に別のターゲットが割り当てられていても無視されます。

  • 上記の 2 つの方法のいずれかでターゲットが指定されていない場合、タスクをキューに追加したサービスのバージョンが、そのタスクのターゲットになります。この方法でタスクをデフォルトのサービスとバージョンからキューに追加した場合、タスクの実行前にデフォルトのバージョンが変更されると、タスクは変更後のデフォルトのバージョンで実行されます。

url

url に基づいて、ターゲット サービスのハンドラのうち 1 つが選択され、そのハンドラによってタスクが実行されます。

url は、ターゲット サービス内のハンドラ URL パターンのうち 1 つに一致する必要があります。タスクに指定されたメソッドが GET または PULL である場合は、url にクエリ パラメータを含めることができます。url が指定されていない場合は、デフォルトの URL /_ah/queue/[QUEUE_NAME] が使用されます。ここで [QUEUE_NAME] はタスクのキューの名前です。

データをハンドラに渡す

データをハンドラに渡す場合、タスクの URL 内のクエリ パラメータとして渡す方法がありますが、この方法が可能なのはタスクで指定されているメソッドが GETPULL の場合のみです。

以下のフィールドのいずれかを使用して、タスクにデータを追加することもできます。

  • payload。HTTP リクエストの本文でタスクデータが配信されます。
  • params

次の 3 つの呼び出しは同等です。

taskqueue.add(method=GET, url='/update-counter?key=blue', target='worker')
taskqueue.add(url='/update-counter', params={'key': 'blue'}, target='worker')
taskqueue.add(url='/update-counter', payload="{'key': 'blue'}", target='worker')

タスクに名前を付ける

新しいタスクを作成すると、デフォルトではタスクに一意の名前が割り当てられます。ただし、name パラメータを使用すると、タスクに独自の名前を割り当てることができます。独自のタスク名を割り当てることの利点は、名前付きのタスクでは重複が除外されることです。つまり、タスク名を使用すると、タスクが 1 回のみ追加されることを保証できます。重複の除外は、タスクが完了するか、削除されてから 9 日間続きます。

重複除外ロジックはパフォーマンスのオーバーヘッドを大幅に増加させるため、レイテンシが増加し、場合によっては名前付きタスクに伴うエラー率が高まるので注意してください。このようなコストは、タイムスタンプのように連続したタスク名が付けられている場合、大幅に増大する可能性があります。そのため、独自の名前を割り当てる場合は、コンテンツのハッシュなどの適度に分散された接頭辞をタスク名に使用することをおすすめします。

独自の名前をタスクに割り当てる場合、名前の最大長が 500 文字で、名前には英字の大文字と小文字、数字、アンダースコア、ハイフンを使用できることに注意してください。

taskqueue.add(url='/url/path', name='first-try')

タスクを非同期で追加する

デフォルトでは、タスクをキューに追加する呼び出しは同期的です。ほとんどのシナリオでは、同期呼び出しで問題ありません。タスク 1 個のキューへの追加は、通常は短時間で終わるオペレーションです。かなり長い時間を要するタスク追加オペレーションもごくわずかにありますが、タスク 1 個の追加に要する時間の中央値は 5 ミリ秒未満です。

異なるキューに対するタスク追加オペレーションはバッチ処理できないため、Task Queue API にはそのようなタスクを並行して追加できる非同期呼び出しが用意されており、このレイテンシをさらに短縮できます。これは、レイテンシの影響を受けやすく、異なるキューに対するタスク追加オペレーションを同時に複数個実行する必要があるアプリケーションを作成する場合に便利です。

タスクキューに対して非同期呼び出しを行う場合は、Queue クラスと RPC オブジェクトの非同期メソッドを使用します。返された RPC に対して get_result() を呼び出すと、リクエストを強制的に完了させることができます。トランザクションの中で非同期でタスクを追加するときは、リクエストを確実に完了させるために、get_result()RPC オブジェクトに対して呼び出してからトランザクションを commit してください。

Cloud Datastore トランザクションの中でタスクをキューに追加する

タスクを Datastore トランザクションの一部としてキューに登録できます。これにより、トランザクションが正常に commit された場合にのみ、タスクがキューに登録されるようになります(キューへの登録が保証されます)。トランザクションの中で追加されたタスクは、そのトランザクションの一部とみなされ、同じレベルの分離と整合性を持つことになります。

1 つのトランザクションの中でタスクキューに挿入できるトランザクション タスクは 5 個までです。トランザクション タスクの名前をユーザーが指定することはできません。

次のコードサンプルでは、Datastore トランザクションの一部としてトランザクション タスクを push キューに挿入する方法を示します。

from google.appengine.api import taskqueue
from google.appengine.ext import ndb

@ndb.transactional
def do_something_in_transaction():
  taskqueue.add(url='/path/to/my/worker', transactional=True)
  #...

do_something_in_transaction()

ワーカー サービスの代わりに遅延タスク ライブラリを使用する

上記のセクションで説明したように、実行するタスクごとに別個にハンドラを設定すると、タスクの複雑な引数のシリアル化 / シリアル化解除と同様、煩雑になる場合があります。特に、キューで実行するタスクが小さく、多種多様で数も多い場合には、厄介です。Python SDK に含まれるライブラリ(google.appengine.ext.deferred)を使用すると、単純な関数をエクスポーズするだけで、専用のタスクハンドラの設定やパラメータのシリアル化/シリアル化解除の作業をすべて回避できます。

このライブラリを使用するには、deferred ビルトインを app.yaml に追加する必要があります。詳細については、app.yaml リファレンスの組み込みハンドラのセクションをご覧ください。

deferred ライブラリを使用するには、関数とその引数を deferred.defer() に渡すだけです。

import logging

from google.appengine.ext import deferred

def do_something_expensive(a, b, c=None):
    logging.info("Doing something expensive!")
    # Do your work here

# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, True)

deferred ライブラリは、関数呼び出しとその引数をパッケージ化して、タスクキューに追加します。タスクの実行時に、deferred ライブラリが do_something_expensive("Hello, world!", 42, True) を実行します。

マルチテナント アプリケーションでタスクを扱う

push キューでは、タスクの作成時に名前空間マネージャーで設定された名前空間がデフォルトで使用されます。アプリケーションでマルチテナンシーを使用する場合は、Python 2 の Namespaces API をご覧ください。

次のステップ