コンテンツに移動
デベロッパー

BigQuery、Pub/Sub、ADK を使用したイベント ドリブン データ エージェントの構築

2026年4月21日
https://storage.googleapis.com/gweb-cloudblog-publish/images/blog_hero_image_final.max-2600x2600.png
Rachael Deacon-Smith

Developer Advocate, Google

Nick Orlove

BigQuery Product Manager

※この投稿は米国時間 2026 年 4 月 11 日に、Google Cloud blog に投稿されたものの抄訳です。

リアルタイムの自律エージェントの必要性

データは、それに基づいて行動できる能力があってこそ価値を発揮します。現代の企業では、イベントが発生してから数時間、あるいは数分後に対応しても、手遅れになることがよくあります。金融詐欺や流動的なサプライ チェーンの混乱に対処する場合も、一分一秒が重要になります。

しかし、多くのシステムは依然として、処理の遅いスケジュールされたバッチジョブや、変更を pull し続ける脆弱なマイクロサービスに依存しています。問題が表面化したときには、手遅れになっていることがよくあります。そのため、人間の調査担当者は、ログやデータベース クエリを掘り下げて情報をかき集め、それらをつなぎ合わせる作業に追われることになります。時間がかかり、骨の折れるプロセスで、スケーラビリティもありません。

イベント ドリブン データ エージェントの登場

時間のかかるパイプラインや手動のトリアージを待つ代わりに、データ プラットフォームが異常を検出するとすぐにアラートを push し、自律型 AI エージェントが調査して解決してくれるとしたらどうでしょうか。

これがイベント ドリブン データ エージェント アーキテクチャのメリットです。BigQuery の継続的クエリPub/SubVertex AI Agent Engine 上の ADK エージェントを組み合わせることで、イベントをリアルタイムでトリアージして自律的に調査するパイプラインを構築できます。エージェントは高度な推論を使用してコンテキストを収集し、データを分析して、問題をその場で解決するか、人間参加型の介入が必要な場合は担当者にエスカレーションします。

ハイブリッド アーキテクチャ: 仕組み

https://storage.googleapis.com/gweb-cloudblog-publish/images/blog_image2.max-2200x2200.jpg

このイベント ドリブン パイプラインは、次の 3 つの主要な構成要素を活用します。

  1. 検出: BigQuery の継続的クエリは、ライブデータ ストリームをモニタリングし、ルールベースのエンジンを使用して異常を検出します。

  2. ルーティング: Pub/Sub は、単一メッセージ変換(SMT)を使用してペイロードを AI エージェントが期待する正確な形式に再形成し、これらのイベントを確実に配信します。これにより、エージェント パイプラインがトリガーされ、調査が開始されます。

  3. 解決: Vertex AI エージェント(ADK で構築)がイベントを受信し、カスタムツールを使用して調査し、意思決定をログに記録します。

各コンポーネントについて詳しく見ていきましょう。具体例として、不正な金融取引をリアルタイムで検出および調査するという、シンプルなユースケースをご紹介します。

パート 1: BigQuery の継続的クエリ

BigQuery の継続的クエリでは、標準 SQL を使用してリアルタイムのイベント ストリームをネイティブに構築できます。これらは、継続的に実行される永続的な SQL クエリです。受信データを分析し、SQL の結果を Pub/Sub などの宛先に即座にエクスポートします。

BigQuery でストリーミング イベントを pull 型からネイティブな push 型へと移行することで、標準 SQL を使用してデータ ウェアハウス内で複雑な異常(ユーザーが指定した期間内に 2 つの異なる国で取引を行うなど)を検出できます。データを別のストリーミング分析エンジンに移動する必要はありません。

この変革は、BigQuery 継続的クエリのステートフル データ処理の公開プレビュー版のリリースによって実現しました。これにより、ストリーム間の JOIN、ウィンドウ集計、タンブリング ウィンドウのネイティブ サポートが導入されます。BigQuery で分散したデータ ストリームを関連付け、移動平均や総計などの複雑な指標を直接計算できるようにすることで、あらゆる SQL ユーザーがストリーム処理を行えるよう、その民主化を推進しています。その結果、イベントの発生を検知して即座に対応するリアルタイムの「アクション システム」を構築するために、専門の外部ツールや高度なデータ サイエンスの専門知識は必要なくなります。このアプローチは、LLM トークンの費用管理にも役立ちます。ステートフル SQL を使用して特定の異常をフィルタリングすることで、エージェントが元データに圧倒されることなく、まさに必要なコンテキストのみを処理できるようになります。

この実装は簡単です。標準 SQL クエリと EXPORT DATA ステートメントを組み合わせることで、一致する行が発生した瞬間に、その行を Pub/Sub トピックに直接ルーティングできます。

lang-sql
読み込んでいます...

パート 2: Pub/Sub と単一メッセージ変換(SMT)

