WebSocket での Pub/Sub メッセージのストリーミング


このチュートリアルでは、Google Cloud の使用時にフロントエンド アプリ(この場合はウェブページ)で大量の受信データを処理する方法について説明します。また、大量のメッセージ ストリームを処理する際の課題についても説明します。このチュートリアルのサンプルアプリでは、WebSocket を使用して Pub/Sub トピックに公開された大量のメッセージ ストリームを可視化し、フロントエンドの効率性を維持しながら、メッセージをタイムリーに処理します。

このチュートリアルは、HTTP を介したブラウザ間通信と、HTML、CSS、JavaScript を使用したフロントエンド アプリの作成に精通しているデベロッパーを対象としています。また、Google Cloud の使用経験があり、Linux コマンドライン ツールに精通していることを前提としています。

目標

  • 仮想マシン(VM)インスタンスを作成し、Pub/Sub サブスクリプションのペイロードをブラウザ クライアントにストリーミングするためのコンポーネントを構成します。
  • Pub/Sub トピックをサブスクライブし、個々のメッセージをログに出力するように VM のプロセスを構成します。
  • ウェブサーバーをインストールして静的コンテンツを提供し、シェルコマンド出力を WebSocket クライアントにストリーミングします。
  • HTML、CSS、JavaScript を使用して、WebSocket ストリームの集計と個々のメッセージ サンプルをブラウザで可視化します。

料金

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. このチュートリアルに記載されているコマンドを実行するために、Cloud Shell を開きます。

    Cloud Shell に移動

    このチュートリアルでは、Cloud Shell からすべてのターミナル コマンドを実行します。

  7. Compute Engine API と Pub/Sub API を有効にします。
    gcloud services enable compute pubsub

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

はじめに

イベント ドリブン モデルを採用するアプリが増えています。フロントエンド アプリでは、これらのアーキテクチャの基盤となるメッセージング サービスに簡単に接続できることが重要になります。

ウェブブラウザ クライアントにデータをストリーミングする方法はいくつかありますが、よく使われているのが WebSocket です。このチュートリアルでは、Pub/Sub トピックに公開されるメッセージ ストリームをサブスクライブするプロセスをインストールし、WebSocket 経由で接続しているクライアントにウェブサーバーからメッセージを転送します。

このチュートリアルでは、NYC Taxi Tycoon Google Dataflow CodeLab で使用されている一般公開の Pub/Sub トピックを使用します。このトピックは、Taxi & Limousine Commission の移動記録データセットからニューヨーク市の配車履歴データを取得して利用状況のシミュレーションを行い、その結果をリアルタイムで提供します。

アーキテクチャ

次の図に、このチュートリアルで構築するアーキテクチャを示します。

チュートリアルのアーキテクチャ

この図では、メッセージ パブリッシャーが Compute Engine リソースを含むプロジェクトの外部にあり、このパブリッシャーが Pub/Sub トピックにメッセージを送信します。メッセージは Compute Engine インスタンスから WebSocket 経由でブラウザに送信され、HTML5 と JavaScript で作成されたダッシュボードに表示されます。

このチュートリアルでは、Pub/Sub と WebSocket を連携させるため、次のツールを使用しています。

  • pulltop は、このチュートリアルでインストールする Node.js プログラムです。このツールは Pub/Sub トピックをサブスクライブし、受信したメッセージを標準出力にストリーミングします。
  • websocketd は、既存のコマンドライン インターフェース プログラムをラップし、WebSocket 経由でアクセスできるようにする小さなコマンドライン ツールです。

pulltopwebsocketd を組み合わせることで、Pub/Sub トピックから受信したメッセージを WebSocket 経由でブラウザにストリーミングすることが可能になります。

Pub/Sub トピックのスループットの調整

NYC Taxi Tycoon の一般公開の Pub/Sub トピックでは、1 秒間に 2,000〜2,500 件の配車データが更新され、1 秒あたり最大で 8 MB のデータが生成されます。キュー内で未確認メッセージが増加していることを Pub/Sub が検知すると、Pub/Sub に組み込まれたフロー制御により、サブスクライバーのメッセージ レートが自動的に低下します。このため、ワークステーション、ネットワーク接続、フロントエンド処理コードによって、メッセージレートの変動が大きくなることがあります。

効率的なブラウザ メッセージの処理

WebSocket ストリーム経由で大量のメッセージを受信する場合は、このストリームを処理するフロントエンド コードを作成する必要があります。たとえば、メッセージごとに HTML 要素を動的に生成するようにします。ただし、想定されたメッセージ レートで各メッセージのページを更新すると、ブラウザ ウィンドウがロックされる可能性があります。また、HTML 要素を動的に生成すると、メモリの割り当てが頻繁に発生してガベージ コレクションの期間が長くなり、ユーザー エクスペリエンスが低下します。毎秒 2, 000 件届くメッセージごとに document.createElement() を呼び出す必要はありません。

