Cloud Spanner によるイベントソース システムのデプロイ

この記事では、アーキテクチャ パターンを例として、Cloud Spanner をイベント取り込みシステム、Cloud Pub/Sub をイベントソースとして使用し、次の処理を実行できるシステムを作成する方法について説明します。

  • 可用性の高いイベントソースへの書き込み。
  • 他のシステムでの使用を目的とする書き込みのイベントとしてのパブリッシュ。
  • 再生を目的とするイベントのアーカイブ。
  • 分析を目的とするシステムへのイベントの読み込み。
  • クエリの高速化を目的とするシステムへのイベントのフィルタ。

この記事は、イベントソース システムの用途、デメリット、コンポーネントの学習に関心があるソフトウェア エンジニアを対象としています。イベントソース アーキテクチャをサポートする Cloud Functions を使用して、多数のアプリおよびサービスを作成する方法を学習します。

ユースケースと設計基準

このアーキテクチャ パターンは、データソース(この場合は Cloud Spanner)への新しいデータの書き込みに基づいてアクションを実行する必要がある場合にいつでも使用できます。

ユースケース

このパターンは、以下のシナリオの管理に役立ちます。

  • e コマースのショッピング カート
  • 注文管理とサプライ チェーン
  • ウォレット、支払い、および請求解決

次の図は、e コマースのショッピング カート システムのフローを示しています。

e コマースのショッピング カートにおけるイベントのフロー。

e コマースや支払いなどの複雑なシステムでは、イベントベースのアーキテクチャを使用してトランザクションを追跡するのが便利です。たとえば、お客様が商品をショッピング カートに追加したり、支払いのためにクレジット カードの処理を行ったりした場合、複数の下流プロセスをトリガーして、商品の在庫があり、お客様の口座に十分な残高があることを確認できます。ビジネスはお客様からの注文にかかっているため、お客様がいつでも注文を行えるようにする必要があります。

設計基準

以下は、イベントソース システム アーキテクチャの使用が推奨される設計基準の例です。

  • システムへの注文および支払いの書き込みを高い可用性で処理する。
  • お客様が注文した商品のみを受け取り、購入金額が正確に請求されていることを確認する。
  • 確定失敗モードがある(書き込みが失敗したか、成功したかを把握できる)。
  • 関連のあるシステム書き込みが行われたことを依存サービスに通知するメカニズムがある。

この記事では、これらの要件をすべて満たし、後から柔軟に機能を追加できるシステムについて説明します。

システム アーキテクチャ

まず、書き込みを受け入れることができる可用性の高いサービスが必要です。また、システムで確定失敗モードが提供されている必要があります。書き込みが失敗した場合には、そのすべてまたは一部が重複することを心配せずに書き込みを再試行できるよう、システムで失敗が認識される必要もあります。

このシナリオ向けの標準的なアプリケーションは原子性、一貫性、独立性、および永続性(ACID)トランザクションをサポートするデータベースですが、特に書き込みに関してデータベースの高可用性を実現することは困難です。レプリケーションによってデータの不整合が発生し、アーキテクチャのコストと複雑さが増す可能性があります。高可用性を重視する場合、複雑さは最大の設計リスクになります。

また、一般的な高可用性データベース構成では、複数のアベイラビリティ ゾーンが含まれる障害の処理を行うと、レプリケーションの遅延が発生します。こうした遅延が増加すると、追加の障害によって、別のアベイラビリティ ゾーンのレプリカノードに転送中の書き込みがすべて失われる可能性が高くなります。このような追加の障害が発生した場合、該当するゾーン内で障害が発生する前に、レプリカへの書き込みが完了していません。

アーキテクチャの図

下の図は、従来の高可用性データベースに伴う複雑さとコストの問題に対処するために設計された Cloud Spanner を含むイベントソース アーキテクチャを示しています。

Cloud Spanner を含むイベントソース アーキテクチャの図。

