コンテンツに移動
データ分析

Google Cloud でのストリーミング データ パイプラインの構築

2023年3月16日
https://storage.googleapis.com/gweb-cloudblog-publish/images/da_2022_xYbOMKi.max-2436x985.jpg
Google Cloud Japan Team

※この投稿は米国時間 2023 年 2 月 23 日に、Google Cloud blog に投稿されたものの抄訳です。

多くのお客様が、データの取り込みと処理を行い、後で分析できるようにそのデータを保存するために、ストリーミング データ パイプラインを構築しています。このブログ投稿では、以下に示す一般的なパイプライン設計に注目します。この設計は 3 つのステップから構成されています。

  • データソースが、データを含むメッセージを Pub/Sub トピックに送信する。

  • Pub/Sub がメッセージをバッファし、処理コンポーネントに転送する。

  • 処理コンポーネントが処理を終えた後でデータを BigQuery に保存する。

処理コンポーネントについては、基本的なものから高度なものまで 3 種類(BigQuery のサブスクリプション、Cloud Run サービス、Dataflow パイプライン)を取り上げます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/2_streaming_data.8897898d.fill-2000x1124.jpg

サンプル ユースケース

実装の詳細についての説明に入る前に、ストリーミング データ パイプラインのサンプル ユースケースをいくつかご紹介します。

  • 広告クリックの処理。広告クリックを受け取り、クリックごとに不正予測ヒューリスティックを実行して、クリックを破棄するか、さらに分析するために保存する。

  • データ形式の正規化。さまざまなソースからデータを受け取り、単一のデータモデルに正規化して、後で分析するために、またはさらに処理するために保存する。

  • テレメトリーの取り込み。ユーザーとのやり取りを保存し、アクティブ ユーザーやセッションの平均の長さなどのリアルタイムの統計情報をデバイスの種類別にグループ化して表示する。

  • 変更データ キャプチャ ログの記録。Pub/Sub を介して、すべてのデータベースの更新情報をデータベースから BigQuery にロギングする。

Pub/Sub によるデータの取り込み

それでは、最初から始めましょう。Pub/Sub トピックにメッセージをパブリッシュする 1 つまたは複数のデータソースがあります。Pub/Sub はフルマネージドのメッセージ サービスです。ユーザーがメッセージをパブリッシュすると、Pub/Sub が 1 つまたは多数のサブスクライバーにメッセージを配信します。Pub/Sub にメッセージをパブリッシュする最も便利な方法は、クライアント ライブラリを使用することです。

Pub/Sub で認証を受けるには認証情報を提供する必要があります。データ プロデューサーが Google Cloud で実行されている場合、クライアント ライブラリがこの処理を担当し、組み込みのサービス ID を使用します。ワークロードが Google Cloud で実行されていない場合、ID 連携を使用する必要があります。または、最終手段として、サービス アカウント キーをダウンロードすることもできます(ただし、有効期間が長い認証情報であるため、必ず認証情報のローテーション戦略を採用してください)。

3 種類の処理コンポーネント

単純なパイプラインがあれば複雑なパイプラインもあるという認識が重要です。単純なパイプラインは、データを永続化する前に一切の処理を行いません(または軽量な処理を行います)。高度なパイプラインは、データグループを集約してデータのストレージ要件を緩和し、複数の処理ステップを実行できます。

ここでは、以下の 3 種類のオプションのいずれかを使用して処理を行う方法について説明します。

  • BigQuery のサブスクリプション。メッセージを変更せずに BigQuery データセットに保存する、ノーコードのパススルー ソリューション。

  • Cloud Run サービス。集約を行わない、個々のメッセージの軽量な処理向け。

  • Dataflow パイプライン。高度な処理向け(詳細は後述します)。

アプローチ 1: BigQuery のサブスクリプションを使用してデータを変更せずに保存する

https://storage.googleapis.com/gweb-cloudblog-publish/images/4_streaming_data.4f849e48.fill-2000x1124.jpg

