Transmite mensajes de Pub/Sub a través de WebSockets


En este instructivo, se ilustra una manera para que una app de frontend (en este caso, una página web) maneje los grandes volúmenes de datos entrantes cuando usas Google Cloud. En este instructivo, se describen algunos de los desafíos de las transmisiones de gran volumen. Junto con este instructivo, se proporciona una app de ejemplo con la que se ilustra cómo usar WebSockets para visualizar un flujo denso de mensajes publicados en un tema de Pub/Sub y procesarlos a tiempo a fin de mantener un frontend de rendimiento.

Este instructivo es para desarrolladores familiarizados con la comunicación de navegador a servidor mediante HTTP y con la escritura de apps de frontend mediante HTML, CSS y JavaScript. En el instructivo, se supone que tienes experiencia con Google Cloud y que estás familiarizado con las herramientas de línea de comandos de Linux.

Objetivos

  • Crear y configurar una instancia de máquina virtual (VM) con los componentes necesarios para transmitir las cargas útiles de una suscripción a Pub/Sub a los clientes del navegador
  • Configurar un proceso en la VM para suscribirse a un tema de Pub/Sub y enviar los mensajes individuales a un registro
  • Instalar un servidor web para entregar contenido estático y transmitir el resultado del comando de shell a los clientes de WebSocket
  • Visualizar las agregaciones de transmisión de WebSocket y las muestras de mensajes individuales en un navegador mediante HTML, CSS y JavaScript

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  4. En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  5. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  6. Abre Cloud Shell para ejecutar los comandos detallados en este instructivo.

    IR A Cloud Shell

    Ejecuta todos los comandos de la terminal que se incluyen en este instructivo desde Cloud Shell.

  7. Habilita la API de Compute Engine y la API de Pub/Sub:
    gcloud services enable compute pubsub

Cuando finalices este instructivo, puedes borrar los recursos creados para evitar que se te siga facturando. Consulta Limpieza para obtener más información.

Introducción

A medida que más apps adoptan modelos controlados por eventos, es importante que las apps de frontend puedan establecer conexiones simples y con pocas restricciones con los servicios de mensajería que forman la base de estas arquitecturas.

Existen varias opciones para transmitir datos a los clientes del navegador web; la más común es WebSockets. En este instructivo, se instala un proceso que se suscribe a una transmisión de mensajes publicados en un tema de Pub/Sub; estos mensajes se enrutan a través del servidor web hacia los clientes conectados en WebSockets.

Para este instructivo, trabajas con el tema de Pub/Sub disponible al público que se usa en NYC Taxi Tycoon Google Dataflow CodeLab (CodeLab de Google Dataflow del gigante de taxis de la ciudad de Nueva York). En este tema, se ofrece una transmisión de telemetría de taxis simulada en tiempo real basada en datos históricos de viajes tomados en la ciudad de Nueva York a partir de los conjuntos de datos de los registros de viajes de la Comisión de Taxis y Limusinas de la Ciudad de Nueva York.

Arquitectura

En el siguiente diagrama, se muestra la arquitectura del instructivo que compilarás en este instructivo.

Arquitectura del instructivo

En el diagrama, se muestra un publicador de mensajes que está fuera del proyecto que contiene el recurso de Compute Engine; el publicador envía mensajes a un tema de Pub/Sub. La instancia de Compute Engine hace que los mensajes estén disponibles a través de WebSockets para un navegador que ejecuta un panel basado en HTML5 y JavaScript.

En este instructivo, se usa una combinación de herramientas como puente entre Pub/Sub y WebSockets:

  • pulltop es un programa de Node.js que instalas como parte de este instructivo. La herramienta se suscribe a un tema de Pub/Sub y transmite los mensajes recibidos al resultado estándar.
  • websocketd es una herramienta de línea de comandos pequeña que se une a un programa de interfaz de línea de comandos existente y permite que se acceda a él mediante un WebSocket.

Si combinas pulltop y websocketd, puedes hacer que los mensajes que se reciben del tema de Pub/Sub se transmitan a un navegador mediante WebSockets.

Ajusta la capacidad de procesamiento del tema de Pub/Sub

El tema de Pub/Sub público del gigante de taxis de la ciudad de Nueva York genera entre 2,000 y 2,500 actualizaciones de viajes en taxi simulados por segundo, hasta 8 Mb o más por segundo. El control de flujo integrado en Pub/Sub ralentiza la frecuencia de mensajes de un suscriptor automáticamente si Pub/Sub detecta una cola creciente de mensajes no confirmados. Por lo tanto, es posible que veas una alta variabilidad de la frecuencia de mensajes en las diferentes estaciones de trabajo, las conexiones de red y el código de procesamiento de frontend.

Procesamiento eficaz de mensajes del navegador