このイベントソース アーキテクチャでは、Cloud Functions を使用して作成する次のコンポーネントが使用されます。これらのコンポーネントは、アプリと Cloud Functions で構成されます。

  • ポーラーアプリ: Cloud Spanner のポーリングを実行し、レコードの形式を Avro に変換して、Cloud Pub/Sub にパブリッシュします。
  • archiver: Cloud Pub/Sub トピックにパブリッシュされたメッセージによってトリガーされたイベントを取得し、それらのレコードを Cloud Storage のグローバル アーカイブに書き込みます。
  • bqloader: Cloud Storage に書き込まれたレコードによってトリガーされ、それらのレコードを対応する BigQuery テーブルに読み込みます。
  • janitor: グローバル アーカイブに書き込まれたすべてのエントリを一定の速度で読み取り、それらを長期ストレージ用に圧縮します。
  • replayer: 長期ストレージからレコードを順に読み取り、解凍して新しい Cloud Pub/Sub ストリームに読み込みます。
  • マテリアライザー アプリ: Cloud Pub/Sub に書き込まれたレコードをフィルタし、対応する Redis(実体化されたビュー)データベースに読み込んでクエリに容易にアクセスできるようにします。

ポーリング サービスの作成

書き込みを受け入れるシステムを準備した後、システムへの書き込みが実行されるたびに下流のサービスに通知される必要があります。

従来のデータベースではこの処理をいくつかの異なる方法で実行しますが、基本的にはデータベースのログ先行書き込み(WAL)または変更データ キャプチャ(CDC)ストリームをリッスンします。これらのソリューションは、形式の読みやすさに欠けます。この形式は、レコードに対する変更の表現とストリーミングを目的として設計されています。下流のシステムにイベントを通知し、そのイベントに関連するコンテキストを伝えることは目的としていません。多くの場合、完全なレコードではない変更のみのバイナリ表現は役に立ちません。この形式のもう 1 つの難点として、人間が読める形式ではないため、ストリームのデバッグや監査が非常に難しくなります。

代わりに、すべての新規エントリについてデータベースをポーリングし、それらを下流のシステムに渡すサービスを作成できます。ポーリング サービスは、データベースに書き込まれる新しいレコードを処理する一般的な手段であり、次のようなメリットがあります。

  • 理解しやすく、書き込みが容易です。
  • 正しく書き込まれた場合のオーバーヘッドが少なくなります。
  • 安定性と独立性が実現します。
  • クエリとレコード解析の両方について、エラー耐性があります。
  • 変更の柔軟なフィルタと表現が可能になります。

従来の WAL や CDC を使用せずにポーリング サービスを作成すると、次のようなデメリットがあります。

  • 短い間隔(1 秒未満)でデータベースをポーリングすると、データベースへの負荷が大きくなる可能性があります。
  • テーブル レイアウト、ポーリングに使用するクエリ、および現在データベースをポーリングしている他のアプリの数によっては、ポーリング サービスによって他のアプリとのリソース競合(ロック)が発生する可能性があります。
  • ポーリングでは、負荷の増大とリソース競合に対処するため、より大きなデータベース マシンとより高価なストレージ(SSD など)が必要になる可能性があります。

ポーリングの読み取りに読み取り専用トランザクションを使用し、効率的かつ効果的なクエリに関する SQL のベスト プラクティスに従うことで、Cloud Spanner におけるこれらのデメリットの一部を軽減できます。

特定の期間の新規レコードをすべて検索するには、Cloud Spanner の commit タイムスタンプ機能を使用できます。commit タイムスタンプは、TrueTime テクノロジーを基盤とします。これにより、書き込みがデータベースに commit された日時を Cloud Spanner でグローバルに整合性のある形式で表現できるようになります。Cloud Spanner は世界中の多数の地域で書き込みを受け入れることができ、ポーラーはイベントの正確な会計が順番に含まれるソースを作成できます。

Cloud Functions を使用したタスクの表現

Cloud Functions は、GCP におけるサーバーレスのイベント ドリブン コンピューティング プラットフォームです。これらのファンクションは、HTTP リクエストやイベント トリガーなどのトリガーに応答して実行されるコードのステートレス スニペットです。イベントソース システムの場合、通常、Cloud Functions はパブリッシュされるイベントに関連する個別のタスクを表します。これらのファンクションはサーバーレスであるため、リクエストのボリュームに応じてスケーリングされ、追加の操作は必要ありません。