この最初のアプローチが最も単純です。BigQuery のサブスクリプションを使用すると、Pub/Sub トピックから直接 BigQuery のデータセットにメッセージをストリーミングできます。このアプローチは、メッセージを取り込んでいて、データの保存前に一切の処理を行う必要がない場合に適しています。

トピックへの新しいサブスクリプションを設定するときに、以下のように [BigQuery への書き込み] オプションを選択します。
https://storage.googleapis.com/gweb-cloudblog-publish/images/5_streaming_data.ba723acd.fill-2000x1124.jpg

ユーザーは、このサブスクリプションの実装方法の詳細を一切把握していません。つまり、受信データに対してコードを実行できません。いわゆる「ノーコード ソリューション」です。この場合、保存前にデータに対してフィルタリングを適用できません。

また、まずデータを保存して、後から BigQuery で処理を行う場合にもこのパターンを利用できます。一般に、これを「ELT(抽出、読み込み、変換)」と呼びます。

ヒント: メッセージが BigQuery に正確に 1 回書き込まれるという保証はないため、後でクエリする際に必ずデータの重複を除去するようにしてください。

アプローチ 2: Cloud Run を使用してメッセージを個別に処理する

個々のメッセージを保存する前に軽量な処理を行う必要がある場合は、Cloud Run を使用します。軽量な変換の好例は、すべてのデータソースが独自の形式とフィールドを使用しているものの、1 つの形式でデータを保存したい場合に行うデータ形式の正規化です。

https://storage.googleapis.com/gweb-cloudblog-publish/images/1_streaming_data.280c3cb5.fill-2000x1124.jpg

Cloud Run を使用すると、Google のインフラストラクチャ上で直接ウェブサービスとしてコードを実行できます。Cloud Run サービスの HTTPS エンドポイントに push サブスクリプションを使用してすべてのメッセージを HTTP リクエストとして送信するよう Pub/Sub を構成できます。リクエストが届くと、コードが処理を行って BigQuery Storage Write API を呼び出し、データを BigQuery テーブルに挿入します。Cloud Run では、任意のプログラミング言語とフレームワークを使用できます。

2022 年 2 月の時点では、Cloud Run への Pub/Sub の統合方法として push サブスクリプションが推奨されています。push サブスクリプションは失敗したリクエストを自動的に再試行するため、すべての配信試行に失敗したメッセージを受信するようにデッドレター トピックを設定できます。詳しくは、メッセージ エラーの処理をご覧ください。

パイプラインにデータが一切送信されないという状況もあります。その場合、Cloud Run はインスタンス数を自動的にゼロにスケールします。逆に、ピーク時の負荷を処理する場合は、最大 1,000 のコンテナ インスタンスまでスケールアウトできます。費用に関して不安がある場合は、インスタンスの最大数を設定できます。

Cloud Run ではデータスキーマを簡単に進化させることができます。Liquibase などの定評あるツールを使用して、データスキーマの移行を定義し、管理できます。詳しくは、BigQuery での Liquibase の使い方をご覧ください。

セキュリティ強化のため、Cloud Run マイクロサービスの上り(内向き)ポリシーを内部に設定します。これにより、アクセスが Pub/Sub(およびその他の内部サービス)からのみになるため、サブスクリプション用のサービス アカウントを作成して、そのサービス アカウントのみに Cloud Run サービスへのアクセスを提供できます。詳しくは、push サブスクリプションを安全に設定する方法をご覧ください。

以下の場合は、Cloud Run をパイプラインの処理コンポーネントとして使用することを検討します。

  • メッセージのグループ化と集約を行わずにメッセージを個別に処理できる。

  • 特殊な SDK ではなく汎用のプログラミング モデルを使用したい。

  • すでに Cloud Run を使用してウェブ アプリケーションを提供しており、使用しているソリューション アーキテクチャのシンプルさと整合性を気に入っている。

ヒント: Storage Write API は、REST over HTTP ではなく gRPC ストリーミングを使用します。このため、以前の insertAll メソッドよりも効率的です。

