通过 WebSocket 流式传输 Pub/Sub 消息


本教程介绍在您使用 Google Cloud 时,前端应用(在本例中为网页)如何处理大量传入数据。本教程介绍了大量数据流带来的一些挑战。本教程提供了一个示例应用,用于演示如何使用 WebSockets 直观呈现发布到 Pub/Sub 主题的密集消息流,并及时处理这些消息,保持前端高性能。

本教程适用于具有以下特征的开发者:熟悉通过 HTTP 进行的浏览器到服务器通信并且熟悉如何使用 HTML、CSS 和 JavaScript 编写前端应用。本教程假定您具有一定的 Google Cloud 使用经验,并且熟悉 Linux 命令行工具。

目标

  • 使用必要组件创建并配置虚拟机实例,以将 Pub/Sub 订阅的载荷流式传输到浏览器客户端。
  • 在虚拟机上配置一个进程以订阅 Pub/Sub 主题,并将各条消息输出到日志。
  • 安装网络服务器以传送静态内容并将 shell 命令输出流式传输到 WebSocket 客户端。
  • 使用 HTML、CSS 和 JavaScript 在浏览器中直观呈现 WebSocket 流聚合和单个消息示例。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 打开 Cloud Shell 以执行本教程中列出的命令。

    转到 Cloud Shell

    请通过 Cloud Shell 运行本教程中的所有终端命令。

  7. 启用 Compute Engine API 和 Pub/Sub API:
    gcloud services enable compute pubsub

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

简介

随着越来越多的应用采用事件驱动型模型,前端应用应该能够与构成这些架构的基础的消息传递服务建立简单、低摩擦的连接,这一点非常重要。

您可以通过多种方式将数据流式传输到网络浏览器客户端;其中最常见的方式是使用 WebSocket。本教程将指导您安装一个进程,该进程订阅发布到 Pub/Sub 主题的消息流,并通过网络服务器将这些消息路由到通过 WebSocket 连接的客户端。

在本教程中,您将使用 NYC Taxi Tycoon Google Dataflow CodeLab 中使用的公开提供的 Pub/Sub 主题。 本主题为您提供了模拟出租车遥测数据实时流,该数据流基于从纽约市的出租车和豪华轿车委员会的行程记录数据集中提取的历史行程数据。

架构

下图显示了您在本教程中构建的教程架构。

教程架构

该图显示了包含 Compute Engine 资源的项目外部的消息发布者;该发布者将消息发送到 Pub/Sub 主题。Compute Engine 实例通过 WebSocket 将消息提供给运行基于 HTML5 和 JavaScript 的信息中心的浏览器。

本教程使用了一系列工具来连接 Pub/Sub 和 Websocket:

  • pulltop 是您在本教程中安装的 Node.js 程序。 该工具订阅 Pub/Sub 主题,并将接收到的消息流式传输到标准输出。
  • websocketd 是一个小型命令行工具,用于封装现有命令行界面程序并允许使用 WebSocket 访问该程序。

通过结合使用 pulltopwebsocketd,您可以使用 WebSocket 将从 Pub/Sub 主题接收的消息流式传输到浏览器。

调整 Pub/Sub 主题吞吐量

NYC Taxi Tycoon 公共 Pub/Sub 主题每秒可生成 2000 至 2500 个模拟的出租车行程更新 - 每秒高达 8 Mb 或更多。如果 Pub/Sub 检测到越来越大的未确认消息队列,则 Pub/Sub 中的内置流控制会自动减慢订阅者的消息速率。因此,您可能会看到跨不同工作站、网络连接和前端处理代码的较高的消息速率变化。

有效的浏览器消息处理

鉴于通过 WebSocket 流的消息量非常大,因此您在编写处理此流的前端代码时需要深思熟虑。 例如,您可以为每条消息动态创建 HTML 元素。但按照预期的消息速率,为每条消息更新页面可能会锁定浏览器窗口。动态创建 HTML 元素导致的频繁内存分配还会延长垃圾回收的持续时间,从而影响用户体验。简而言之,不应该对每秒接收到的大约 2000 条消息都调用 document.createElement()

