トランザクション

Datastore はトランザクションをサポートしています。トランザクションは不可分な一連のオペレーションから構成されます。つまり、トランザクション内のオペレーションが部分的に実行されることはありません。アプリケーションは、1 つのトランザクションで複数のオペレーションと計算を実行できます。

トランザクションの使用

トランザクションとは、1 つまたは複数のエンティティに対して Datastore が行う一連の処理です。トランザクションのオペレーションは不可分で、部分的に実行されることはありません。トランザクション内のオペレーションは、すべて適用されるか 1 つも適用されないかのどちらかとなります。トランザクションの持続時間は最大 60 秒で、30 秒後に 10 秒間のアイドル時間が発生します。

次の場合、オペレーションが失敗します。

  • 1 つのエンティティ グループに同時に実行される変更の数が多すぎる場合。
  • トランザクションがリソース制限を超過している場合。
  • Datastore で内部エラーが発生した場合。

上記のいずれの場合も、Datastore API はエラーを返します。

トランザクションは Datastore のオプション機能です。Datastore オペレーションを実行するために、必ずしもトランザクションを使用する必要はありません。

datastore.RunInTransaction 関数は、指定された関数をトランザクション内で実行します。


package counter

import (
	"context"
	"fmt"
	"net/http"

	"google.golang.org/appengine"
	"google.golang.org/appengine/datastore"
	"google.golang.org/appengine/log"
	"google.golang.org/appengine/taskqueue"
)

func init() {
	http.HandleFunc("/", handler)
}

type Counter struct {
	Count int
}

func handler(w http.ResponseWriter, r *http.Request) {
	ctx := appengine.NewContext(r)

	key := datastore.NewKey(ctx, "Counter", "mycounter", 0, nil)
	count := new(Counter)
	err := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
		// Note: this function's argument ctx shadows the variable ctx
		//       from the surrounding function.
		err := datastore.Get(ctx, key, count)
		if err != nil && err != datastore.ErrNoSuchEntity {
			return err
		}
		count.Count++
		_, err = datastore.Put(ctx, key, count)
		return err
	}, nil)
	if err != nil {
		log.Errorf(ctx, "Transaction failed: %v", err)
		http.Error(w, "Internal Server Error", 500)
		return
	}

	fmt.Fprintf(w, "Current count: %d", count.Count)
}

関数が nil を返すと、RunInTransaction はトランザクションを commit しようとし、成功した場合は nil を返します。関数が nil 以外のエラー値を返した場合、Cloud Datastore の変更は適用されず、RunInTransaction は同じエラーを返します。

競合のため RunInTransaction がトランザクションを commit できない場合、再度試行します。3 回失敗すると、処理を停止します。これは、トランザクション関数がべき等であること(複数回試行しても結果が同じであること)を意味します。スライス フィールドをマーシャリング解除する場合、datastore.Get はべき等ではありません。

トランザクションで実行可能な処理

Datastore では、1 つのトランザクション内で実行できる処理に制限を設定しています。

1 つのグループに対するトランザクションの場合、トランザクション内の Datastore オペレーションは、同じエンティティ グループ内のエンティティに対して実行される必要があります。クロスグループ トランザクションの場合は、最大 25 のエンティティ グループのエンティティに対して実行される必要があります。たとえば祖先に基づくエンティティの照会、キーに基づくエンティティの取得、エンティティの更新、エンティティの削除などを実行できます。ルート エンティティはそれぞれ別のエンティティ グループに属しているため、1 つのトランザクション内で複数のルート エンティティを作成または操作するには、クロスグループ トランザクションを使用する必要があります。

1 つ以上の共通エンティティ グループで複数のトランザクションがエンティティを同時に変更しようとすると、変更を commit した最初のトランザクションは成功しますが、他のすべてのトランザクションは commit に失敗します。このため、エンティティ グループを使用する場合、グループ内のエンティティに同時に実行できる書き込みの数が制限されます。トランザクションが開始されると、Datastore はオプティミスティック同時実行制御を使用して、トランザクションで使用されるエンティティ グループの最終更新時間をチェックします。エンティティ グループのトランザクションを commit する時点で、Datastore はトランザクションで使用されたエンティティ グループの最終更新時間を再度チェックします。初回のチェック時以降に変更が加えられていた場合は、エラーが返されます。

分離と整合性

トランザクションの外部では、Datastore の分離レベルは「read committed(確定した最新データを常に読み取る)」に最も近くなります。トランザクション内部では、「serializable(シリアル化可能)」分離レベルが強制的に適用されます。つまり、このトランザクションによって読み取りまたは変更が行われているデータを、他のトランザクションが同時に変更することはできません。

トランザクション内のすべての読み取りは、トランザクションを開始したときの最新かつ整合性がある Datastore の状態を反映します。トランザクション内の各クエリと get は、トランザクション開始時の Datastore に対する単一の一貫したスナップショットを参照することが保証されています。トランザクションのエンティティ グループ内の各エンティティとインデックス行は完全に更新されるため、クエリは完全で正確な結果セットとしてエンティティを返すことができます。トランザクション外部で生じる可能性のある、偽陽性または偽陰性といった誤検知が生じることはありません。