このチュートリアルでは、このような大量のメッセージ ストリームを管理するため、次のことを行います。

  • 一連のストリーム指標をリアルタイムで計算し、継続的に更新する。これにより、モニタリングされたメッセージに関する大部分の情報を集計値として表示できます。
  • ブラウザベースのダッシュボードを使用して、事前に定義されたスケジュールで個々のメッセージのサンプルを視覚化し、降車イベントと乗車イベントのみをリアルタイムで表示する。

次の図に、このチュートリアルで作成したダッシュボードを示します。

このチュートリアルのコードでウェブページに表示するダッシュボード

この図は、最後のメッセージのレイテンシが 24 ミリ秒で、メッセージ レートが 1 秒あたり約 2,100 件であることを示しています。個々のメッセージを処理するクリティカル コードパスが時間内に完了しないと、最後のメッセージのレイテンシが増加するため、1 秒あたりのモニタリング メッセージの数が減少します。配車のサンプリングは 3 秒に 1 回のサイクルで実行される JavaScript setInterval API で行われます。これにより、フロントエンドで大量の DOM 要素が生成されなくなります(その大半は毎秒 10 を超えるレートでは確認できません)。

ダッシュボードは、ストリームの途中でイベントの処理を開始するため、進行中の配車は以前に表示されていない限りダッシュボードで新しいデータとして認識されます。このコードは、ride_id 値をインデックスとした連想配列で配車データを格納します。乗客が降車すると、その配車への参照を削除します。状態が enroute または pickup の配車は、その配車が確認済みでない限り、その配列への参照を追加します。

WebSocket サーバーをインストールして構成する

まず、WebSocket サーバーとして使用する Compute Engine インスタンスを作成します。インスタンスを作成したら、後で必要なツールをインストールします。

  1. Cloud Shell で、デフォルトの Compute Engine ゾーンを設定します。次の例では us-central1-a を設定していますが、任意のゾーンを使用できます。

    gcloud config set compute/zone us-central1-a
    
  2. デフォルト ゾーンに websocket-server という名前の Compute Engine インスタンスを作成します。

    gcloud compute instances create websocket-server --tags wss
    
  3. ファイアウォール ルールを追加し、ポート 8000wss タグ付きのインスタンスに TCP トラフィックを許可します。

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. 既存のプロジェクトを使用している場合は、インスタンスへの SSH 接続が許可されるように、TCP ポート 22 が開いていることを確認してください。

    デフォルトでは、default-allow-ssh ファイアウォール ルールはデフォルト ネットワークで有効になっています。ただし、自身または管理者が既存のプロジェクトでデフォルト ルールを削除した場合、TCP ポート 22 が開かない可能性があります(このチュートリアル用に新しいプロジェクトを作成した場合、このルールはデフォルトで有効になり、操作は不要です)。

    ファイアウォール ルールを追加し、ポート 22wss タグ付きのインスタンスに TCP トラフィックを許可します。

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. SSH を使用してインスタンスに接続します。

    gcloud compute ssh websocket-server
    
  6. インスタンスのターミナル コマンドで、アカウントを root に切り替えて、ソフトウェアをインストールできるようにします。

    sudo -s
    
  7. git ツールと unzip ツールをインストールします。

    apt-get install -y unzip git
    
  8. インスタンスに websocketd バイナリをインストールします。

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Node.js とチュートリアル コードをインストールする

  1. インスタンスのターミナルで Node.js をインストールします。

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. チュートリアルのソース リポジトリをダウンロードします。

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. pulltop の権限を変更して実行を許可します。

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. pulltop 依存関係をインストールします。

    cd pulltop
    npm install
    sudo npm link
    

pulltop がメッセージを読み取れることをテストする

  1. インスタンスで、公開トピックに対して pulltop を実行します。

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    pulltop が機能している場合は、次のような結果が表示されます。

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. [Ctrl+C] を押してストリームを停止します。

websocketd とのメッセージ フローを確立する

pulltop が Pub/Sub トピックを読み取れることを確認したら、websocketd プロセスを開始してブラウザへのメッセージ送信を開始できます。

トピック メッセージをローカル ファイルにキャプチャする