Cloud Functions のデメリットは次のとおりです。

  • レスポンス時間が不安定になる可能性があります。
  • 起動と実行の時間が短いため、ロギングとトレースが難しくなる可能性があります。
  • 通常は失敗時の再試行が自動的に行われるため、ステートレスかつべき等にする必要があります。
  • ロギング環境の状態によっては、ローカルでエラーをデバッグおよび再現するのが難しい場合があります。

ソースとしての Cloud Pub/Sub の使用

ソースは、イベントバスやメッセージ キューにパブリッシュされたイベントの追記専用レコードです。特定のイベントバスまたはメッセージ キュー トピックをサブスクライブするか、関心のある名詞、動詞、または特定のメタデータですべてのメッセージのコレクションをフィルタすることで、イベントのサブスクリプションやリッスンを実行できます。

このパターンでは、ソースは単一の Cloud Pub/Sub トピックです。レコードがストリームに書き込まれるたびに、イベンティング システムを使用して Cloud Functions をトリガーできます。

関心のある他のファンクションに引き続き利用するため、サブスクライバーがメッセージを確認しないよう注意してください。また、キューにはメッセージの保持期間制限があるため、これらのメッセージをアーカイブにバックアップする Cloud Functions を作成する必要があります。アーカイブされたメッセージを再生するには、Cloud Functions を使用してアーカイブされたメッセージを読み取り、使用するため新しい Cloud Pub/Sub トピックにパブリッシュできます。

ポーラーを使用したデータベースのクエリ

ポーラーの役割は、一定の時間間隔でデータベースのクエリを実行し、commit タイムスタンプで降順ソートされた(レコードが古い順に並べ替えられた)特定の時点より後の全レコードを要求することです。

ポーラーの要件

この設計では、ポーラーが最後に認識したタイムスタンプを追跡し、初回実行のためにシステムをブートストラップする必要があります。このタイムスタンプは、アプリケーション状態に保存するか、別のデータベースに書き込むか、ソースに最新のレコードを要求してそこからタイムスタンプを解析することで追跡できます。

レコードを別のデータベースに保存すると、別のシステムとの依存関係が発生して障害点が増えるため、複雑さが増す可能性があります。最後のプロセスのタイムスタンプをローカルのアプリ ストレージに保持し、アプリでエラーが発生するか、なんらかの理由でアプリが再起動され、内部状態が失われた場合にのみクエリを使用することをおすすめします。

ポーラーの要件は以下のとおりです。

  • ポーリング間隔が固定され、一貫しています。
  • ポーリング間隔が 1 秒未満です。
  • 初回実行時にシステムをブートストラップします。
  • 前に記録されたタイムスタンプの後に発生する全レコードのクエリを実行します。
  • 各レコードを個別の Avro レコードにシリアル化します。
  • 各 Avro レコードを Cloud Pub/Sub ベースのイベントソースにパブリッシュします。
  • ポーリング間隔の期間が経過するまでスリープします。
  • 障害が発生した場合、一定の時間間隔内に自動で再試行します。
  • 障害が発生した場合、ポーラーの再起動時に、最後に記録されたタイムスタンプを取得するため Cloud Pub/Sub ストリームに対するクエリが実行されます。それが失敗した場合は、手動で構成したタイムスタンプを使用してポーラーを起動できます。また、Cloud Storage アーカイブからタイムスタンプを取得することもできます。

ポーラーの設計

次に、ポーラーを設計します。ポーラーを設計する方法は少なくとも 3 つあり、それぞれに固有のデメリットがあります。

Cloud Scheduler を使用してデータベースをポーリングする Cloud Functions をスケジュールするか、Kubernetes クラスタでポーラーを cron ジョブとして起動して任意の間隔でスケジュールするか、Kubernetes ポッドで継続的に実行されるサービスを使用して固定された間隔でデータベースをポーリングし、メモリ内の最後に処理されたタイムスタンプのメンテナンスと更新を行うことができます。

データベースのクエリを一定の時間間隔で実行する必要があるため、Cloud Scheduler を使用してデータベースをポーリングする Cloud ファンクションをスケジュールし、新しいレコードを Cloud Pub/Sub ストリームに送信することをおすすめします。