Pub/Sub でスキーマのギャップを埋める。継続的クエリからエクスポートされたイベントデータは、Pub/Sub トピックに直接送信されます。この元データを AI エージェントが使用できるようにするには、エージェントが想定するスキーマに合わせてペイロードを変換する必要があります。

専用の Cloud Functions などをデプロイしてこれらのメッセージを再フォーマットする代わりに、単一メッセージ変換(SMT)を使用して Pub/Sub サブスクリプション内だけで完結させることができます。SMT を使用すると、軽量なインライン JavaScript ユーザー定義関数(UDF)を Pub/Sub 内で直接実行して、ペイロードをその場でマッピング、再形成、クリーンアップできます。

たとえば、BigQuery ペイロードをインターセプトし、Agent Engine が想定する正確なクエリ形式を抽出する JavaScript スニペットを記述した transform.yaml を定義できます。

lang-py
読み込んでいます...

ルーティング パイプラインを構成するには、Pub/Sub push サブスクリプションを作成します。このサブスクリプションは、変換されたすべての BigQuery イベントを AI エージェントの Webhook エンドポイントへ、自動的に直接 push します。

読み込んでいます...

https://storage.googleapis.com/gweb-cloudblog-publish/images/pub_sub_screenshot.max-2200x2200.png

上記の push-endpoint パラメータに注目してください。この Webhook URL は、アーキテクチャの最後の部分である AI エージェント自体によって生成されます。

パート 3: ADK と Vertex AI Agent Engine

エージェントが Vertex AI Agent Engine にデプロイされると、プラットフォームは、これらの受信イベントを受け取るために特別に設計された安全な streamQuery エンドポイントを自動的にプロビジョニングします。

これはオペレーションの頭脳です。異常が検出され、Pub/Sub 経由でルーティングされると、Vertex AI にデプロイされた ADK エージェントがメッセージによってトリガーされます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/vertex_ai_agent_engine.max-1300x1300.png

推論ループを実装するには、ツールを備えたエージェントを定義してデプロイします。

lang-py
読み込んでいます...

具体的な指示とこのカスタム ツールセットを装備したエージェントは、外部のコンテキストを積極的に収集することでアラートを自律的に調査します。BigQuery にクエリを実行してユーザーの取引履歴を取得したり、領収書などの非構造化データを分析したり、Google 検索で調査結果をグラウンディングして販売者の評判を確認したりできます。最終的には、その取引を FALSE_POSITIVE として分類するか、ESCALATION_NEEDED としてフラグを立てます。

人間参加型のメリット

このアプローチは、アーキテクチャのスケーラビリティの中核をなすものです。ノイズを効果的にフィルタリングすることで、運用上のオーバーヘッドを大幅に削減し、調査担当者が最も複雑なケースにのみ時間を費やせるようにします。ADK は豊富なツールとインテグレーションを提供しているため、エージェントはイベントを幅広いエンタープライズ システムにエスカレーションできます。これにより、人間参加型(Human-in-the-loop)のエンゲージメントを実現できるほか、人間監視型(Human-on-the-loop)オブザーバビリティを使用してパイプラインをエンドツーエンドで自動化することもできます。

すべてを 1 つに: エージェント分析

パイプラインが稼働したら、作業は構築からモニタリングに移行します。従来型のソフトウェアとは異なり、自律エージェントはバックグラウンドで永続的に実行されます。バックグラウンドで動作するため、を行っているか、どのくらいの時間がかかっているか、どのくらいの費用がかかっているかを詳細に把握できることが重要です。

デプロイ時に BigQuery Agent Analytics プラグインを初期化することで、ADK はすべてのトレースデータ、ツールの使用状況、実行レイテンシを BigQuery に直接自動的に記録します。

https://storage.googleapis.com/gweb-cloudblog-publish/images/bigquery_results3.max-2200x2200.png

このトレースデータをエージェントが出力した構造化された意思決定と結合することで、高度な分析が可能になります。これにより、動的なダッシュボードを構築し、カスタム アラートを設定して、AI ワークフォースをリアルタイムでモニタリングできるようになります。エージェント分析プラグインの使用方法について詳しくは、こちらの Codelab をご覧ください。

まとめ

リアルタイム データ ストリーミングとエージェント型 AI の融合により、運用上のアラートの処理方法が変わろうとしています。

  1. BigQuery の継続的クエリを使用してリアルタイムで検出する。

  2. Pub/Sub SMT を使用して変換とルーティングを行う。

  3. Vertex AI Agent Engine を使用して調査し、解決する

  4. BigQuery Agent Analytics プラグインを使用して分析する

このアーキテクチャにより、ガバナンスが効いたスケーラブルなサーバーレスの Google Cloud 環境内で、異常が発生した瞬間に対応できる、プロアクティブで自律的なワークフォースを構築できます。

実際に使ってみる準備はできましたか?

Codelab では、この Cymbal Bank パイプラインをゼロから構築する方法を段階的に説明しています。

- Google、デベロッパー アドボケイト、Rachael Deacon-Smith

- BigQuery プロダクト マネージャー、Nick Orlove

投稿先