コンテンツに移動
データ分析

Pub/Sub インポート トピックを使用して AWS Kinesis から Google Cloud へデータを簡単にストリーミングする

2024年6月14日
Jaume Marhuenda-Beltran

Software Engineer

Gemini 1.5 モデル をお試しください。

Vertex AI からアクセスできる、Google のもっとも先進的なマルチモーダル モデルです。

試す

※この投稿は米国時間 2024 年 5 月 31 日に、Google Cloud blog に投稿されたものの抄訳です。

単一のプロバイダに縛られるのを避けるため、冗長性を高めるため、あるいは異なるクラウド プロバイダの差別化された製品を利用するためといった理由で、多くの企業がビジネスをサポートするためにマルチクラウド モデルを採用しています。

Google Cloud で最も愛用され、差別化されたプロダクトの一つが BigQuery で、フルマネージドで AI に対応したマルチクラウド データ分析プラットフォームを提供します。BigQuery Omni は、BigQuery を使用して AWS Azure のデータをクエリし、その結果を Google Cloud コンソールに表示できる統合管理インターフェースを提供しています。また、クラウド間でリアルタイムにデータを組み合わせたり移動したりする場合、Pub/Sub は、外部ソースから Pub/Sub へのワンクリックでのストリーミング取り込みを可能にする新しい機能、インポート トピックを提供します。最初にサポートされる外部ソースは、Amazon Kinesis Data Streams です。これらの新しいインポート トピックと Pub/Sub BigQuery サブスクリプションを活用して、AWS のストリーミング データを数クリックで BigQuery で利用できるようにする方法を見ていきましょう。

インポート トピックの概要

Pub/Sub はスケーラブルな非同期メッセージング サービスであり、メッセージを生成するサービスと、それらのメッセージを処理するサービスを切り離すことができます。Pub/Sub は、クライアント ライブラリを使って任意のソースから任意のシンクへのデータ ストリーミングに使用できるようになっており、Google Cloud のエコシステム内にうまく統合されています。Pub/Sub は、BigQuery Cloud Storage にデータを自動的にストリーミングするエクスポート サブスクリプションをサポートしています。また、Pub/Sub Cloud Functions Cloud Run とネイティブに統合されており、たとえば、Google Kubernetes EngineGKE)や Google Compute Engine、またはどのオンプレミスでも、任意の一般アクセス可能なエンドポイントにメッセージを配信できます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/1_cross_cloud_streaming.max-1400x1400.png

エクスポート サブスクリプションで提供される機能は BigQuery Cloud Storage へのデータの書き込み、インポート トピックでは Amazon Kinesis Data Streams からのデータの読み込みとなります。これは、Amazon Kinesis Data Streams から Pub/Sub に直接データを取り込むための、合理化されたフルマネージドの方法です。その結果、クラウド間のデータ パイプライン設定の複雑さが大幅に軽減されます。インポート トピックは、データの取り込みプロセスの健全性とパフォーマンスを可視化するための、すぐに使用できるモニタリングも提供します。さらに、インポート トピックは自動スケーリングも提供しているため、データ量の変動に対応するための手動の構成は不要です。

インポート トピックにより、BigQuery によるマルチクラウド分析が実現するだけでなく、Amazon Kinesis Data Streams から Pub/Sub へのストリーミング データの移行も容易になります。インポート トピックを介して両システム間の接続が確立されると、Amazon Kinesis 製品は、任意のスケジュールで徐々に Pub/Sub パブリッシャーに移行できるようになります。

https://storage.googleapis.com/gweb-cloudblog-publish/images/2_aws_kinesis_migration.max-1000x1000.png

なお、現時点でサポートしているのは、拡張ファンアウトを備えた Amazon Kinesis コンシューマのみになります。

Amazon Kinesis Data Streams のデータを BigQuery で分析する

たとえば、あなたが Amazon Kinesis Data Streams に保存されている、量の変動がきわめて激しいストリーミング データを使ってビジネスを運営しているとします。こうしたデータは分析と意思決定に不可欠であり、BigQuery を活用して分析したいと考えています。まず、これらの詳細な手順に沿ってインポート トピックを作成します。インポート トピックは、さまざまな公式 Pub/Sub ライブラリ Google Cloud コンソールを使って作成できます。コンソールの Pub/Sub ページで [トピックを作成] をクリックすると、以下が表示されます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/3_create_topic.max-600x600.png