このアプローチは有効ですが、2 つのデメリットがあります。まず、Cloud Functions は本質的にステートレスであるため、アプリケーション状態を維持する方法を見つける必要があります。アプリケーション状態を維持するには、追加のデータベースを導入する必要があり、書き込まれるイベントとソースに追加されるイベントとの間のレイテンシが不安定になります。Cloud Functions は、生成と実行に想定よりも時間がかかる場合があります。この遅延は、ポーリング間隔が短いほど目立ちます。

予測可能な下流のコンシューマすべてで 1 秒を超えて変動する可能性があるレイテンシが許容される場合は、Cloud Functions のスケジュールがシステム設計の最適なオプションとなる可能性があります。その場合、Cloud Functions は Cloud Pub/Sub ストリームのクエリによって最後に処理されたタイムスタンプを追跡するか、Cloud Storage でその状態を保持します。Kubernetes のように管理が複雑化しない一方で、Cloud Functions を使用する場合に考慮する必要があるもう 1 つのデメリットが、追加のクエリによってタイムスタンプの追加呼び出しが原因でレイテンシが増加し、ポーリング システムの障害点が増える可能性があることです。追加呼び出しのレイテンシを許容できる場合は、ここで Cloud Functions を使用できます。

Kubernetes クラスタでポーラーを cron ジョブとして起動し、任意の間隔でスケジュールすることもできます。このアプローチにも Cloud Scheduler を使用する場合と同様のデメリットがあり、Kubernetes を追加することで複雑さが増します。ただし、ジョブを Kubernetes でサービスとして開始し、設定した時間が経過するまでスリープ状態にして、アプリのメインイベント ループで制御できます。また、状態を保持し、ポーリングのレイテンシと再試行のセマンティクスを完全に制御できます。この選択肢の場合、Kubernetes によって複雑さが増しますが、Google Kubernetes Engine(GKE)を使用することで軽減できます。GKE により、以下を最大限に制御できます。

  • ポーリング間の状態(最終タイムスタンプ)。
  • ポーリング間のレイテンシ。
  • Cloud Spanner の読み取りまたは Cloud Pub/Sub の書き込みが失敗した場合における再試行のセマンティクスと期間。
  • ポーラー サービスが突然停止または終了した場合の自動再起動。

ポーラーのブートストラップ

ポーラーの初回実行時には、イベントソース システムの実行に必要なすべてのコンポーネントが設定されます。これには、正しい名前(理想的にはテーブルの名前)の Cloud Pub/Sub ストリームと、archiver のパブリッシュ先である Cloud Storage バケットが含まれます。すべてのシステム コンポーネントが設定された後、ポーラーのブートストラップ プロセスで初期テーブル スキャンが実行され、既存のデータがすべて処理されます。ポーラーは、終了後に固定ポーリング間隔に移行し、ポーリング システムで最初の処理実行に使用する最後に処理された commit タイムスタンプを渡します。

ソースを使用したデータの解釈

ポーラーのスケジュールを設定して最新のデータが取得されたら、そのデータをソースで表現する必要があります。これは一般化されたソリューションであり、スキーマが異なる多くのテーブルでこのポーラーを再利用できるため、テーブル固有のポーラーおよびソース コンシューマの作成はスケーラブルなソリューションではありません。また、バージョン管理されたスキーマのバリエーションが事前に認識されていなくても、異なるコンシューマをソースに追加できる必要があります。

次のようなユースケースが考えられます。

  • 全トランザクションの長期アーカイブ。
  • 分析を目的とするデータ ウェアハウスへのトランザクションの読み込み。
  • NoSQL データベースへのトランザクションの読み込みによる機械学習モデルへのデータ提供と、よくある質問に対する回答のユーザーに近い場所へのキャッシュ。