アプローチ 3: Dataflow を使用してメッセージの高度な処理と集約を行う

Cloud Dataflow は Google Cloud で Apache Beam パイプラインを実行するためのフルマネージド サービスであり、長い間、Google Cloud でストリーミング パイプラインを構築するための基盤であり続けてきました。このアプローチは、データグループを集約してデータを削減するパイプラインと、複数の処理ステップを実行するパイプラインに適しています。Cloud Dataflow には、複数ステップのパイプラインで問題のトラブルシューティングを簡単に行えるようにする UI があります。

データ ストリームでは、ウィンドウ処理によってグループ化が行われます。ウィンドウ処理機能が、タイムスタンプ別に無限のコレクションをグループ化します。ウィンドウ処理には、タンブリング ウィンドウ、ホッピング ウィンドウ、セッション ウィンドウなどの複数の戦略があります。詳しくは、データ ストリーミングに関するドキュメントをご覧ください。

Cloud Dataflow は AI / ML ワークロードにも利用でき、Tensorflow を使用して ML モデルで前処理、トレーニング、予測を行うことを望むユーザーに適しています。こちらの Dataflow をエンドツーエンドの ML ワークフローに統合する優れたチュートリアルのリストをご活用ください。

Cloud Dataflow は大規模なデータ処理向けに設計されています。代表的な事例は、Cloud Dataflow を使用して年間のパーソナル Wrapped プレイリストを算出している Spotify です。詳しくは、Spotify のエンジニアリング ブログで 2020 年の Wrapped パイプラインの分析に関する投稿をご覧ください。

Dataflow は、クラスタを垂直方向にも水平方向にも自動スケールできます。ユーザーはクラスタで GPU 搭載インスタンスを使用することもできます。その場合、Cloud Dataflow が新しいワーカーをクラスタに取り込んで需要を満たします。また、後でワーカーが不要になった場合は、それらの破棄も行います。


ヒント: 費用を削減するためにクラスタ内のワーカー数の上限を定め、料金のアラートを設定してください。

採用するアプローチの判断

https://storage.googleapis.com/gweb-cloudblog-publish/images/3_streaming_data.b01f6f46.fill-2000x1124.jpg

3 種類のツールはそれぞれ、機能も複雑性も異なります。Dataflow が最も優れた選択肢ですが、同時に最も複雑でもあり、ユーザーは特殊な SDK(Apache Beam)を使用してパイプラインを構築する必要があります。その一方、BigQuery のサブスクリプションは処理ロジックを一切許可せず、ウェブ コンソールを使用して構成できます。ニーズに最適なツールを選択することで、より優れた成果をより迅速に上げることができます。

大規模な(Spotify 規模の)パイプラインの場合、ウィンドウ処理を使用してデータを削減する必要がある場合、または複雑なマルチステップのパイプラインがある場合は、Dataflow を選択してください。それ以外の場合はすべて、Cloud Run から開始することをおすすめします。ただし、ノーコードのソリューションによって Pub/Sub を BigQuery に接続することを希望する場合は除きます。その場合は、BigQuery のサブスクリプションを選択してください。

費用という別の検討事項もあります。Cloud Dataflow は自動スケーリングを適用しますが、受信データがない場合にゼロ インスタンスにスケールすることはありません。この理由から、Dataflow ではなく Cloud Run が選択される場合もあります。  

以下の表に主な相違点をまとめました。

https://storage.googleapis.com/gweb-cloudblog-publish/images/6_a.max-923x519.jpg

次のステップ


本投稿に貢献してくれた共著者の Graham Polley 氏Zencore)に感謝します。同氏には LinkedIn または Twitter から連絡できます。また、レビュー フィードバックを寄せてくれた Mete、Sara、Jakob、Valentin、Guillaume、Sean、Kobe、Christopher、Jason、Wei にも感謝します。


- Google Cloud、デベロッパー アドボケイト Wietse Venema
Zencore、テクノロジーおよびデリバリー担当責任者(EMEA)Graham Polley 氏
投稿先