このチュートリアルでは、Google Cloud の使用時にフロントエンド アプリ(この場合はウェブページ)で大量の受信データを処理する方法について説明します。また、大量のメッセージ ストリームを処理する際の課題についても説明します。このチュートリアルのサンプルアプリでは、WebSockets を使用して Pub/Sub トピックに公開された大量のメッセージ ストリームを可視化し、フロントエンドの効率性を維持しながら、メッセージをタイムリーに処理する方法を示します。
このチュートリアルは、HTTP を介したブラウザ間通信と、HTML、CSS、JavaScript を使用したフロントエンド アプリの作成に精通しているデベロッパーを対象としています。また、Google Cloud の使用経験があり、Linux コマンドライン ツールに精通していることを前提としています。
目標
- 仮想マシン(VM)インスタンスを作成し、Pub/Sub サブスクリプションのペイロードをブラウザ クライアントにストリーミングするためのコンポーネントを構成します。
- Pub/Sub トピックをサブスクライブし、個々のメッセージをログに出力するように VM のプロセスを構成します。
- ウェブサーバーをインストールして静的コンテンツを提供し、シェルコマンド出力を WebSocket クライアントにストリーミングします。
- HTML、CSS、JavaScript を使用して、WebSocket ストリームの集計と個々のメッセージ サンプルをブラウザで可視化します。
費用
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
- このチュートリアルに記載されているコマンドを実行するために、Cloud Shell を開きます。
このチュートリアルでは、Cloud Shell からすべてのターミナル コマンドを実行します。
- Compute Engine API と Pub/Sub API を有効にします。
gcloud services enable compute pubsub
このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
はじめに
イベント ドリブン モデルを採用するアプリが増えています。フロントエンド アプリでは、これらのアーキテクチャの基盤となるメッセージング サービスに簡単に接続できることが重要になります。
ウェブブラウザ クライアントにデータをストリーミングする方法はいくつかありますが、よく使われているのが WebSocket です。このチュートリアルでは、Pub/Sub トピックに公開されるメッセージ ストリームをサブスクライブするプロセスをインストールし、WebSocket 経由で接続しているクライアントにウェブサーバーからメッセージを転送します。
このチュートリアルでは、NYC Taxi Tycoon Google Dataflow CodeLab で使用されている一般公開の Pub/Sub トピックを使用します。このトピックは、Taxi & Limousine Commission の移動記録データセットからニューヨーク市の配車履歴データを取得して利用状況のシミュレーションを行い、その結果をリアルタイムで提供します。
アーキテクチャ
次の図に、このチュートリアルで構築するアーキテクチャを示します。
この図では、メッセージ パブリッシャーが Compute Engine リソースを含むプロジェクトの外部にあり、このパブリッシャーが Pub/Sub トピックにメッセージを送信します。メッセージは Compute Engine インスタンスから WebSocket 経由でブラウザに送信され、HTML5 と JavaScript で作成されたダッシュボードに表示されます。
このチュートリアルでは、Pub/Sub と WebSocket を連携させるため、次のツールを使用しています。
pulltop
は、このチュートリアルでインストールする Node.js プログラムです。このツールは Pub/Sub トピックをサブスクライブし、受信したメッセージを標準出力にストリーミングします。websocketd
は、既存のコマンドライン インターフェース プログラムをラップし、WebSocket を使用してアクセスできるようにする小さなコマンドライン ツールです。
pulltop
と websocketd
を組み合わせることで、Pub/Sub トピックから受信したメッセージを WebSocket 経由でブラウザにストリーミングすることが可能になります。
Pub/Sub トピックのスループットの調整
NYC Taxi Tycoon の一般公開の Pub/Sub トピックでは、1 秒間に 2,000〜2,500 件の配車データが更新され、1 秒あたり最大で 8 MB のデータが生成されます。キュー内で未確認メッセージが増加していることを Pub/Sub が検知すると、Pub/Sub に組み込まれたフロー制御により、サブスクライバーのメッセージ レートが自動的に低下します。このため、ワークステーション、ネットワーク接続、フロントエンド処理コードによって、メッセージレートの変動が大きくなることがあります。
効率的なブラウザ メッセージの処理
WebSocket ストリーム経由で大量のメッセージを受信する場合は、このストリームを処理するフロントエンド コードを作成する必要があります。たとえば、メッセージごとに HTML 要素を動的に生成するようにします。ただし、想定されたメッセージ レートで各メッセージのページを更新すると、ブラウザ ウィンドウがロックされる可能性があります。また、HTML 要素を動的に生成すると、メモリの割り当てが頻繁に発生してガベージ コレクションの期間が長くなり、ユーザー エクスペリエンスが低下します。毎秒 2, 000 件届くメッセージごとに document.createElement()
を呼び出す必要はありません。
このチュートリアルでは、このような大量のメッセージ ストリームを管理するため、次のことを行います。
- 一連のストリーム指標をリアルタイムで計算し、継続的に更新する。これにより、モニタリングされたメッセージに関する大部分の情報を集計値として表示できます。
- ブラウザベースのダッシュボードを使用して、事前に定義されたスケジュールで個々のメッセージのサンプルを視覚化し、降車イベントと乗車イベントのみをリアルタイムで表示する。
次の図に、このチュートリアルで作成したダッシュボードを示します。
この図は、最後のメッセージのレイテンシが 24 ミリ秒で、メッセージ レートが 1 秒あたり約 2,100 件であることを示しています。個々のメッセージを処理するクリティカル コードパスが時間内に完了しないと、最後のメッセージのレイテンシが増加するため、1 秒あたりのモニタリング メッセージの数が減少します。配車のサンプリングは 3 秒に 1 回のサイクルで実行される JavaScript setInterval
API で行われます。これにより、フロントエンドで大量の DOM 要素が生成されなくなります(その大半は毎秒 10 を超えるレートでは確認できません)。
ダッシュボードは、ストリームの途中でイベントの処理を開始するため、進行中の配車は以前に表示されていない限りダッシュボードで新しいデータとして認識されます。このコードは、ride_id
値をインデックスとした連想配列で配車データを格納します。乗客が降車すると、その配車への参照を削除します。状態が enroute または pickup の配車は、その配車が確認済みでない限り、その配列への参照を追加します。
WebSocket サーバーをインストールして構成する
まず、WebSocket サーバーとして使用する Compute Engine インスタンスを作成します。インスタンスを作成したら、後で必要なツールをインストールします。
Cloud Shell で、デフォルトの Compute Engine ゾーンを設定します。次の例では
us-central1-a
を設定していますが、任意のゾーンを使用できます。gcloud config set compute/zone us-central1-a
デフォルト ゾーンに
websocket-server
という名前の Compute Engine インスタンスを作成します。gcloud compute instances create websocket-server --tags wss
ファイアウォール ルールを追加し、ポート
8000
でwss
タグ付きのインスタンスに TCP トラフィックを許可します。gcloud compute firewall-rules create websocket \ --direction=IN \ --allow=tcp:8000 \ --target-tags=wss
既存のプロジェクトを使用している場合は、インスタンスへの SSH 接続が許可されるように、TCP ポート
22
が開いていることを確認してください。デフォルトでは、
default-allow-ssh
ファイアウォール ルールはデフォルト ネットワークで有効になっています。ただし、自身または管理者が既存のプロジェクトでデフォルト ルールを削除した場合、TCP ポート22
が開かない可能性があります(このチュートリアル用に新しいプロジェクトを作成した場合、このルールはデフォルトで有効になり、操作は不要です)。ファイアウォール ルールを追加し、ポート
22
でwss
タグ付きのインスタンスに TCP トラフィックを許可します。gcloud compute firewall-rules create wss-ssh \ --direction=IN \ --allow=tcp:22 \ --target-tags=wss
SSH を使用してインスタンスに接続します。
gcloud compute ssh websocket-server
インスタンスのターミナル コマンドで、アカウントを
root
に切り替えて、ソフトウェアをインストールできるようにします。sudo -s
git
ツールとunzip
ツールをインストールします。apt-get install -y unzip git
インスタンスに
websocketd
バイナリをインストールします。cd /var/tmp/ wget \ https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip unzip websocketd-0.3.0-linux_386.zip mv websocketd /usr/bin
Node.js とチュートリアル コードをインストールする
インスタンスのターミナルで Node.js をインストールします。
curl -sL https://deb.nodesource.com/setup_10.x | bash - apt-get install -y nodejs
チュートリアルのソース リポジトリをダウンロードします。
exit cd ~ git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
pulltop
の権限を変更して実行を許可します。cd solutions-pubsub-websockets chmod 755 pulltop/pulltop.js
pulltop
依存関係をインストールします。cd pulltop npm install sudo npm link
pulltop がメッセージを読み取れることをテストする
インスタンスで、公開トピックに対して
pulltop
を実行します。pulltop projects/pubsub-public-data/topics/taxirides-realtime
pulltop
が機能している場合は、次のような結果が表示されます。{"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude" :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6 593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat us":"enroute","passenger_count":1}
[
Ctrl+C
] を押してストリームを停止します。
websocketd とのメッセージ フローを確立する
pulltop
が Pub/Sub トピックを読み取れることを確認したら、websocketd
プロセスを開始してブラウザへのメッセージ送信を開始できます。
トピック メッセージをローカル ファイルにキャプチャする
このチュートリアルでは、pulltop
から取得したメッセージ ストリームをキャプチャし、ローカル ファイルに書き込みます。メッセージ トラフィックをローカル ファイルにキャプチャする場合、ストレージ要件が増えますが、Pub/Sub トピック メッセージのストリーミング と websocketd
プロセスのオペレーションは分離されます。ローカルで情報をキャプチャする場合は、現在接続している WebSocket クライアントを強制的にリセットするのではなく、Pub/Sub ストリーミングを一時的に停止できます。メッセージ ストリームが再度確立されると、websocketd
はクライアントへのメッセージ ストリーミングを自動的に再開します。
インスタンスで、公開トピックに
pulltop
を実行し、メッセージの出力をローカルのtaxi.json
ファイルにリダイレクトします。ログアウトするかターミナルを閉じると、nohup
コマンドはpulltop
プロセスを続行するように OS に指示します。nohup pulltop \ projects/pubsub-public-data/topics/taxirides-realtime > \ /var/tmp/taxi.json &
JSON メッセージがファイルに書き込まれていることを確認します。
tail /var/tmp/taxi.json
メッセージが
taxi.json
ファイルに書き込まれている場合、出力は次のようになります。{"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude" :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6 593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta tus":"enroute","passenger_count":1}
アプリのウェブフォルダに移動します。
cd ../web
websocketd
を起動して、WebSocket を使用してローカル ファイルのコンテンツ ストリーミングを開始します。nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
これにより、
websocketd
コマンドがバックグラウンドで実行されます。websocketd
ツールは、tail
コマンドの出力を使用し、WebSocket メッセージとして各要素をストリーミングします。nohup.out
の内容を参照して、サーバーが正しく起動したことを確認します。tail nohup.out
すべてが正常に機能している場合、出力は次のようになります。
Mon, 25 Mar 2019 14:03:53 -0400 | INFO | server | | Serving using application : /usr/bin/tail -f /var/tmp/taxi.json Mon, 25 Mar 2019 14:03:53 -0400 | INFO | server | | Serving static content from : .
メッセージの可視化
Pub/Sub トピックに公開された個々の配車メッセージは、次のような構造になっています。
{ "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69", "point_idx": 248, "latitude": 40.74644000000001, "longitude": -73.97144, "timestamp": "2019-03-24T00:46:08.49094-04:00", "meter_reading": 8.40615, "meter_increment": 0.033895764, "ride_status": "enroute", "passenger_count": 1 }
これらの値に基づいて、ダッシュボードのヘッダーの指標を計算します。この計算は、インバウンドの配車イベントごとに 1 回実行されます。値は次のとおりです。
- 最後のメッセージのレイテンシ。最後に確認された配車イベントのタイムスタンプから現在の時刻(ウェブブラウザをホストしているシステムの時刻)までの秒数。
- アクティブな配車。現在進行中の配車です。この値は急増する場合があります。また、
ride_status
の値がdropoff
になると減少します。 - メッセージ レート。1 秒あたりに処理された配車イベントの平均数。
- メーター料金の合計額。アクティブな配車のメーター料金の合計額。この値は、乗客が降車すると減少します。
- 乗客の総数。乗客の数。この数は、乗車が完了すると減少します。
- 乗車 1 台あたりの平均乗車人数。乗客数の合計数を配車数の合計数で割った値。
- 乗客 1 人あたりの平均メーター料金。メーター料金の合計金額を乗客の総数で割った値。
乗客が乗車または降車すると、指標と個々の配車サンプルのほかに、配車サンプルのグリッドの上にアラート通知が表示されます。
現在のインスタンスの外部 IP アドレスを取得します。
curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
その IP アドレスをコピーします。
ローカルマシンで新しいウェブブラウザを開き、URL を入力します。
http://$ip-address:8000
このチュートリアルのダッシュボードにページが表示されます。
上部にあるタクシーのアイコンをクリックしてストリームへの接続を開き、メッセージの処理を開始します。
個々の配車が視覚化され、9 つの配車サンプルが 3 秒ごとにレンダリングされます。
タクシーのアイコンをクリックすると、WebSocket ストリームを開始または停止できます。WebSocket 接続が切断されると、アイコンが赤に変わり、指標と個々の配車情報の更新が停止します。再接続するには、タクシーのアイコンをもう一度クリックします。
パフォーマンス
次のスクリーンショットは、Chrome デベロッパー ツールのパフォーマンス モニターです。ブラウザのタブが 1 秒あたり約 2,100 件のメッセージを処理していることを表しています。
約 30 ミリ秒のレイテンシでメッセージがディスパッチされると、CPU 使用率は平均で約 80% になります。メモリ使用率は最小の 29 MB です。合計で 57 MB が割り当てられていますが、増減は自由に行われます。
クリーンアップ
ファイアウォール ルールを削除する
このチュートリアルで既存のプロジェクトを使用した場合、作成したファイアウォール ルールを削除できます。開いているポートを最小限に抑えることをおすすめします。
ポート
8000
で TCP を許可するように作成したファイアウォール ルールを削除します。gcloud compute firewall-rules delete websocket
SSH 接続を許可するファイアウォール ルールも作成した場合は、ポート
22
で TCP を許可するファイアウォール ルールを削除します。gcloud compute firewall-rules delete wss-ssh
プロジェクトの削除
このプロジェクトを再度使用したくない場合は、プロジェクトを削除できます。
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
次のステップ
- Pub/Sub と WebSocket プロトコルについて学習する。
- Google Maps Platform API キーを
cabdash.js
に追加して、乗車と降車の位置情報を取得する。 - Google Cloud に関するリファレンス アーキテクチャ、図、ベスト プラクティスを確認する。Cloud アーキテクチャ センターをご覧ください。