これらのユースケースでは、Avro、JSON、Protobuf などのシリアル化形式の使用を検討してください。 Google のデータ ウェアハウス ソリューションである BigQuery は、Avro ファイルからデータを直接取り込むことができます。Avro は、BigQuery にデータを読み込むのに適した形式です。Avro ファイルの読み込みには、JSON と比べて次のようなメリットがあります。

  • 読み込みが速くなります。データブロックが圧縮されていても、データを並列で読み取ることができます。
  • 型指定やシリアル化が不要です。
  • 他の形式で見られる固有のエンコードの問題がないため、解析が簡単です。

Avro ファイルを BigQuery に読み込むと、自己記述型ソースデータからテーブル スキーマが自動的に推定されます。

Avro の代わりに Protobuf を使用することもできますが、このユースケースでは Avro を使用する明確なメリットが 2 つあります。

  • BigQuery での直接取り込みが可能です。
  • スキーマがデータ オブジェクトに含まれています。

最後のメリットとして、ソースのコンシューマがデータを抽出して検査できます。JSON を逆シリアル化し、形式が変わらないことを祈ったり、該当オブジェクトの特定のバージョンに対するスキーマ レジストリのバージョンを取得したりする必要はありません。

これらの理由により、ソースに追加する前に、トランザクションごとにテーブルを Cloud Spanner から Avro オブジェクトにシリアル化します。詳細については、Cloud Spanner テーブルを Avro レコードに変換する方法を参照してください。

ポーラーアプリの作成

デプロイメント モデルとシリアル化形式を決定したら、ポーラーアプリを作成するための言語オプションを検討します。データを Cloud Spanner から読み取り、Cloud Pub/Sub に書き込むことが目標であるため、Google Cloud API でサポートされている言語のみを使用できます。これには C#、Go、Java、Node.js、PHP、Python、Ruby が含まれます。

Cloud Spanner で現在サポートされている言語のうち、Avro で正式にサポートされている言語は C#、Java、Python、PHP、Ruby です。アプリのレイテンシをできるだけコントロールし、クエリが実行されたテーブルを複数のスレッドで処理する必要があるため、Java を使用することをおすすめします。

イベント ストリームの使用

ポーラーがサービスのメインアプリですが、他にもストリームのコンシューマがいくつかあります。最初に必要なアプリは、ストリームをサブスクライブし、各メッセージを履歴参照用に Cloud Storage にアーカイブするアプリです。このシステムでは、各レコードが個別のファイルとしてテーブルと同じ名前でバケットに保存されます。

1 時間おきか、またはトランザクション頻度に基づき、個別のトランザクション レコードを取得してより大きなファイルに圧縮するサービスもあります。トランザクションの頻度やトランザクション レコードのサイズに応じて、個別のトランザクション Avro ファイルを圧縮することも検討します。

過去のトランザクションをすべて Cloud Storage にアーカイブした後、次の処理を実行できます。

  • 分析を目的とする BigQuery へのトランザクション データの直接入力。
  • 他のシステムのトレーニングやテストを目的とする履歴レコードの再生。
  • レコードが失われたか、破損した場合における記録システム(この場合は Cloud Spanner データベース)のコンテンツの再構築。
  • レポートまたは監査を目的とする履歴読み取り専用レプリカの作成。
  • ステージングまたはテスト環境用のデータベース バージョンの作成。

archiver Cloud Functions の作成

ストリームへのトランザクションの追加時にトリガーされる archiver と呼ばれる Cloud Functions を作成します。新しいファンクションを作成し、トピック(Cloud Spanner のテーブル)ごとにトリガーする必要があります。構成された Cloud Pub/Sub トピックにトランザクションが追加されるたびに、archiver Cloud Functions によって Avro レコードが取得され、テーブルと同じ名前で Cloud Storage バケットに書き込まれます。この Cloud Functions によってファイルにテーブル名と同じ名前が付けられ、ミリ秒単位までの日時と 4 桁のランダムな数字が取得されます。この命名規則に従い、Universally Unique Identifier(UUID)に似た ID が作成されます。

bqloader Cloud Functions の作成