Dado el gran volumen de mensajes que llegan a través de la transmisión de WebSocket, debes tener cuidado cuando escribes el código de frontend que procesa esta transmisión. Por ejemplo, puedes crear elementos HTML de forma dinámica para cada mensaje. Sin embargo, con la frecuencia de mensajes esperada, actualizar la página por cada mensaje podría bloquear la ventana del navegador. Las asignaciones de memoria frecuentes que resultan de la creación dinámica de elementos HTML también extienden la duración de la recolección de elementos no usados, lo que degrada la experiencia del usuario. En resumen, no es conveniente que llames a document.createElement() para cada uno de los 2,000 mensajes aproximados que llegan por segundo.

El enfoque que se sigue en este instructivo para administrar este flujo denso de mensajes es el siguiente:

  • Calcula y actualiza de forma continua un conjunto de métricas de transmisiones en tiempo real, que muestre la mayor parte de la información sobre los mensajes observados como valores agregados.
  • Usa un panel basado en el navegador para visualizar una muestra pequeña de mensajes individuales en un horario predefinido, que muestra solo los eventos de destino y partida en tiempo real.

En la siguiente figura, se muestra el panel que se creó como parte de este instructivo.

Panel creado en la página web mediante el código de este instructivo

En la figura, se muestra una latencia del último mensaje de 24 milisegundos a una frecuencia de casi 2,100 mensajes por segundo. Si las rutas del código fundamentales para procesar cada mensaje individual no se completan a tiempo, la cantidad de mensajes observados por segundo disminuye a medida que aumenta la latencia del último mensaje. El muestreo de viajes se realiza mediante la API de JavaScript setInterval, configurada para completar un ciclo cada tres segundos; esto impide que el frontend cree una cantidad enorme de elementos DOM a lo largo de su vida útil. (La gran mayoría de ellos casi no se puede observar a frecuencias superiores a 10 por segundo).

El panel comienza a procesar eventos en el medio de la transmisión, por lo que a los viajes en curso los reconoce como nuevos, a menos que se los haya visto antes. El código usa un arreglo asociativo para almacenar cada viaje observado, indexado por el valor ride_id, y quita la referencia a un viaje en particular cuando el pasajero llegó al destino. Los viajes en estado “en camino” o “partida” agregan una referencia a ese arreglo, a menos que, en el caso de “en camino”, se haya observado antes.

Instala y configura el servidor WebSocket

Para comenzar, crea una instancia de Compute Engine que usarás como servidor WebSocket. Después de crear la instancia, debes instalar las herramientas que necesitarás más adelante.

  1. En Cloud Shell, configura la zona predeterminada de Compute Engine. En el siguiente ejemplo, se muestra us-central1-a, pero puedes usar cualquier zona que desees.

    gcloud config set compute/zone us-central1-a
    
  2. Crea una instancia de Compute Engine llamada websocket-server en la zona predeterminada:

    gcloud compute instances create websocket-server --tags wss
    
  3. Agrega una regla de firewall que permita el tráfico TCP en el puerto 8000 a cualquier instancia etiquetada como wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Si usas un proyecto existente, asegúrate de que el puerto TCP 22 esté abierto para permitir la conectividad SSH a la instancia.

    De forma predeterminada, la regla de firewall default-allow-ssh está habilitada en la red predeterminada. Sin embargo, si tú o tu administrador quitaron la regla predeterminada de un proyecto existente, es posible que el puerto TCP 22 no esté abierto. (Si creaste un proyecto nuevo para este instructivo, la regla está habilitada de forma predeterminada y no tienes que hacer nada).

    Agrega una regla de firewall que permita el tráfico de TCP en el puerto 22 a cualquier instancia etiquetada como wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Conéctate a la instancia mediante SSH:

    gcloud compute ssh websocket-server
    
  6. En el comando de la terminal de la instancia, cambia de cuenta a root para poder instalar el software:

    sudo -s
    
  7. Instala las herramientas git y unzip:

    apt-get install -y unzip git
    
  8. Instala el objeto binario websocketd en la instancia:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Instala Node.js y el código del instructivo

  1. En una terminal en la instancia, instala Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Descarga el repositorio de código fuente del instructivo:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Cambia los permisos en pulltop para permitir la ejecución:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Instala las dependencias pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Prueba si pulltop puede leer mensajes

  1. En la instancia, ejecuta pulltop en el tema público:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Si pulltop funciona, verás una transmisión de resultados como los siguientes:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Presiona Ctrl+C para detener la transmisión.

Establece el flujo de mensajes como websocketd

Ahora que ya estableciste que pulltop pueda leer el tema de Pub/Sub, puedes iniciar el proceso websocketd para comenzar a enviar mensajes al navegador.

Captura mensajes de temas en un archivo local

