トランザクション

注: 新しいアプリケーションを作成する際は、NDB クライアント ライブラリを使用することを強くおすすめします。NDB クライアント ライブラリには、Memcache API によるエンティティの自動キャッシュをはじめ、このクライアント ライブラリにはないメリットがあります。古い DB クライアント ライブラリを現在使用している場合は、DB から NDB への移行ガイドをお読みください。

Datastore はトランザクションをサポートしています。トランザクションは不可分な一連のオペレーションから構成されます。つまり、トランザクション内のオペレーションが部分的に実行されることはありません。アプリケーションは、1 つのトランザクションで複数のオペレーションと計算を実行できます。

トランザクションの使用

トランザクションとは、1 つまたは複数のエンティティに対して Datastore が行う一連の処理です。トランザクションのオペレーションは不可分で、部分的に実行されることはありません。トランザクション内のオペレーションは、すべて適用されるか 1 つも適用されないかのどちらかとなります。トランザクションの持続時間は最大 60 秒で、30 秒後に 10 秒間のアイドル時間が発生します。

次の場合、オペレーションが失敗します。

  • 1 つのエンティティ グループに同時に実行される変更の数が多すぎる場合。
  • トランザクションがリソース制限を超過している場合。
  • Datastore で内部エラーが発生した場合。

いずれの場合も、Datastore API で例外が発生します。

トランザクションは Datastore のオプション機能です。Datastore オペレーションを実行するために、必ずしもトランザクションを使用する必要はありません。

アプリケーションはステートメントとデータストア オペレーションのセットを 1 つのトランザクションで実行できます。そのためステートメントまたはオペレーションのいずれかで例外が発生すると、そのセット内の Datastore オペレーションは一切反映されません。アプリケーションでは、Python の関数を使用して、トランザクション内で実行する処理を定義します。アプリケーションは、トランザクションが単一のエンティティ グループ内のエンティティにアクセスするかどうかや、トランザクションがクロスグループ トランザクションであるかどうかに応じて、いずれかの run_in_transaction メソッドを使用してトランザクションを開始します。

トランザクション内でのみ使用される関数の場合、@db.transactional デコレータを使用します。

from google.appengine.ext import db

class Accumulator(db.Model):
    counter = db.IntegerProperty(default=0)

@db.transactional
def increment_counter(key, amount):
    obj = db.get(key)
    obj.counter += amount
    obj.put()

q = db.GqlQuery("SELECT * FROM Accumulator")
acc = q.get()

increment_counter(acc.key(), 5)

その関数がトランザクションなしで呼び出されることがある場合は、関数を修飾する代わりに、db.run_in_transaction() に引数として渡します。

from google.appengine.ext import db

class Accumulator(db.Model):
    counter = db.IntegerProperty(default=0)

def increment_counter(key, amount):
    obj = db.get(key)
    obj.counter += amount
    obj.put()

q = db.GqlQuery("SELECT * FROM Accumulator")
acc = q.get()

db.run_in_transaction(increment_counter, acc.key(), 5)

db.run_in_transaction() は関数オブジェクトと、関数に渡す位置引数とキーワード引数を受け取ります。関数が値を返す場合、db.run_in_transaction() はその値を返します。

関数が戻るとトランザクションが commit され、Datastore 処理の内容がすべて反映されます。関数が例外をスローすると、トランザクションはロールバックされ、処理内容は反映されません。例外については前出の注記をご覧ください。

1 つのトランザクションが別のトランザクション内から呼び出された場合、@db.transactionaldb.run_in_transaction() のデフォルトの動作が変わります。とデフォルトの動作が異なります。@db.transactional はこの操作を許可しますが、内部トランザクションは、外部トランザクションと同じトランザクションになります。db.run_in_transaction() を呼び出すと、既存のトランザクション内に別のトランザクションをネストしようとしますが、この動作はまだサポートされていないため、db.BadRequestError が発生します。他の動作をしてすることもできます。詳細については、トランザクション オプションの関数リファレンスをご覧ください。

クロスグループ(XG)トランザクションの使用

クロスグループ トランザクションは、複数のエンティティ グループに処理を実行します。クロスグループ トランザクションでは、1 つのグループに対するトランザクションとは異なり、複数のエンティティ グループのエンティティを更新してもトランザクションが失敗することはありません。クロスグループ トランザクションを開始するには、トランザクション オプションを使用します。

使用中: @db.transactional

from google.appengine.ext import db

@db.transactional(xg=True)
def make_things():
  thing1 = Thing(a=3)
  thing1.put()
  thing2 = Thing(a=7)
  thing2.put()

make_things()

使用中: db.run_in_transaction_options

from google.appengine.ext import db

xg_on = db.create_transaction_options(xg=True)

def my_txn():
    x = MyModel(a=3)
    x.put()
    y = MyModel(a=7)
    y.put()