bqloader という新しい Cloud Functions を作成し、その Avro ファイルを BigQuery に直接読み込むことができます。archiver によってファイルが Cloud Storage にアップロードされると、このファンクションがトリガーされます。Cloud Storage にトランザクションが読み込まれるたびに、この Cloud Functions によってそのトランザクション エントリが適切な BigQuery テーブルに追加されます。分析や機械学習へのデータ提供などのレイテンシをさらに短縮する場合は、tabledata.insertAll メソッドを使用し、データを BigQuery に 1 レコードずつストリーミングします。このアプローチを使用すると、読み込みジョブの実行遅延を発生させることなく、データのクエリを実行できます。BigQuery へのレコード読み込みに対する割り当てに留意してください。

janitor Cloud Functions の作成

archiver Cloud Functions により、Avro ファイルが Cloud Storage バケットに書き込まれます。次に、すべての個別トランザクションを長期ストレージ用に単一の圧縮ファイルに圧縮するための janitor という別の Cloud Functions を作成します。janitor により、ファイルに含まれるテーブル名、日付、期間を使用し、新しく作成されたこのファイルに名前が付けられます。たとえば、janitor が 1 時間おきに実行されるようスケジュールされている場合、ファイル名は table1-jan_1_2019_1200-1300.tar.gz のようになります。janitor Cloud Functions は必須ではありませんが、ストレージ コストを削減し、Cloud Storage バケットを整理するのに役立ちます。

replayer Cloud Functions の作成

Cloud Storage でアーカイブされた Avro ファイルを再生するには、HTTP によってトリガーされる replayer という別の Cloud Functions が必要です。replayer Cloud Functions により、再生を要求した期間のアーカイブ ファイルが取得、展開され、新しい Cloud Pub/Sub ストリームに順番にパブリッシュされます。

この Cloud Functions は、再生を要求した期間を提供する HTTP POST リクエストを使用してトリガーされます。この Cloud Functions は、終了後に Cloud Pub/Sub ストリームの名前を返すか、新しいストリームにすべてのアーカイブ データを正しく読み込めなかった場合にはエラーコードと説明を返します。

アーカイブ ファイルが大きくなりすぎて再生が不安定になるか、再生速度がユースケースに対して遅すぎる場合、このアプリでより高度な処理が必要になる可能性があります。アーカイブを小さなセクションに分割するか、replayer Cloud Functions に対して異なる言語を選択できます。

もう 1 つのオプションとして、ジョブをより小さなタスクに分割し、並列実行するための Cloud Dataflow ジョブがあります。このようなシステムの実装については、このドキュメントで取り上げませんが、GCP GitHub リポジトリおよび Cloud Dataflow ドキュメントに良い例が記載されています。

マテリアライザー アプリの作成

それぞれが 1 つのトランザクションを表すこれらのイベントをソースにまとめることで、追加のコードを記述することも、異なるアプリチーム間で調整を行うこともなく、さまざまな種類のサービスをシステムにシームレスに統合できます。

たとえば、このアプリは特定のトピックに関するすべてのメッセージをリッスンし、クライアントの名前でフィルタして、該当するお客様の関連データの実体化されたビューを作成します。ユーザーに関する情報のクエリを実行する必要があり、非常に短いレイテンシでその情報にアクセスする必要がある場合に、このアプリを使用できるのが理想的です。

データを早期に分析して高速のストレージに保存し、詳細なレスポンスを返すことができれば、頻繁に実行されるクエリのパフォーマンスを改善できます。

このマテリアライザー アプリは、Cloud Pub/Sub トピックによってトリガーされる Cloud Functions を含む場合があり、クライアント ID によって情報をフィルタします。クライアント ID が関心のある ID と一致する場合、この Cloud Functions によって関連データが Cloud Memorystore に書き込まれます。

まとめ

イベントソース システム アーキテクチャを構築することで、イベント間の関係に反応するか、それを認識する必要があるすべてのシステムで新しい機能を作成し、柔軟性を高めることができます。決定論的な順序付けや大量のスループットが重要な場合は、イベント取り込みシステムとして Cloud Spanner、イベントソースとして Cloud Pub/Sub を使用することで、イベントソース アーキテクチャに対する堅牢で信頼性の高い基盤を構築できます。イベントソースの基盤を設定したら、これまでは解決が困難だった問題をシンプルな Cloud Functions または Cloud Pub/Sub サブスクライバー ソリューションにできる多くの方法を発見できます。

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...