En este instructivo, capturas la transmisión de mensajes que recibes de pulltop y la escribes en un archivo local. Capturar el tráfico de mensajes en un archivo local agrega un requisito de almacenamiento, pero también separa la operación del proceso websocketd de los mensajes de transmisión del tema de Pub/Sub. Capturar la información de forma local permite situaciones en las que tal vez quieras interrumpir de manera temporal la transmisión de Pub/Sub (tal vez para ajustar los parámetros del control de flujo), pero no forzar el restablecimiento de los clientes de WebSocket conectados en este momento. Cuando se restablece la transmisión de mensajes, websocketd reanuda automáticamente la transmisión de mensajes a los clientes.

  1. En la instancia, ejecuta pulltop en el tema público y redirecciona el resultado del mensaje al archivo local taxi.json. El comando nohup le indica al SO que mantenga el proceso pulltop en ejecución si sales de la terminal o la cierras.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifica que los mensajes JSON se escriban en el archivo:

    tail /var/tmp/taxi.json
    

    Si los mensajes se escriben en el archivo taxi.json, el resultado es similar al siguiente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Cambia a la carpeta web de tu app:

    cd ../web
    
  4. Inicia websocketd para comenzar a transmitir el contenido del archivo local mediante WebSockets:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Esto ejecuta el comando websocketd en segundo plano. La herramienta websocketd consume el resultado del comando tail y transmite cada elemento como un mensaje WebSocket.

  5. Comprueba el contenido de nohup.out para verificar si el servidor se inició de forma correcta:

    tail nohup.out
    

    Si todo funciona de manera correcta, el resultado es similar al siguiente:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Visualiza mensajes

Los mensajes de viajes individuales publicados en el tema de Pub/Sub tienen una estructura como la siguiente:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

En función de estos valores, calcularás varias métricas para el encabezado del panel. Los cálculos se ejecutan una vez por evento de viaje entrante. Esto es lo que incluyen los valores:

  • Latencia del último mensaje. La cantidad de segundos entre la marca de tiempo del último evento del viaje observado y la hora actual (derivada del reloj del sistema que aloja el navegador web)
  • Viajes activos. La cantidad de viajes en curso en este momento. Esta cantidad puede crecer muy rápido y disminuye cuando se observa un valor ride_status de dropoff
  • Frecuencia de mensajes. La cantidad promedio de eventos de viajes procesados por segundo
  • Cantidad total de uso medido. La suma de los medidores de todos los viajes activos. Este número disminuye a medida que los viajes llegan a destino
  • Cantidad total de pasajeros. La cantidad de pasajeros en todos los viajes. Este número disminuye a medida que se completan los viajes
  • Promedio de pasajeros por viaje. La cantidad total de viajes, dividida por la cantidad total de pasajeros
  • Cantidad de metros promedio por pasajero. La cantidad de metros total dividida por la cantidad total de pasajeros

Además de las métricas y las muestras de viajes individuales, cuando se recoge o se deja a un pasajero, el panel muestra una notificación de alerta arriba de la cuadrícula de las muestras de viajes.

  1. Obtén la dirección IP externa de la instancia actual:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Copia la dirección IP.

  3. En tu máquina local, abre un navegador web nuevo y escribe la URL:

    http://$ip-address:8000.

    Verás una página que muestra el panel de este instructivo:

    Panel creado mediante código en este instructivo, con un mensaje de bienvenida y antes de que se muestren los datos.

  4. Haz clic en el ícono de taxi en la parte superior para abrir una conexión a la transmisión y comenzar a procesar los mensajes.

    Los viajes individuales se visualizan con una muestra de nueve viajes activos que se procesan cada tres segundos:

    Panel que muestra los viajes activos.

    Puedes hacer clic en el ícono de taxi en cualquier momento para iniciar o detener la transmisión de WebSocket. Si la conexión de WebSocket se corta, el ícono se vuelve rojo y las actualizaciones de las métricas y los viajes individuales se detienen. Para volver a establecer conexión, haz clic de nuevo en el ícono de taxi.

Rendimiento

En la siguiente captura de pantalla, se muestra la supervisión del rendimiento de las Herramientas para desarrolladores de Chrome mientras la pestaña del navegador procesa alrededor de 2,100 mensajes por segundo.

Panel de supervisión del rendimiento del navegador que muestra el uso de CPU, el tamaño de la pila, los nodos DOM y los cálculos nuevos de estilo por segundo. Los valores son relativamente estables.

Dado que el envío de mensajes ocurre con una latencia de alrededor de 30 ms, el uso promedio de CPU es de alrededor del 80%. El uso de la memoria se muestra en un mínimo de 29 MB, con 57 MB en total asignados, y crece y se reduce sin impedimentos.

Limpia

Quita reglas de firewall

Si usaste un proyecto existente para este instructivo, puedes quitar las reglas de firewall que creaste. Recomendamos minimizar los puertos abiertos.

  1. Borra la regla de firewall que creaste para permitir el TCP en el puerto 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Si también creaste una regla de firewall para permitir la conectividad SSH, borra la regla de firewall para permitir el TCP en el puerto 22:

    gcloud compute firewall-rules delete wss-ssh
    

Borra el proyecto

Si no deseas volver a usar este proyecto, puedes borrarlo.

  1. En la consola de Google Cloud, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

¿Qué sigue?