WebSocket での Pub/Sub メッセージのストリーミング


このチュートリアルでは、Google Cloud の使用時にフロントエンド アプリ(この場合はウェブページ)で大量の受信データを処理する方法について説明します。また、大量のメッセージ ストリームを処理する際の課題についても説明します。このチュートリアルのサンプルアプリでは、WebSocket を使用して Pub/Sub トピックに公開された大量のメッセージ ストリームを可視化し、フロントエンドの効率性を維持しながら、メッセージをタイムリーに処理します。

このチュートリアルは、HTTP を介したブラウザ間通信と、HTML、CSS、JavaScript を使用したフロントエンド アプリの作成に精通しているデベロッパーを対象としています。また、Google Cloud の使用経験があり、Linux コマンドライン ツールに精通していることを前提としています。

目標

  • 仮想マシン(VM)インスタンスを作成し、Pub/Sub サブスクリプションのペイロードをブラウザ クライアントにストリーミングするためのコンポーネントを構成します。
  • Pub/Sub トピックをサブスクライブし、個々のメッセージをログに出力するように VM のプロセスを構成します。
  • ウェブサーバーをインストールして静的コンテンツを提供し、シェルコマンド出力を WebSocket クライアントにストリーミングします。
  • HTML、CSS、JavaScript を使用して、WebSocket ストリームの集計と個々のメッセージ サンプルをブラウザで可視化します。

料金

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. このチュートリアルに記載されているコマンドを実行するために、Cloud Shell を開きます。

    Cloud Shell に移動

    このチュートリアルでは、Cloud Shell からすべてのターミナル コマンドを実行します。

  7. Compute Engine API と Pub/Sub API を有効にします。
    gcloud services enable compute pubsub

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

はじめに

イベント ドリブン モデルを採用するアプリが増えています。フロントエンド アプリでは、これらのアーキテクチャの基盤となるメッセージング サービスに簡単に接続できることが重要になります。

ウェブブラウザ クライアントにデータをストリーミングする方法はいくつかありますが、よく使われているのが WebSocket です。このチュートリアルでは、Pub/Sub トピックに公開されるメッセージ ストリームをサブスクライブするプロセスをインストールし、WebSocket 経由で接続しているクライアントにウェブサーバーからメッセージを転送します。

このチュートリアルでは、NYC Taxi Tycoon Google Dataflow CodeLab で使用されている一般公開の Pub/Sub トピックを使用します。このトピックは、Taxi & Limousine Commission の移動記録データセットからニューヨーク市の配車履歴データを取得して利用状況のシミュレーションを行い、その結果をリアルタイムで提供します。

アーキテクチャ

次の図に、このチュートリアルで構築するアーキテクチャを示します。

チュートリアルのアーキテクチャ

この図では、メッセージ パブリッシャーが Compute Engine リソースを含むプロジェクトの外部にあり、このパブリッシャーが Pub/Sub トピックにメッセージを送信します。メッセージは Compute Engine インスタンスから WebSocket 経由でブラウザに送信され、HTML5 と JavaScript で作成されたダッシュボードに表示されます。

このチュートリアルでは、Pub/Sub と WebSocket を連携させるため、次のツールを使用しています。

  • pulltop は、このチュートリアルでインストールする Node.js プログラムです。このツールは Pub/Sub トピックをサブスクライブし、受信したメッセージを標準出力にストリーミングします。
  • websocketd は、既存のコマンドライン インターフェース プログラムをラップし、WebSocket 経由でアクセスできるようにする小さなコマンドライン ツールです。

pulltopwebsocketd を組み合わせることで、Pub/Sub トピックから受信したメッセージを WebSocket 経由でブラウザにストリーミングすることが可能になります。

Pub/Sub トピックのスループットの調整

NYC Taxi Tycoon の一般公開の Pub/Sub トピックでは、1 秒間に 2,000〜2,500 件の配車データが更新され、1 秒あたり最大で 8 MB のデータが生成されます。キュー内で未確認メッセージが増加していることを Pub/Sub が検知すると、Pub/Sub に組み込まれたフロー制御により、サブスクライバーのメッセージ レートが自動的に低下します。このため、ワークステーション、ネットワーク接続、フロントエンド処理コードによって、メッセージレートの変動が大きくなることがあります。

効率的なブラウザ メッセージの処理

WebSocket ストリーム経由で大量のメッセージを受信する場合は、このストリームを処理するフロントエンド コードを作成する必要があります。たとえば、メッセージごとに HTML 要素を動的に生成するようにします。ただし、想定されたメッセージ レートで各メッセージのページを更新すると、ブラウザ ウィンドウがロックされる可能性があります。また、HTML 要素を動的に生成すると、メモリの割り当てが頻繁に発生してガベージ コレクションの期間が長くなり、ユーザー エクスペリエンスが低下します。毎秒 2, 000 件届くメッセージごとに document.createElement() を呼び出す必要はありません。