db.run_in_transaction_options(xg_on, my_txn)

トランザクションで実行可能な処理

Datastore では、1 つのトランザクション内で実行できる処理に制限を設定しています。

1 つのグループに対するトランザクションの場合、トランザクション内の Datastore オペレーションは、同じエンティティ グループ内のエンティティに対して実行される必要があります。クロスグループ トランザクションの場合は、最大 25 のエンティティ グループのエンティティに対して実行される必要があります。たとえば祖先に基づくエンティティの照会、キーに基づくエンティティの取得、エンティティの更新、エンティティの削除などを実行できます。ルート エンティティはそれぞれ別のエンティティ グループに属しているため、1 つのトランザクション内で複数のルート エンティティを作成または操作するには、クロスグループ トランザクションを使用する必要があります。

1 つ以上の共通エンティティ グループで複数のトランザクションがエンティティを同時に変更しようとすると、変更を commit した最初のトランザクションは成功しますが、他のすべてのトランザクションは commit に失敗します。このため、エンティティ グループを使用する場合、グループ内のエンティティに同時に実行できる書き込みの数が制限されます。トランザクションが開始されると、Datastore はオプティミスティック同時実行制御を使用して、トランザクションで使用されるエンティティ グループの最終更新時間をチェックします。エンティティ グループのトランザクションを commit する時点で、Datastore はトランザクションで使用されたエンティティ グループの最終更新時間を再度チェックします。最初のチェック以降に変更が行われていると、例外をスローします。

アプリでは、トランザクション内でクエリを実行できますが、実行するには、祖先フィルタを使用する必要があります。また、トランザクション内でキーを使用してデータストア エンティティを取得できます。このキーは、トランザクションの開始前に準備することも、トランザクション内でキー名または ID を指定して作成することもできます。

その他の Python コードはすべてトランザクション関数の中で使用できます。現在のスコープがトランザクション関数の中にネストされているかどうかは、db.is_in_transaction() を使用して確認できます。トランザクション関数で Datastore 処理以外の副作用が発生しないようにしてください。他のユーザーが同じエンティティ グループ内のエンティティを更新しているために Datastore の処理が失敗した場合、トランザクション関数を複数回呼び出すことが可能です。この場合、Datastore API がトランザクションを一定回数再試行します。再試行がすべて失敗すると、db.run_in_transaction()TransactionFailedError をスローします。トランザクションの再試行回数を調整する場合は、db.run_in_transaction() ではなく db.run_in_transaction_custom_retries() を使用します。

同様に、トランザクション関数では、トランザクションの成否によって生じる副作用がないようにしてください(トランザクション関数を呼び出すコードにそのような副作用を元に戻す能力がある場合はこの限りではありません)。たとえば、トランザクションで新しい Datastore エンティティを格納し、作成したエンティティの ID を後で使用できるよう保存している場合、トランザクションが失敗すると、エンティティの作成がロールバックされるため、保存した ID では目的のエンティティを参照できせん。この場合は、呼び出し元コードにおいて、保存した ID を使用しないようにする必要があります。

分離と整合性

トランザクションの外部では、Datastore の分離レベルは「read committed(確定した最新データを常に読み取る)」に最も近くなります。トランザクション内部では、「serializable(シリアル化可能)」分離レベルが強制的に適用されます。つまり、このトランザクションによって読み取りまたは変更が行われているデータを、他のトランザクションが同時に変更することはできません。

トランザクション内のすべての読み取りは、トランザクションを開始したときの最新かつ整合性がある Datastore の状態を反映します。トランザクション内の各クエリと get は、トランザクション開始時の Datastore に対する単一の一貫したスナップショットを参照することが保証されています。トランザクションのエンティティ グループ内の各エンティティとインデックス行は完全に更新されるため、クエリは完全で正確な結果セットとしてエンティティを返すことができます。トランザクション外部で生じる可能性のある、偽陽性または偽陰性といった誤検知が生じることはありません。

こうした一貫性のあるスナップショット ビューは、トランザクション内の書き込み後の読み取りにまで拡張されています。ほとんどのデータベースと異なり、Datastore トランザクション内のクエリや get オペレーションが、同じトランザクション内で先に実行された書き込みの結果を読み取ることはありません。特にトランザクション内でエンティティが変更または削除された場合は、クエリまたは get により、トランザクション開始時のオリジナル バージョンのエンティティが返されます。または、トランザクション開始時にこのエンティティが存在しなかった場合は何も返されません。

トランザクションの用法

この例では、トランザクションを使用して、現在の値より新しいプロパティ値でエンティティを更新します。

def increment_counter(key, amount):
    obj = db.get(key)
    obj.counter += amount
    obj.put()