为管理这种密集消息流,本教程采用了如下方法:

  • 实时计算并持续更新一组数据流指标,将有关所观察到的消息的大部分信息显示为聚合值。
  • 使用基于浏览器的信息中心,按预定义的时间表直观呈现一小部分单独的消息,仅实时显示下客和上客事件。

下图显示了在本教程中创建的信息中心。

通过本教程中的代码在网页上创建的信息中心

该图指示在速率为每秒约 2100 条消息的情况下,最后一条消息的延迟时间为 24 毫秒。如果处理每条消息的关键代码路径未及时完成,则每秒观察到的消息数会随着最后一条消息延迟时间的增加而减少。 行程采样使用设置为每三秒钟循环一次的 JavaScript setInterval API 完成,这样可以防止前端在其生命周期内创建大量的 DOM 元素。(在速率超过每秒 10 条的情况下,实际上无法观察到绝大多数的这类消息。)

信息中心在消息流中间开始处理事件,因此信息中心会将进行中的行程识别为新行程(除非之前已见过这些行程)。代码使用关联数组来存储每个观察到的行程(按 ride_id 值编制索引),并移除对乘客下车后的特定行程的引用。“在途中”或“上客”状态的行程会添加对该数组的引用,除非之前已观察到此行程(这是针对“在途中”的情况)。

安装并配置 WebSocket 服务器

首先,您需要创建将用作 WebSocket 服务器的 Compute Engine 实例。创建实例后,您需要在该实例上安装稍后需要使用的工具。

  1. 在 Cloud Shell 中,设置默认 Compute Engine 区域。 以下示例显示的是 us-central1-a,但您可以根据需要使用任何区域。

    gcloud config set compute/zone us-central1-a
    
  2. 在默认区域中创建名为 websocket-server 的 Compute Engine 实例:

    gcloud compute instances create websocket-server --tags wss
    
  3. 添加一条防火墙规则,该规则允许端口 8000 上的 TCP 流量进入标记为 wss 的任何实例:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. 如果您使用的是现有项目,请确保 TCP 端口 22 已打开,以允许通过 SSH 连接到实例。

    默认情况下,default-allow-ssh 防火墙规则已在默认网络中启用。但是,如果您或您的管理员移除了现有项目中的默认规则,则 TCP 端口 22 可能无法打开。(如果您为本教程创建了一个新项目,则该规则默认处于启用状态,您无需执行任何操作。)

    添加一条防火墙规则,该规则允许端口 22 上的 TCP 流量进入标记为 wss 的任何实例:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. 使用 SSH 连接到该实例。

    gcloud compute ssh websocket-server
    
  6. 在实例的终端命令中,将账号切换到 root,以便您可以安装软件:

    sudo -s
    
  7. 安装 gitunzip 工具:

    apt-get install -y unzip git
    
  8. 在实例上安装 websocketd 二进制文件:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

安装 Node.js 和教程代码

  1. 在实例的终端上,安装 Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. 下载教程源代码库:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. 更改对 pulltop 的权限以允许执行:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. 安装 pulltop 依赖项:

    cd pulltop
    npm install
    sudo npm link
    

测试 pulltop 是否可以读取消息

  1. 在实例上,针对公共主题运行 pulltop

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    如果 pulltop 正常运行,您将看到如下所示的结果流:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Ctrl+C 即可停止该流。

建立到 websocketd 的消息流

现在,确定了 pulltop 可以读取 Pub/Sub 主题后,您可以启动 websocketd 进程以开始将消息发送到浏览器。

将主题消息捕获到本地文件