こうした一貫性のあるスナップショット ビューは、トランザクション内の書き込み後の読み取りにまで拡張されています。ほとんどのデータベースと異なり、Datastore トランザクション内のクエリや get オペレーションが、同じトランザクション内で先に実行された書き込みの結果を読み取ることはありません。特にトランザクション内でエンティティが変更または削除された場合は、クエリまたは get により、トランザクション開始時のオリジナル バージョンのエンティティが返されます。または、トランザクション開始時にこのエンティティが存在しなかった場合は何も返されません。

トランザクションの用法

この例では、トランザクションを使用して、現在の値より新しいプロパティ値でエンティティを更新します。

func increment(ctx context.Context, key *datastore.Key) error {
	return datastore.RunInTransaction(ctx, func(ctx context.Context) error {
		count := new(Counter)
		if err := datastore.Get(ctx, key, count); err != nil {
			return err
		}
		count.Count++
		_, err := datastore.Put(ctx, key, count)
		return err
	}, nil)
}

このコードがオブジェクトをフェッチした後、変更済みオブジェクトを保存する前に値が別のユーザーによって更新されてしまう可能性があるため、トランザクションが必要になります。トランザクションを使用しないと、最初のユーザーのリクエストに対し、もう 1 人のユーザーによる更新前の count 値が使用され、保存によって新しい値が上書きされてしまいます。トランザクションを使用することで、もう 1 人のユーザーによる更新がアプリケーションに通知されます。トランザクションでエンティティが更新されると、すべてのステップが中断せずに完了するまでトランザクションが再試行されます。

トランザクションのもう 1 つの一般的な用法は、名前付きキーによってエンティティを取得することです。名前付きキーがまだ存在しない場合は、新規に作成します。

type Account struct {
	Address string
	Phone   string
}

func GetOrUpdate(ctx context.Context, id, addr, phone string) error {
	key := datastore.NewKey(ctx, "Account", id, 0, nil)
	return datastore.RunInTransaction(ctx, func(ctx context.Context) error {
		acct := new(Account)
		err := datastore.Get(ctx, key, acct)
		if err != nil && err != datastore.ErrNoSuchEntity {
			return err
		}
		acct.Address = addr
		acct.Phone = phone
		_, err = datastore.Put(ctx, key, acct)
		return err
	}, nil)
}

同じ文字列 ID を持つエンティティを別のユーザーが作成または更新しようとする状況を処理するには、前の例と同様、ここでもトランザクションが必要です。トランザクションを使用しない場合は、エンティティがまだ存在していない時点で 2 人のユーザーが同時にこれを作成しようとすると、1 人目のユーザーが知らないうちに 2 人目のユーザーが作成したエンティティが 1 人目のエンティティを上書きしてしまいます。

トランザクションが失敗した場合は、成功するまでアプリにトランザクションを再試行させることができます。またはアプリのユーザー インターフェースのレベルまでエラーを伝達し、ユーザーにこのエラーを処理させることもできます。すべてのトランザクションに対し、再試行ループを作成する必要はありません。

最後に、トランザクションを使用してデータ整合性のある Datastore スナップショットを読み取ることができます。こうした手法は、整合性が要求されるページの表示やデータのエクスポートに、複数回の読み取りが必要となるような場合に役立ちます。このような種類のトランザクションは、書き込みを伴わないため、読み取り専用トランザクションとも呼ばれます。読み取り専用の単一グループによるトランザクションは、同時変更によって失敗することはないため、失敗した場合の再試行を実装する必要はありません。ただし、クロスグループ トランザクションは、同時変更によって失敗する可能性があるため、再試行が必要になります。読み取り専用トランザクションの commit とロールバックは、いずれも no-op(何も行わない)として処理されます。

トランザクション タスクをキューに入れる

タスクを Datastore トランザクションの一部としてキューに登録できます。これにより、トランザクションが正常に commit された場合にのみ、タスクがキューに登録されるようになります(キューへの登録が保証されます)。キューに登録されても、タスクがすぐに実行されることが保証されるわけではありません。そのため、タスクはトランザクションにおいてアトミック(不可分)ではありません。ただし、キューに登録された後は、成功するまで再試行されます。これは、RunInTransaction 関数でキューに登録されるすべてのタスクに適用されます。

トランザクション タスクは、トランザクションの成功(購入を確認するメールの送信など)に依存する Datastore 以外の処理を、トランザクションに組み合わせることができるので便利です。また、データストアの処理をトランザクションに組み合わせることもできます。たとえば、トランザクションが成功した場合にのみ、エンティティ グループに対する変更をトランザクションの外部で commit できます。

1 つのトランザクションの中でタスクキューに挿入できるトランザクション タスクは 5 個までです。トランザクション タスクの名前をユーザーが指定することはできません。

datastore.RunInTransaction(ctx, func(ctx context.Context) error {
	t := &taskqueue.Task{Path: "/path/to/worker"}
	if _, err := taskqueue.Add(ctx, t, ""); err != nil {
		return err
	}
	// ...
	return nil
}, nil)