このコードがオブジェクトをフェッチした後、変更済みオブジェクトを保存する前に値が別のユーザーによって更新されてしまう可能性があるため、トランザクションが必要になります。トランザクションを使用しないと、最初のユーザーのリクエストに対し、もう 1 人のユーザーによる更新前の count 値が使用され、保存によって新しい値が上書きされてしまいます。トランザクションを使用することで、もう 1 人のユーザーによる更新がアプリケーションに通知されます。トランザクションでエンティティが更新されると、すべてのステップが中断せずに完了するまでトランザクションが再試行されます。

トランザクションのもう 1 つの一般的な用法は、名前付きキーによってエンティティを取得することです。名前付きキーがまだ存在しない場合は、新規に作成します。

class SalesAccount(db.Model):
    address = db.PostalAddressProperty()
    phone_number = db.PhoneNumberProperty()

def get_or_create(parent_key, account_id, address, phone_number):
    obj = db.get(db.Key.from_path("SalesAccount", account_id, parent=parent_key))
    if not obj:
        obj = SalesAccount(key_name=account_id,
                           parent=parent_key,
                           address=address,
                           phone_number=phone_number)
        obj.put()
    else:
        obj.address = address
        obj.phone_number = phone_number

同じ文字列 ID を持つエンティティを別のユーザーが作成または更新しようとする状況を処理するには、前の例と同様、ここでもトランザクションが必要です。トランザクションを使用しない場合は、エンティティがまだ存在していない時点で 2 人のユーザーが同時にこれを作成しようとすると、1 人目のユーザーが知らないうちに 2 人目のユーザーが作成したエンティティが 1 人目のエンティティを上書きしてしまいます。トランザクションを使用すると、2 回目の試行が実行されます。エンティティが存在するため、エンティティが更新されます。

トランザクションが失敗した場合は、成功するまでアプリにトランザクションを再試行させることができます。またはアプリのユーザー インターフェースのレベルまでエラーを伝達し、ユーザーにこのエラーを処理させることもできます。すべてのトランザクションに対し、再試行ループを作成する必要はありません。

「get して存在しなければ create」は便利であり、このための組み込みメソッドが用意されています。Model.get_or_insert() は、キー名、親(オプション)、モデル コンストラクタに渡す引数を受け取り、その名前とパスのエンティティが存在しない場合は最後の引数をモデル コンストラクタに渡してエンティティを作成します。get と create は 1 つのトランザクションで発生します。トランザクションが成功すると、このメソッドは実際のエンティティを表すモデル インスタンスを常に返します。

最後に、トランザクションを使用してデータ整合性のある Datastore スナップショットを読み取ることができます。こうした手法は、整合性が要求されるページの表示やデータのエクスポートに、複数回の読み取りが必要となるような場合に役立ちます。このような種類のトランザクションは、書き込みを伴わないため、読み取り専用トランザクションとも呼ばれます。読み取り専用の単一グループによるトランザクションは、同時変更によって失敗することはないため、失敗した場合の再試行を実装する必要はありません。ただし、クロスグループ トランザクションは、同時変更によって失敗する可能性があるため、再試行が必要になります。読み取り専用トランザクションの commit とロールバックは、いずれも no-op(何も行わない)として処理されます。

class Customer(db.Model):
    user = db.StringProperty()

class Account(db.Model):
    """An Account has a Customer as its parent."""
    address = db.PostalAddressProperty()
    balance = db.FloatProperty()

def get_all_accounts():
    """Returns a consistent view of the current user's accounts."""
    accounts = []
    for customer in Customer.all().filter('user =', users.get_current_user().user_id()):
        accounts.extend(Account.all().ancestor(customer))
    return accounts

トランザクション タスクをキューに入れる

Datastore トランザクションの一部として、トランザクションが正常に commit された場合にのみタスクをキューに入れることができます。トランザクションが commit できない場合、タスクはキューに入りません。トランザクションが commit されると、タスクがキューに入ります。キューに追加されたタスクはすぐに実行されません。そのため、タスクはトランザクションにおいてアトミックではありません。ただし、いったんキューに登録されれば、タスクは成功するまで再試行されます。これは、run_in_transaction() 関数でキューに登録されるすべてのタスクに適用されます。

トランザクション タスクは、トランザクションの成功(購入を確認するメールの送信など)に依存する Datastore 以外の処理を、トランザクションに組み合わせることができるので便利です。また、Datastore の処理をトランザクションに組み合わせることもできます。たとえば、トランザクションが成功した場合にのみ、エンティティ グループに対する変更をトランザクションの外部で commit できます。

アプリケーションの 1 つのトランザクション中にタスクキューに挿入できるトランザクション タスクの数は 5 個までです。トランザクション タスクの名前をユーザーが指定することはできません。

def do_something_in_transaction(...)
    taskqueue.add(url='/path/to/my/worker', transactional=True)
  ...

db.run_in_transaction(do_something_in_transaction, ....)