Diffuser des messages Pub/Sub via WebSockets


Ce tutoriel montre comment une application d'interface (dans ce cas, une page Web) peut gérer de grands volumes de données entrantes lorsque vous utilisez Google Cloud. Le tutoriel décrit certains des défis liés aux flux à volume élevé. L'exemple d'application fourni avec ce tutoriel montre comment utiliser WebSockets pour visualiser un flux dense de messages publiés dans un sujet Pub/Sub, en les traitant rapidement afin de toujours disposer d'une interface performante.

Ce tutoriel est destiné aux développeurs qui maîtrisent la communication de navigateur à serveur via HTTP et l'écriture d'applications d'interface à l'aide de HTML, CSS et JavaScript. Pour ce tutoriel, nous partons du principe que vous avez une certaine expérience de Google Cloud et que vous maîtrisez les outils de ligne de commande Linux.

Objectifs

  • Créer et configurer une instance de machine virtuelle (VM) avec les composants nécessaires pour diffuser les charges utiles d'un abonnement Pub/Sub aux clients de navigateur.
  • Configurer un processus sur la VM pour vous abonner à un sujet Pub/Sub et délivrer les messages individuels dans un journal.
  • Installer un serveur Web pour diffuser du contenu statique et diffuser le résultat de la commande shell aux clients WebSocket.
  • Visualiser les agrégations de flux WebSocket et des exemples de messages individuels dans un navigateur en HTML, CSS et JavaScript.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Ouvrez Cloud Shell pour exécuter les commandes répertoriées dans ce tutoriel.

    Accéder à Cloud Shell

    Vous allez exécuter toutes les commandes de terminal à partir de Cloud Shell.

  7. Activez l'API Compute Engine et l'API Pub/Sub :
    gcloud services enable compute pubsub

Une fois que vous avez terminé ce tutoriel, évitez de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Présentation

Alors que de plus en plus d'applications utilisent des modèles événementiels, il est important que les applications d'interface puissent établir des connexions simples à faible friction avec les services de messagerie qui constituent la base de ces architectures.

Il existe plusieurs options pour diffuser des données vers des clients de navigateur Web : la plus courante est WebSockets. Ce tutoriel explique comment installer un processus qui s'abonne à un flux de messages publiés sur un sujet Pub/Sub et acheminer ces messages vers des clients connectés via WebSockets.

Dans le cadre de ce tutoriel, vous travaillez sur le sujet Pub/Sub disponible publiquement utilisé dans l'atelier de programmation Google Dataflow NYC Taxi Tycoon. Cet article fournit un flux en temps réel d'une simulation de télémétrie de taxis basée sur l'historique des données de course relevées à New York à partir des ensembles de données de l'historique des courses de la Commission des taxis et limousines.

Architecture

Le schéma suivant montre l'architecture du tutoriel que vous créez dans ce tutoriel.

Architecture du tutoriel

Le diagramme montre un éditeur de message qui se trouve en dehors du projet et qui contient la ressource Compute Engine. L'éditeur envoie des messages à un sujet Pub/Sub. L'instance Compute Engine transmet les messages via WebSockets à un navigateur qui exécute un tableau de bord basé sur HTML5 et JavaScript.

Ce tutoriel utilise une combinaison d'outils pour relier Pub/Sub et Websockets :

  • pulltop est un programme Node.js que vous installez dans le cadre de ce tutoriel. L'outil s'abonne à un sujet Pub/Sub et diffuse les messages reçus en sortie standard.
  • websocketd est un petit outil de ligne de commande qui encapsule une interface de ligne de commande existante et permet d'y accéder à l'aide d'un WebSocket.

En combinant pulltop et websocketd, vous pouvez diffuser les messages du sujet Pub/Sub dans un navigateur à l'aide de WebSockets.

Ajustement du débit des sujets Pub/Sub

Le sujet Pub/Sub public de NYC Taxi Tycoon génère entre 2 000 et 2 500 simulations de mises à jour de courses en taxi par seconde, jusqu'à 8 Mo ou plus par seconde. Le contrôle de flux intégré dans Pub/Sub ralentit automatiquement la fréquence des messages d'un abonné si Pub/Sub détecte une file d'attente croissante de messages non confirmés. Par conséquent, vous pouvez constater une forte variabilité de la fréquence de messages entre différents postes de travail, connexions réseau et code de traitement d'interface.

Traitement efficace des messages du navigateur

Étant donné le grand nombre de messages envoyés par le flux WebSocket, vous devez être attentif à l'écriture du code d'interface qui traite ce flux. Par exemple, vous pourriez créer des éléments HTML de façon dynamique pour chaque message. Toutefois, à la fréquence attendue, la mise à jour de la page de chaque message risquerait de verrouiller la fenêtre du navigateur. Les fréquentes allocations de mémoire résultant de la création dynamique d'éléments HTML prolongent également la durée de récupération de mémoire, ce qui nuit à l'expérience utilisateur. En bref, vous ne souhaitez pas appeler document.createElement() pour chacun des quelque 2 000 messages reçus chaque seconde.