このチュートリアルでは、このような大量のメッセージ ストリームを管理するため、次のことを行います。

  • 一連のストリーム指標をリアルタイムで計算し、継続的に更新する。これにより、モニタリングされたメッセージに関する大部分の情報を集計値として表示できます。
  • ブラウザベースのダッシュボードを使用して、事前に定義されたスケジュールで個々のメッセージのサンプルを視覚化し、降車イベントと乗車イベントのみをリアルタイムで表示する。

次の図に、このチュートリアルで作成したダッシュボードを示します。

このチュートリアルのコードでウェブページに表示するダッシュボード

この図は、最後のメッセージのレイテンシが 24 ミリ秒で、メッセージ レートが 1 秒あたり約 2,100 件であることを示しています。個々のメッセージを処理するクリティカル コードパスが時間内に完了しないと、最後のメッセージのレイテンシが増加するため、1 秒あたりのモニタリング メッセージの数が減少します。配車のサンプリングは 3 秒に 1 回のサイクルで実行される JavaScript setInterval API で行われます。これにより、フロントエンドで大量の DOM 要素が生成されなくなります(その大半は毎秒 10 を超えるレートでは確認できません)。

ダッシュボードは、ストリームの途中でイベントの処理を開始するため、進行中の配車は以前に表示されていない限りダッシュボードで新しいデータとして認識されます。このコードは、ride_id 値をインデックスとした連想配列で配車データを格納します。乗客が降車すると、その配車への参照を削除します。状態が enroute または pickup の配車は、その配車が確認済みでない限り、その配列への参照を追加します。

WebSocket サーバーをインストールして構成する

まず、WebSocket サーバーとして使用する Compute Engine インスタンスを作成します。インスタンスを作成したら、後で必要なツールをインストールします。

  1. Cloud Shell で、デフォルトの Compute Engine ゾーンを設定します。次の例では us-central1-a を設定していますが、任意のゾーンを使用できます。

    gcloud config set compute/zone us-central1-a
    
  2. デフォルト ゾーンに websocket-server という名前の Compute Engine インスタンスを作成します。

    gcloud compute instances create websocket-server --tags wss
    
  3. ファイアウォール ルールを追加し、ポート 8000wss タグ付きのインスタンスに TCP トラフィックを許可します。

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. 既存のプロジェクトを使用している場合は、インスタンスへの SSH 接続が許可されるように、TCP ポート 22 が開いていることを確認してください。

    デフォルトでは、default-allow-ssh ファイアウォール ルールはデフォルト ネットワークで有効になっています。ただし、自身または管理者が既存のプロジェクトでデフォルト ルールを削除した場合、TCP ポート 22 が開かない可能性があります(このチュートリアル用に新しいプロジェクトを作成した場合、このルールはデフォルトで有効になり、操作は不要です)。

    ファイアウォール ルールを追加し、ポート 22wss タグ付きのインスタンスに TCP トラフィックを許可します。

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. SSH を使用してインスタンスに接続します。

    gcloud compute ssh websocket-server
    
  6. インスタンスのターミナル コマンドで、アカウントを root に切り替えて、ソフトウェアをインストールできるようにします。

    sudo -s
    
  7. git ツールと unzip ツールをインストールします。

    apt-get install -y unzip git
    
  8. インスタンスに websocketd バイナリをインストールします。

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Node.js とチュートリアル コードをインストールする

  1. インスタンスのターミナルで Node.js をインストールします。

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. チュートリアルのソース リポジトリをダウンロードします。

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. pulltop の権限を変更して実行を許可します。

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. pulltop 依存関係をインストールします。

    cd pulltop
    npm install
    sudo npm link
    

pulltop がメッセージを読み取れることをテストする

  1. インスタンスで、公開トピックに対して pulltop を実行します。

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    pulltop が機能している場合は、次のような結果が表示されます。

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. [Ctrl+C] を押してストリームを停止します。

websocketd とのメッセージ フローを確立する

pulltop が Pub/Sub トピックを読み取れることを確認したら、websocketd プロセスを開始してブラウザへのメッセージ送信を開始できます。

トピック メッセージをローカル ファイルにキャプチャする

