Streaming di messaggi Pub/Sub su WebSocket


Questo tutorial illustra un modo a disposizione di un'app di frontend, in questo caso una pagina web, per gestire elevati volumi di dati in entrata quando utilizzi Google Cloud. Il tutorial descrive alcune delle sfide degli stream ad alto volume. Insieme a questo tutorial viene fornita un'app di esempio che illustra come utilizzare WebSockets per visualizzare un flusso denso di messaggi pubblicati in un argomento Pub/Sub, elaborandoli in modo tempestivo mantenendo un frontend ad alte prestazioni.

Questo tutorial è rivolto agli sviluppatori che hanno familiarità con la comunicazione browser-to-server tramite HTTP e con la scrittura di app frontend utilizzando HTML, CSS e JavaScript. Il tutorial presuppone che tu abbia una certa esperienza con Google Cloud e abbia familiarità con gli strumenti a riga di comando di Linux.

Obiettivi

  • Crea e configura un'istanza di macchina virtuale (VM) con i componenti necessari per trasmettere in streaming i payload di una sottoscrizione Pub/Sub ai client del browser.
  • Configurare un processo sulla VM per sottoscrivere un argomento Pub/Sub e generare i singoli messaggi in un log.
  • Installa un server web per fornire contenuti statici e trasmettere l'output del comando shell ai client WebSocket.
  • Visualizza le aggregazioni dei flussi WebSocket e i singoli esempi di messaggi in un browser utilizzando HTML, CSS e JavaScript.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud possono essere idonei a una prova senza costi aggiuntivi.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Apri Cloud Shell per eseguire i comandi elencati in questo tutorial.

    VAI A Cloud Shell

    Esegui tutti i comandi del terminale in questo tutorial da Cloud Shell.

  7. Abilita l'API Compute Engine e l'API Pub/Sub:
    gcloud services enable compute pubsub

Al termine di questo tutorial, puoi evitare di continuare la fatturazione eliminando le risorse che hai creato. Consulta Pulizia per ulteriori dettagli.

Introduzione

Poiché sempre più app adottano modelli basati su eventi, è importante che le app di frontend siano in grado di creare connessioni semplici e senza attriti ai servizi di messaggistica che costituiscono la pietra miliare di queste architetture.

Esistono diverse opzioni per l'invio di flussi di dati ai client del browser web; la più comune è WebSocket. Questo tutorial illustra come installare un processo che sottoscrive un flusso di messaggi pubblicati in un argomento Pub/Sub e a cui indirizza questi messaggi tramite il server web instradato ai client connessi tramite WebSocket.

Per questo tutorial, utilizzerai l'argomento Pub/Sub disponibile pubblicamente nel NYC Taxi Tycoon CodeLab di Google Dataflow. Questo argomento fornisce un flusso in tempo reale di telemetria sui taxi simulati basati sui dati storici delle corse effettuati a New York dai set di dati relativi alle corse della Commissione taxi e limousine.

Architettura

Il seguente diagramma mostra l'architettura del tutorial creato in questo tutorial.

Architettura del tutorial

Il diagramma mostra un publisher di messaggi esterno al progetto che contiene la risorsa Compute Engine; il publisher invia messaggi a un argomento Pub/Sub. L'istanza Compute Engine rende i messaggi disponibili su WebSocket a un browser che esegue una dashboard basata su HTML5 e JavaScript.

Questo tutorial utilizza una combinazione di strumenti per collegare Pub/Sub e Websockets:

  • pulltop è un programma Node.js che installi nell'ambito di questo tutorial. Lo strumento sottoscrive un argomento Pub/Sub e trasmette i messaggi ricevuti all'output standard.
  • websocketd è un piccolo strumento a riga di comando che esegue il wrapping di un programma di interfaccia a riga di comando esistente e consente l'accesso tramite WebSocket.

Combinando pulltop e websocketd, puoi fare in modo che i messaggi ricevuti dall'argomento Pub/Sub vengano trasmessi in streaming a un browser utilizzando WebSocket.

Regolazione della velocità effettiva dell'argomento Pub/Sub

L'argomento Pub/Sub pubblico NYC Taxi Tycoon genera da 2000 a 2500 aggiornamenti delle corse in taxi simulate al secondo, fino a 8 Mb o più al secondo. Il controllo del flusso integrato in Pub/Sub rallenta automaticamente la velocità dei messaggi di un sottoscrittore se Pub/Sub rileva una coda crescente di messaggi non confermati. Pertanto, potresti notare un'elevata variabilità della velocità dei messaggi tra workstation, connessioni di rete e codice di elaborazione front-end.

Elaborazione effettiva dei messaggi del browser

Dato l'elevato volume di messaggi in arrivo nel flusso WebSocket, devi prestare attenzione nella scrittura del codice frontend che elabora questo flusso. Ad esempio, puoi creare dinamicamente elementi HTML per ogni messaggio. Tuttavia, alla velocità prevista per i messaggi, l'aggiornamento della pagina di ciascun messaggio potrebbe bloccare la finestra del browser. Anche le allocazioni frequenti della memoria derivanti dalla creazione dinamica di elementi HTML estendono la durata della garbage collection, peggiorando l'esperienza utente. In breve, non devi chiamare document.createElement() per ognuno dei circa 2000 messaggi che arrivano ogni secondo.