Voici la marche à suivre pour gérer ce flux dense de messages :

  • Calculez et mettez à jour en continu un ensemble de métriques de flux en temps réel, en affichant la majorité des informations sur les messages observés sous forme de valeurs globales.
  • Utilisez un tableau de bord basé sur un navigateur pour visualiser un petit échantillon de messages individuels selon une planification prédéfinie, en affichant uniquement les événements de dépôt et de prise en charge en temps réel.

La figure suivante montre le tableau de bord créé dans le cadre de ce tutoriel.

Tableau de bord créé sur la page Web par le code de ce tutoriel

La figure illustre une latence de dernier message de 24 millisecondes à une fréquence de près de 2 100 messages par seconde. Si les chemins de code critiques pour le traitement de chaque message individuel ne se terminent pas à temps, le nombre de messages observés par seconde diminue à mesure que la dernière latence augmente. L'échantillonnage des courses est effectué à l'aide de l'API JavaScript setInterval définie pour un cycle toutes les trois secondes, ce qui empêche l'interface de créer un nombre considérable d'éléments DOM au cours de sa durée de vie. (La grande majorité d'entre eux sont presque impossibles à observer à des taux supérieurs à 10 par seconde.)

Le tableau de bord commence à traiter les événements au milieu du flux. Les courses déjà en cours sont donc reconnues comme nouvelles par le tableau de bord, sauf si elles ont déjà été vues. Le code utilise un tableau associatif pour stocker chaque course observée, indexée par la valeur ride_id, et supprime la référence à une course particulière lorsque le passager est déposé. Les courses à l'état "en route" ou "déposé" ajoutent une référence à ce tableau, sauf (pour "en route") si la course a déjà été observée.

Installer et configurer le serveur WebSocket

Pour commencer, créez une instance Compute Engine que vous utiliserez comme serveur WebSocket. Une fois l'instance créée, installez-y les outils dont vous aurez besoin ultérieurement.

  1. Dans Cloud Shell, définissez la zone Compute Engine par défaut. L'exemple suivant montre us-central1-a, mais vous pouvez utiliser n'importe quelle zone.

    gcloud config set compute/zone us-central1-a
    
  2. Créez une instance Compute Engine nommée websocket-server dans la zone par défaut :

    gcloud compute instances create websocket-server --tags wss
    
  3. Ajoutez une règle de pare-feu qui autorise le trafic TCP sur le port 8000 vers toute instance marquée comme wss :

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Si vous utilisez un projet existant, assurez-vous que le port TCP 22 est ouvert pour autoriser la connectivité SSH à l'instance.

    Par défaut, la règle de pare-feu default-allow-ssh est activée dans le réseau par défaut. Toutefois, si vous ou votre administrateur avez supprimé la règle par défaut dans un projet existant, le port TCP 22 n'est peut-être pas ouvert. (Si vous avez créé un projet pour ce tutoriel, la règle est activée par défaut, et aucune action de votre part n'est requise.)

    Ajoutez une règle de pare-feu qui autorise le trafic TCP sur le port 22 vers toute instance marquée comme wss :

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Connectez-vous à l'instance via SSH :

    gcloud compute ssh websocket-server
    
  6. À la commande de terminal de l'instance, basculez les comptes sur root pour pouvoir installer le logiciel :

    sudo -s
    
  7. Installez les outils git et unzip :

    apt-get install -y unzip git
    
  8. Installez le binaire websocketd sur l'instance :

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Installer Node.js et le code du tutoriel

  1. Dans un terminal de l'instance, installez Node.js :

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Téléchargez le dépôt source du tutoriel :

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Modifiez les autorisations sur pulltop pour autoriser l'exécution :

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Installez les dépendances pulltop :

    cd pulltop
    npm install
    sudo npm link
    

Vérifier que pulltop peut lire les messages

  1. Sur l'instance, exécutez pulltop sur le sujet public :

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Si pulltop fonctionne, un flux de résultats semblable à celui-ci s'affiche :

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Appuyez sur Ctrl+C pour arrêter le flux.

Établir le flux de messages vers websocketd

Maintenant que pulltop peut lire le sujet Pub/Sub, vous pouvez démarrer le processus websocketd pour commencer à envoyer des messages au navigateur.

Capturer des messages de sujet dans un fichier local