このチュートリアルでは、pulltop から取得したメッセージ ストリームをキャプチャし、ローカル ファイルに書き込みます。メッセージ トラフィックをローカル ファイルにキャプチャする場合、ストレージ要件が増えますが、Pub/Sub トピック メッセージのストリーミング と websocketd プロセスのオペレーションは分離されます。ローカルで情報をキャプチャする場合は、現在接続している WebSocket クライアントを強制的にリセットするのではなく、Pub/Sub ストリーミングを一時的に停止できます。メッセージ ストリームが再度確立されると、websocketd はクライアントへのメッセージ ストリーミングを自動的に再開します。

  1. インスタンスで、公開トピックに pulltop を実行し、メッセージの出力をローカルの taxi.json ファイルにリダイレクトします。ログアウトするかターミナルを閉じると、nohup コマンドは pulltop プロセスを続行するように OS に指示します。

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. JSON メッセージがファイルに書き込まれていることを確認します。

    tail /var/tmp/taxi.json
    

    メッセージが taxi.json ファイルに書き込まれている場合、出力は次のようになります。

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. アプリのウェブフォルダに移動します。

    cd ../web
    
  4. websocketd を起動して、WebSocket を使用してローカル ファイルのコンテンツ ストリーミングを開始します。

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    これにより、websocketd コマンドがバックグラウンドで実行されます。websocketd ツールは、tail コマンドの出力を使用し、WebSocket メッセージとして各要素をストリーミングします。

  5. nohup.out の内容を参照して、サーバーが正しく起動したことを確認します。

    tail nohup.out
    

    すべてが正常に機能している場合、出力は次のようになります。

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

メッセージの可視化

Pub/Sub トピックに公開された個々の配車メッセージは、次のような構造になっています。

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

これらの値に基づいて、ダッシュボードのヘッダーの指標を計算します。この計算は、インバウンドの配車イベントごとに 1 回実行されます。値は次のとおりです。

  • 最後のメッセージのレイテンシ。最後に確認された配車イベントのタイムスタンプから現在の時刻(ウェブブラウザをホストしているシステムの時刻)までの秒数。
  • アクティブな配車。現在進行中の配車です。この値は急増する場合があります。また、ride_status の値が dropoff になると減少します。
  • メッセージ レート。1 秒あたりに処理された配車イベントの平均数。
  • メーター料金の合計額。アクティブな配車のメーター料金の合計額。この値は、乗客が降車すると減少します。
  • 乗客の総数。乗客の数。この数は、乗車が完了すると減少します。
  • 乗車 1 台あたりの平均乗車人数。乗客数の合計数を配車数の合計数で割った値。
  • 乗客 1 人あたりの平均メーター料金。メーター料金の合計金額を乗客の総数で割った値。

乗客が乗車または降車すると、指標と個々の配車サンプルのほかに、配車サンプルのグリッドの上にアラート通知が表示されます。

  1. 現在のインスタンスの外部 IP アドレスを取得します。

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. その IP アドレスをコピーします。

  3. ローカルマシンで新しいウェブブラウザを開き、URL を入力します。

    http://$ip-address:8000

    このチュートリアルのダッシュボードにページが表示されます。

    このチュートリアルのコードで作成したダッシュボード。招待メッセージが表示され、データが表示されます。

  4. 上部にあるタクシーのアイコンをクリックしてストリームへの接続を開き、メッセージの処理を開始します。

    個々の配車が視覚化され、9 つの配車サンプルが 3 秒ごとにレンダリングされます。

    アクティブな配車を表示するダッシュボード。

    タクシーのアイコンをクリックすると、WebSocket ストリームを開始または停止できます。WebSocket 接続が切断されると、アイコンが赤に変わり、指標と個々の配車情報の更新が停止します。再接続するには、タクシーのアイコンをもう一度クリックします。

パフォーマンス

次のスクリーンショットは、Chrome デベロッパー ツールのパフォーマンス モニターです。ブラウザのタブが 1 秒あたり約 2,100 件のメッセージを処理していることを表しています。

ブラウザのパフォーマンス モニタリング ペイン。1 秒あたりの CPU 使用率、ヒープサイズ、DOM ノード、スタイルの再計算が表示されています。値は比較的一定になっています。

約 30 ミリ秒のレイテンシでメッセージがディスパッチされると、CPU 使用率は平均で約 80% になります。メモリ使用率は最小の 29 MB です。合計で 57 MB が割り当てられていますが、増減は自由に行われます。

クリーンアップ

ファイアウォール ルールを削除する

このチュートリアルで既存のプロジェクトを使用した場合、作成したファイアウォール ルールを削除できます。開いているポートを最小限に抑えることをおすすめします。

  1. ポート 8000 で TCP を許可するように作成したファイアウォール ルールを削除します。

    gcloud compute firewall-rules delete websocket
    
  2. SSH 接続を許可するファイアウォール ルールも作成した場合は、ポート 22 で TCP を許可するファイアウォール ルールを削除します。

    gcloud compute firewall-rules delete wss-ssh
    

プロジェクトの削除

このプロジェクトを再度使用したくない場合は、プロジェクトを削除できます。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