L'approccio adottato da questo tutorial per gestire questo flusso denso di messaggi è il seguente:

  • Calcola e aggiorna continuamente un insieme di metriche di flusso in tempo reale, mostrando la maggior parte delle informazioni sui messaggi osservati come valori aggregati.
  • Utilizza una dashboard basata su browser per visualizzare un piccolo campione di singoli messaggi in base a una programmazione predefinita, mostrando in tempo reale solo gli eventi di abbandono e prelievo.

La figura seguente mostra la dashboard creata nell'ambito di questo tutorial.

Dashboard creata nella pagina web dal codice di questo tutorial

La figura mostra una latenza dell'ultimo messaggio di 24 millisecondi a una frequenza di quasi 2100 messaggi al secondo. Se i percorsi del codice critici per l'elaborazione di ogni singolo messaggio non vengono completati nel tempo, il numero di messaggi osservati al secondo diminuisce con l'aumento della latenza dell'ultimo messaggio. Il campionamento delle corse viene eseguito utilizzando l'API JavaScript setInterval impostata su un ciclo una volta ogni tre secondi, il che impedisce al frontend di creare un numero enorme di elementi DOM nel corso della sua durata. (la stragrande maggioranza è praticamente non osservabile a velocità superiori a 10 al secondo).

La dashboard inizia a elaborare gli eventi a metà dello stream, pertanto le corse già in corso vengono riconosciute come nuove dalla dashboard, a meno che non siano state viste in precedenza. Il codice utilizza un array associativo per memorizzare ogni corsa osservata, indicizzata dal valore ride_id, e rimuove il riferimento a una determinata corsa quando il passeggero è stato consegnato. Le corse in stato "instradato" o "ritiro" aggiungono un riferimento a tale array, a meno che la corsa non sia stata osservata in precedenza.

Installa e configura il server WebSocket

Per iniziare, crea un'istanza Compute Engine che utilizzerai come server WebSocket. Dopo aver creato l'istanza, installi gli strumenti di cui hai bisogno in seguito.

  1. In Cloud Shell, imposta la zona predefinita di Compute Engine. L'esempio seguente mostra us-central1-a, ma puoi utilizzare qualsiasi zona che vuoi.

    gcloud config set compute/zone us-central1-a
    
  2. Crea un'istanza Compute Engine denominata websocket-server nella zona predefinita:

    gcloud compute instances create websocket-server --tags wss
    
  3. Aggiungi una regola firewall che consenta il traffico TCP sulla porta 8000 a qualsiasi istanza contrassegnata come wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Se utilizzi un progetto esistente, assicurati che la porta TCP 22 sia aperta per consentire la connettività SSH all'istanza.

    Per impostazione predefinita, la regola firewall default-allow-ssh è abilitata nella rete predefinita. Tuttavia, se tu o il tuo amministratore avete rimosso la regola predefinita in un progetto esistente, la porta TCP 22 potrebbe non essere aperta. Se hai creato un nuovo progetto per questo tutorial, la regola è abilitata per impostazione predefinita e non devi fare nulla.

    Aggiungi una regola firewall che consenta il traffico TCP sulla porta 22 a qualsiasi istanza contrassegnata come wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Connettiti all'istanza tramite SSH:

    gcloud compute ssh websocket-server
    
  6. Al comando del terminale dell'istanza, passa all'account root in modo da poter installare il software:

    sudo -s
    
  7. Installa gli strumenti git e unzip:

    apt-get install -y unzip git
    
  8. Installa il programma binario websocketd sull'istanza:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Installa Node.js e il codice tutorial

  1. In un terminale dell'istanza, installa Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Scarica il repository del codice sorgente del tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Modifica le autorizzazioni su pulltop per consentire l'esecuzione:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Installa le dipendenze pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Testa che il pulltop può leggere i messaggi

  1. Nell'istanza, esegui pulltop sull'argomento pubblico:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Se pulltop funziona, vedrai un flusso di risultati come il seguente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Premi Ctrl+C per interrompere lo stream.

Stabilisci il flusso di messaggi in websocketd

Ora che hai stabilito che pulltop può leggere l'argomento Pub/Sub, puoi avviare il processo websocketd per iniziare a inviare messaggi al browser.

Acquisisci i messaggi degli argomenti in un file locale