重要: [作成] を押すと、Pub/Sub はすぐに Amazon Kinesis のデータ ストリームからの読み込みを開始し、インポート トピックにメッセージをパブリッシュします。すでに Kinesis データ ストリームにデータがある場合、インポート トピックを作成する際にデータの損失を防ぐためにいくつかの手順を踏む必要があります。トピックにサブスクリプションが関連付けられておらず、メッセージ保持が有効になっていない場合、Pub/Sub はそのトピックにパブリッシュされたメッセージを破棄することがあります。トピックを作成するときにデフォルトのサブスクリプションを作成するだけでは十分ではありません。これらはまだ 2 つの個別のオペレーションであり、トピックがサブスクリプションなしで存在する期間が短時間あります。

データの損失を防ぐには、次の 2 つのオプションがあります。

  1. トピックを作成し、それを更新してインポート トピックにする

    1. インポート以外のトピックを作成します。

    2. トピックへのサブスクリプションを作成します。

    3. トピック構成を更新して Amazon Kinesis Data Streams からの取り込みを有効にし、インポート トピックにします。

  2. メッセージ保持を有効にしてサブスクリプションをシークする

    1. メッセージ保持を有効にしてインポート トピックを作成します。

    2. トピックへのサブスクリプションを作成します。

    3. トピック作成前のタイムスタンプまでサブスクリプションをシークします

エクスポート サブスクリプションは、作成されるとすぐにデータの書き込みを開始します。そのため過去にさかのぼってシークすると重複が発生する可能性があります。そのため、エクスポート サブスクリプションを使用する際は、最初のオプションを使用することをおすすめします。

データを BigQuery に転送するには、BigQuery サブスクリプションを作成します。そのためには、Pub/Sub コンソールの Pub/Sub サブスクリプション ページに移動し、[サブスクリプションを作成] をクリックします。

https://storage.googleapis.com/gweb-cloudblog-publish/images/image7_Q7hE1yB.max-600x600.png

Pub/Sub は、Amazon Kinesis のデータ ストリームを積極的にモニタリングすることで自動スケーリングします。Amazon Kinesis ListShards API に定期的にクエリを実施して、ストリームのシャードの最新状態を確認できるようにします。Amazon Kinesis のデータ ストリーム内で変更(再シャーディング)が発生するたびに、Pub/Sub は自動的にその取り込みの構成を適応させ、すべてのデータを取り込んで、Pub/Sub トピックにパブリッシュするようにします。

Pub/Sub は、Amazon Kinesis SubscribeToShard API を使用して、親シャードを持たない、または親シャードがすでに取り込まれている各シャードに対して永続的な接続を確立し、Amazon Kinesis のデータ ストリームの異なるシャードからのデータを継続的に取り込みます。Pub/Sub は、親シャードが完全に取り込まれるまで子シャードの取り込みを開始しません。ただし、メッセージは順序指定キーなしでパブリッシュされるため、厳密な順序指定キーはありません。個々の Amazon Kinesis レコードは、Amazon Kinesis レコードのデータ blob Pub/Sub メッセージのデータ フィールドにコピーする(その後パブリッシュされる)ことで、対応する Pub/Sub メッセージに変換されます。Pub/Sub は、Amazon Kinesis シャードごとのデータ読み取りレートを最大化しようとします。

これで、BigQuery テーブルに直接クエリを実行することで、データ転送が成功したことを確認できるようになりました。簡単な SQL クエリで Amazon Kinesis からのデータがテーブルに入力されたことを確認できるようになり、さらに分析を進め、より広範な分析ワークフローに統合するための準備に着手できます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/5_sql_query.max-1000x1000.png

クロスクラウド インポートをモニタリングする

データの取り込みパイプラインのモニタリングは、スムーズなオペレーションを確保するために不可欠です。最近、Pub/Sub 3 つの新しい指標が追加され、インポート トピックの健全性を確認し、そのパフォーマンスを把握できるようになりました。バイト数、メッセージ数、トピックの状態が表示されます。状態が「アクティブ」でない限り、取り込みは構成ミス、ストリームの欠如、またはコンシューマの欠如によってブロックされます。詳しくは、公式ドキュメントで潜在的なエラー状態とそのトラブルシューティング手順の包括的なリストをご確認ください。これらの指標はトピックの詳細ページから簡単にアクセスでき、そこで状態が「アクティブ」かどうか、そのスループット、1 秒あたりのメッセージを確認できます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/6_dashboards.max-1100x1100.png

まとめ

複数のクラウド環境での運用は、多くの企業にとって標準的な業務手順となっています。ビジネスのさまざまな部分で異なるクラウドを使用する場合でも、それぞれのクラウドが提供する最適な製品を活用できるようにする必要があります。Pub/Sub により、AWS から Google Cloud へのデータ ストリーミングが簡単になりました。使用を開始するには、Google Cloud コンソール Pub/Sub をご覧いただくか、無料トライアルに登録して、今すぐお試しください。

-ソフトウェア エンジニア Jaume Marhuenda-Beltran

投稿先