Pour ce tutoriel, vous allez capturer le flux de messages que vous recevez de pulltop et l'écrire dans un fichier local. La capture du trafic des messages dans un fichier local entraîne une augmentation de l'espace de stockage, mais dissocie également l'opération du processus websocketd des messages de sujet Pub/Sub diffusés. La capture locale des informations permet de suspendre temporairement la diffusion Pub/Sub (par exemple pour ajuster les paramètres de contrôle de flux) sans forcer la réinitialisation des clients WebSocket actuellement connectés. Lorsque le flux de messages est rétabli, websocketd reprend automatiquement la diffusion des messages aux clients.

  1. Sur l'instance, exécutez pulltop sur le sujet public et redirigez le message vers le fichier local taxi.json. La commande nohup demande au système d'exploitation de maintenir le processus pulltop en cours d'exécution si vous vous déconnectez ou fermez le terminal.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Vérifiez que les messages JSON sont écrits dans le fichier :

    tail /var/tmp/taxi.json
    

    Si les messages sont écrits dans le fichier taxi.json, le résultat est semblable à celui-ci :

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Accédez au dossier Web de votre application :

    cd ../web
    
  4. Démarrez websocketd pour lancer la diffusion du contenu du fichier local à l'aide de WebSockets :

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    La commande websocketd est exécutée en arrière-plan. L'outil websocketd utilise le résultat de la commande tail et diffuse chaque élément en tant que message WebSocket.

  5. Vérifiez le contenu de nohup.out pour vous assurer que le serveur a démarré correctement :

    tail nohup.out
    

    Si tout fonctionne correctement, le résultat ressemble à ce qui suit :

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Visualiser les messages

Les messages individuels concernant les courses envoyés au sujet Pub/Sub ont la structure suivante :

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

En fonction de ces valeurs, vous calculez plusieurs métriques pour l'en-tête du tableau de bord. Les calculs sont exécutés une seule fois par événement de course entrant. Les valeurs sont les suivantes :

  • Latence du dernier message. Il s'agit du nombre de secondes entre l'horodatage de l'événement de la dernière course constatée et l'heure actuelle (dérivée de l'horloge du système qui héberge le navigateur Web).
  • Courses actives. Nombre de courses en cours. Ce nombre peut augmenter rapidement et diminuer lorsqu'une valeur ride_status pour dropoff est observée.
  • Fréquence des messages. Nombre moyen d'événements de course traités par seconde.
  • Montant total au compteur. Somme des compteurs pour toutes les courses actives. Ce nombre diminue à mesure que les clients sont déposés.
  • Nombre total de passagers. Nombre de passagers toutes courses confondues. Ce nombre diminue à mesure que les courses sont terminées.
  • Nombre moyen de passagers par course. Nombre total de courses, divisé par le nombre total de passagers.
  • Montant moyen au compteur par passager. Montant total au compteur divisé par le nombre total de passagers.

En plus des métriques et des exemples de courses individuels, le tableau de bord affiche une notification d'alerte au-dessus de la grille des exemples lorsqu'un client est pris en charge ou déposé.

  1. Obtenez l'adresse IP externe de l'instance actuelle :

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Copiez l'adresse IP.

  3. Sur votre ordinateur local, ouvrez un nouveau navigateur Web et saisissez l'URL suivante :

    http://$ip-address:8000

    Une page contenant le tableau de bord de ce tutoriel s'affiche :

    Tableau de bord créé par le code de ce tutoriel, avec message de bienvenue avant affichage des données.

  4. Cliquez sur l'icône représentant un taxi en haut pour ouvrir une connexion au flux et commencer à traiter les messages.

    Les courses individuelles sont visualisées avec un échantillon de neuf courses actives toutes les trois secondes :

    Tableau de bord affichant les courses actives.

    Vous pouvez cliquer sur l'icône représentant un taxi à tout moment pour démarrer ou arrêter le flux WebSocket. Si la connexion WebSocket est interrompue, l'icône devient rouge et les mises à jour des métriques et des courses individuelles sont interrompues. Pour vous reconnecter, cliquez à nouveau sur l'icône représentant un taxi.

Performances

La capture d'écran suivante montre l'outil de contrôle des performances des Outils pour les développeurs Chrome alors que l'onglet du navigateur traite environ 2 100 messages par seconde.

Volet de l'outil de contrôle des performances du navigateur affichant l'utilisation du processeur, la taille du tas de mémoire, les nœuds DOM et les nouveaux calculs de style par seconde. Les valeurs sont relativement plates.

Avec une distribution des messages à une latence d'environ 30 ms, l'utilisation du processeur est en moyenne d'environ 80 %. L'utilisation de la mémoire est au minimum de 29 Mo avec un total de 57 Mo alloué et augmente ou diminue librement.

Nettoyer

Supprimer des règles de pare-feu

Si vous avez utilisé un projet existant pour ce tutoriel, vous pouvez supprimer les règles de pare-feu que vous avez créées. Il est recommandé de réduire les ports ouverts.

  1. Supprimez la règle de pare-feu que vous avez créée pour autoriser TCP sur le port 8000 :

    gcloud compute firewall-rules delete websocket
    
  2. Si vous avez également créé une règle de pare-feu pour autoriser la connectivité SSH, supprimez-la pour autoriser TCP sur le port 22 :

    gcloud compute firewall-rules delete wss-ssh
    

Supprimer le projet

Si vous ne souhaitez plus utiliser ce projet, vous pouvez le supprimer.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Étape suivante