このチュートリアルでは、pulltop から取得したメッセージ ストリームをキャプチャし、ローカル ファイルに書き込みます。メッセージ トラフィックをローカル ファイルにキャプチャする場合、ストレージ要件が増えますが、Pub/Sub トピック メッセージのストリーミング と websocketd プロセスのオペレーションは分離されます。ローカルで情報をキャプチャする場合は、現在接続している WebSocket クライアントを強制的にリセットするのではなく、Pub/Sub ストリーミングを一時的に停止できます。メッセージ ストリームが再度確立されると、websocketd はクライアントへのメッセージ ストリーミングを自動的に再開します。

  1. インスタンスで、公開トピックに pulltop を実行し、メッセージの出力をローカルの taxi.json ファイルにリダイレクトします。ログアウトするかターミナルを閉じると、nohup コマンドは pulltop プロセスを続行するように OS に指示します。

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. JSON メッセージがファイルに書き込まれていることを確認します。

    tail /var/tmp/taxi.json
    

    メッセージが taxi.json ファイルに書き込まれている場合、出力は次のようになります。

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. アプリのウェブフォルダに移動します。

    cd ../web
    
  4. websocketd を起動して、WebSocket を使用してローカル ファイルのコンテンツ ストリーミングを開始します。

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    これにより、websocketd コマンドがバックグラウンドで実行されます。websocketd ツールは、tail コマンドの出力を使用し、WebSocket メッセージとして各要素をストリーミングします。

  5. nohup.out の内容を参照して、サーバーが正しく起動したことを確認します。

    tail nohup.out
    

    すべてが正常に機能している場合、出力は次のようになります。

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

メッセージの可視化

Pub/Sub トピックに公開された個々の配車メッセージは、次のような構造になっています。

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

これらの値に基づいて、ダッシュボードのヘッダーの指標を計算します。この計算は、インバウンドの配車イベントごとに 1 回実行されます。値は次のとおりです。

  • 最後のメッセージのレイテンシ。最後に確認された配車イベントのタイムスタンプから現在の時刻(ウェブブラウザをホストしているシステムの時刻)までの秒数。
  • アクティブな配車。現在進行中の配車です。この値は急増する場合があります。また、ride_status の値が dropoff になると減少します。
  • メッセージ レート。1 秒あたりに処理された配車イベントの平均数。
  • メーター料金の合計額。アクティブな配車のメーター料金の合計額。この値は、乗客が降車すると減少します。
  • 乗客の総数。乗客の数。この数は、乗車が完了すると減少します。
  • 乗車 1 台あたりの平均乗車人数。乗客数の合計数を配車数の合計数で割った値。
  • 乗客 1 人あたりの平均メーター料金。メーター料金の合計金額を乗客の総数で割った値。

乗客が乗車または降車すると、指標と個々の配車サンプルのほかに、配車サンプルのグリッドの上にアラート通知が表示されます。

  1. 現在のインスタンスの外部 IP アドレスを取得します。

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. その IP アドレスをコピーします。

  3. ローカルマシンで新しいウェブブラウザを開き、URL を入力します。

    http://$ip-address:8000

    このチュートリアルのダッシュボードにページが表示されます。

    このチュートリアルのコードで作成したダッシュボード。招待メッセージが表示され、データが表示されます。

  4. 上部にあるタクシーのアイコンをクリックしてストリームへの接続を開き、メッセージの処理を開始します。

    個々の配車が視覚化され、9 つの配車サンプルが 3 秒ごとにレンダリングされます。

    アクティブな配車を表示するダッシュボード。

    タクシーのアイコンをクリックすると、WebSocket ストリームを開始または停止できます。WebSocket 接続が切断されると、アイコンが赤に変わり、指標と個々の配車情報の更新が停止します。再接続するには、タクシーのアイコンをもう一度クリックします。

パフォーマンス

次のスクリーンショットは、Chrome デベロッパー ツールのパフォーマンス モニターです。ブラウザのタブが 1 秒あたり約 2,100 件のメッセージを処理していることを表しています。

ブラウザのパフォーマンス モニタリング ペイン。1 秒あたりの CPU 使用率、ヒープサイズ、DOM ノード、スタイルの再計算が表示されています。値は比較的一定になっています。

約 30 ミリ秒のレイテンシでメッセージがディスパッチされると、CPU 使用率は平均で約 80% になります。メモリ使用率は最小の 29 MB です。合計で 57 MB が割り当てられていますが、増減は自由に行われます。

クリーンアップ

ファイアウォール ルールを削除する

このチュートリアルで既存のプロジェクトを使用した場合、作成したファイアウォール ルールを削除できます。開いているポートを最小限に抑えることをおすすめします。

  1. ポート 8000 で TCP を許可するように作成したファイアウォール ルールを削除します。

    gcloud compute firewall-rules delete websocket
    
  2. SSH 接続を許可するファイアウォール ルールも作成した場合は、ポート 22 で TCP を許可するファイアウォール ルールを削除します。

    gcloud compute firewall-rules delete wss-ssh
    

プロジェクトの削除

このプロジェクトを再度使用したくない場合は、プロジェクトを削除できます。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