Per questo tutorial, acquisterai il flusso di messaggi che ricevi da pulltop e lo scriverai in un file locale. L'acquisizione del traffico dei messaggi in un file locale aggiunge un requisito di archiviazione, ma disaccoppia anche l'operazione del processo websocketd dai messaggi dell'argomento Pub/Sub in modalità flusso. L'acquisizione delle informazioni localmente consente scenari in cui potresti voler interrompere temporaneamente i flussi di dati Pub/Sub (ad esempio per regolare i parametri di controllo del flusso), ma non forzare la reimpostazione dei client WebSocket attualmente connessi. Quando il flusso di messaggi viene ristabilito, websocketd riprende automaticamente il flusso di messaggi ai client.

  1. Nell'istanza, esegui pulltop sull'argomento pubblico e reindirizza l'output del messaggio al file taxi.json locale. Il comando nohup indica al sistema operativo di mantenere in esecuzione il processo pulltop se ti disconnetti o chiudi il terminale.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifica che i messaggi JSON vengano scritti nel file:

    tail /var/tmp/taxi.json
    

    Se i messaggi vengono scritti nel file taxi.json, l'output è simile al seguente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Passa alla cartella web della tua app:

    cd ../web
    
  4. Avvia websocketd per iniziare il flusso di contenuti del file locale utilizzando WebSocket:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Questa operazione esegue il comando websocketd in background. Lo strumento websocketd utilizza l'output del comando tail e trasmette ogni elemento come messaggio WebSocket.

  5. Controlla i contenuti di nohup.out per verificare che il server sia stato avviato correttamente:

    tail nohup.out
    

    Se tutto funziona correttamente, l'output è simile al seguente:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Visualizzazione dei messaggi

I singoli messaggi relativi alle corse pubblicati nell'argomento Pub/Sub hanno una struttura simile alla seguente:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

In base a questi valori, calcoli diverse metriche per l'intestazione della dashboard. I calcoli vengono eseguiti una volta per ogni evento di corsa in entrata. I valori includono:

  • Latenza ultimo messaggio. Il numero di secondi tra il timestamp dell'evento dell'ultima corsa osservata e l'ora attuale (derivata dall'orologio del sistema che ospita il browser web).
  • Giri in bici. Il numero di corse attualmente in corso. Questo numero può aumentare rapidamente e diminuisce quando viene osservato un valore ride_status pari a dropoff.
  • Frequenza messaggi. Il numero medio di eventi di corsa elaborati al secondo.
  • Importo totale misurato. La somma dei metri di tutte le corse attive. Questo numero diminuisce man mano che le corse vengono dismesse.
  • Numero totale di passeggeri. Il numero di passeggeri per tutte le corse. Questo numero diminuisce man mano che vengono completate le corse.
  • Numero medio di passeggeri per corsa. Il numero totale di corse, diviso per il numero totale di passeggeri.
  • Importo medio misurato per passeggero. Il totale misurato diviso per il numero totale di passeggeri.

Oltre alle metriche e ai singoli esempi di corse, quando un passeggero viene preso o consegnato a un passeggero, la dashboard mostra una notifica di avviso sopra la griglia dei campioni delle corse.

  1. Ottieni l'indirizzo IP esterno dell'istanza attuale:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Copia l'indirizzo IP.

  3. Sul computer locale, apri un nuovo browser web e inserisci l'URL:

    http://$ip-address:8000.

    Viene visualizzata una pagina che mostra la dashboard di questo tutorial:

    Dashboard creata tramite codice in questo tutorial, con messaggio di benvenuto e prima della visualizzazione dei dati.

  4. Fai clic sull'icona del taxi in alto per aprire una connessione allo stream e iniziare a elaborare i messaggi.

    Le singole corse vengono visualizzate con un campione di nove corse attive visualizzate ogni tre secondi:

    Pannello che mostra le corse attive.

    Puoi fare clic sull'icona del taxi in qualsiasi momento per avviare o arrestare il flusso WebSocket. Se la connessione a WebSocket viene interrotta, l'icona diventa rossa e gli aggiornamenti delle metriche e delle singole corse vengono interrotti. Per riconnetterti, fai di nuovo clic sull'icona del taxi.

Prestazioni

Il seguente screenshot mostra il Performance Monitor degli Strumenti per sviluppatori di Chrome mentre la scheda del browser elabora circa 2100 messaggi al secondo.

Riquadro di monitoraggio delle prestazioni del browser che mostra l'utilizzo della CPU, le dimensioni dello heap, i nodi DOM e i ricalcoli di stile al secondo. I valori sono relativamente piatti.

Con l'invio dei messaggi a una latenza di circa 30 ms, l'utilizzo medio della CPU si aggira intorno all'80%. L'utilizzo della memoria è mostrato come minimo 29 MB, di cui 57 MB in totale vengono allocati e possono aumentare e ridursi liberamente.

Esegui la pulizia

Rimuovi regole firewall

Se hai utilizzato un progetto esistente per questo tutorial, puoi rimuovere le regole firewall che hai creato. È buona norma ridurre al minimo le porte aperte.

  1. Elimina la regola firewall che hai creato per consentire il protocollo TCP sulla porta 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Se hai anche creato una regola firewall per consentire la connettività SSH, elimina la regola firewall per consentire TCP sulla porta 22:

    gcloud compute firewall-rules delete wss-ssh
    

Elimina il progetto

Se non vuoi utilizzare di nuovo questo progetto, puoi eliminarlo.

  1. Nella console Google Cloud, vai alla pagina Gestisci risorse.

    Vai a Gestisci risorse

  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare, quindi fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.

Passaggi successivi