在本教程中,您将捕获从 pulltop 获取的消息流,并将其写入到本地文件。将消息流量捕获到本地文件会增加存储需求,但同时也会将 websocketd 进程的操作与流式 Pub/Sub 主题消息分离。在本地捕获信息支持以下场景:需要暂时停止 Pub/Sub 流式传输(目的可能是调整流控制参数),但不强制重置当前连接的 WebSocket 客户端。重新建立消息流后,websocketd 会自动继续将消息流式传输到客户端。

  1. 在实例上,针对公共主题运行 pulltop,并将消息输出重定向到本地 taxi.json 文件。如果您退出或关闭终端,nohup 命令将指示操作系统保持 pulltop 进程处于运行状态。

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. 验证是否正在将 JSON 消息写入文件:

    tail /var/tmp/taxi.json
    

    如果正在将这些消息写入 taxi.json 文件,则输出类似于以下内容:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. 更改为您的应用的网络文件夹:

    cd ../web
    
  4. 启动 websocketd 以开始使用 WebSocket 流式传输本地文件的内容:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    这将在后台运行 websocketd 命令。websocketd 工具使用 tail 命令的输出,并将每个元素作为 WebSocket 消息进行流式传输。

  5. 检查 nohup.out 的内容以验证服务器是否已正确启动:

    tail nohup.out
    

    如果一切正常,则输出类似于以下内容:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

直观呈现消息

发布到 Pub/Sub 主题的各条行程消息的结构如下:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

根据这些值,您可以为信息中心的标头计算多个指标。 系统会根据每个入站行程事件分别执行一次计算。这些值包括以下内容:

  • 上次消息延迟时间。上次观察到的行程的事件时间戳的时间戳与当前时间(源自托管网络浏览器的系统上的时钟)之间的秒数。
  • 活动行程数。当前正在进行的行程数。此数值可能会快速增加,在观察到 ride_status 的值为 dropoff 时则会减小。
  • 消息速率。每秒处理的行程事件平均数。
  • 总计量数。所有活动行程的计量总和。 此数值会随着行程的下客而减小。
  • 乘客总数。所有行程的乘客数。此数值会随着行程的完成而减小。
  • 每个行程的平均乘客数。行程总数除以乘客总数。
  • 每位乘客的平均计量数。总计量数除以乘客总数。

除了指标和各行程示例外,信息中心还会在上客或下客时,在行程示例网格上方显示提醒通知。

  1. 获取当前实例的外部 IP 地址:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. 复制该 IP 地址。

  3. 在本地计算机上,打开一个新的网络浏览器并输入网址:

    http://$ip-address:8000.

    您将看到一个页面,其中显示了本教程的信息中心:

    通过本教程中的代码创建的信息中心,包含在显示任何数据之前显示的欢迎辞。

  4. 点击顶部的出租车图标,即可打开与消息流的连接并开始处理消息。

    各行程会通过每三秒钟呈现的九个活动行程示例实现可视化:

    显示活动行程的信息中心。

    您可以随时点击出租车图标以开始或停止 WebSocket 消息流。如果 WebSocket 连接中断,该图标将变为红色,并且系统将停止更新指标和各行程。如需重新连接,请再次点击出租车图标。

性能

以下屏幕截图显示了 Chrome 开发者工具的性能监控,同时浏览器标签页每秒处理约 2100 条消息。

浏览器性能监控窗格显示每秒的 CPU 使用率、堆大小、DOM 节点数和样式重新计算次数。值相对平缓。

由于消息调度的延迟时间约为 30 毫秒,因此平均 CPU 利用率约为 80%。显示的内存利用率最小值为 29 MB,总共分配了 57 MB,并且可以自由增加和减少。

清理

移除防火墙规则

如果您将现有项目用于本教程,则可以移除您创建的防火墙规则。最好尽量减少开放端口。

  1. 删除您为允许端口 8000 上的 TCP 创建的防火墙规则:

    gcloud compute firewall-rules delete websocket
    
  2. 如果您还创建了防火墙规则以允许通过 SSH 连接,请删除防火墙规则以允许 TCP 在端口 22 上运行:

    gcloud compute firewall-rules delete wss-ssh
    

删除项目

如果您不想再次使用此项目,可将其删除。

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