Flusso di messaggi Pub/Sub su WebSocket


Questo tutorial illustra un modo per un'app frontend, in questo caso una web per gestire volumi elevati di dati in entrata quando utilizzi in Google Cloud. Il tutorial descrive alcune delle le sfide poste dai flussi di dati ad alto volume. Questo viene fornito con un'app di esempio tutorial che illustra come utilizzare WebSockets per visualizzare un flusso denso di in un argomento Pub/Sub, elaborandoli in modo tempestivo che mantiene un frontend efficiente.

Questo tutorial è rivolto agli sviluppatori che hanno familiarità con la tecnologia browser-to-server la comunicazione su HTTP e la scrittura di app di frontend tramite HTML, CSS e JavaScript. Il tutorial presuppone che tu abbia una certa esperienza con Google Cloud e hai familiarità con gli strumenti a riga di comando di Linux.

Obiettivi

  • Crea e configura un'istanza di una macchina virtuale (VM) con le necessarie per trasmettere in flusso i payload di una sottoscrizione Pub/Sub ai client del browser.
  • configura un processo sulla VM per la sottoscrizione a Pub/Sub e inviare i singoli messaggi a un log.
  • Installa un server web per pubblicare contenuti statici e trasmettere il flusso del comando shell l'output ai client WebSocket.
  • Visualizzare le aggregazioni del flusso WebSocket e il singolo messaggio esempi in un browser utilizzando HTML, CSS e JavaScript.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Apri Cloud Shell per eseguire i comandi elencati in questo tutorial.

    VAI A Cloud Shell

    Esegui tutti i comandi del terminale in questo tutorial in Cloud Shell.

  7. Abilita l'API Compute Engine e l'API Pub/Sub:
    gcloud services enable compute pubsub

Al termine di questo tutorial, puoi evitare la fatturazione continua eliminando il le risorse che hai creato. Per ulteriori dettagli, consulta la sezione Pulizia.

Introduzione

Poiché sempre più app adottano modelli basati su eventi, è importante che il frontend sono in grado di creare connessioni semplici e fluide alla messaggistica e servizi che costituiscono la pietra miliare di queste architetture.

Esistono diverse opzioni per trasmettere i flussi di dati ai client del browser web; il più alto comuni sono WebSocket. Questo tutorial illustra come installare che sottoscrive un flusso di messaggi pubblicati in un Pub/Sub e indirizzarli attraverso il server web verso client connessi tramite WebSocket.

Per questo tutorial utilizzerai Pub/Sub, utilizzato in NYC Taxi Tycoon Google Dataflow CodeLab. Questo argomento fornisce un flusso in tempo reale di dati di telemetria sui taxi simulati in base ai dati storici relativi alle corse scattate a New York dal taxi e Limousine Record di viaggio della commissione set di dati.

Architettura

Il seguente diagramma mostra l'architettura del tutorial che crei in questo tutorial.

Architettura del tutorial

Il diagramma mostra un publisher di messaggi esterno al progetto che contiene la risorsa Compute Engine; il publisher invia messaggi a un Pub/Sub. L'istanza Compute Engine rende disponibili tramite WebSocket a un browser che esegue una dashboard basata su HTML5 e JavaScript.

Questo tutorial usa una combinazione di strumenti per collegare Pub/Sub e WebSocket:

  • pulltop è un programma Node.js da installare nell'ambito di questo tutorial. Lo strumento sottoscrive un argomento Pub/Sub e i flussi di dati ricevuti in un output standard.
  • websocketd è un piccolo strumento a riga di comando che aggrega un'interfaccia a riga di comando esistente e consente l'accesso tramite WebSocket.

Combinando pulltop e websocketd, puoi avere i messaggi che ricevi dall'argomento Pub/Sub trasmesso in flusso a un browser utilizzando WebSocket.

Regolazione della velocità effettiva dell'argomento Pub/Sub

L'argomento Pub/Sub pubblico di NYC Taxi Tycoon genera da 2000 a 2500 aggiornamenti simulati di corse in taxi al secondo, fino a 8 Mb o più al secondo. La il controllo del flusso integrato in Pub/Sub rallenta il messaggio di un sottoscrittore valutano automaticamente se Pub/Sub rileva una coda crescente di messaggi non confermati. Pertanto, potresti notare un'elevata variabilità della frequenza dei messaggi su diverse workstation, connessioni di rete ed elaborazione front-end le API nel tuo codice.

Elaborazione efficace dei messaggi nel browser

Dato l'elevato volume di messaggi provenienti dallo stream WebSocket, di scrivere il codice frontend che elabora questo flusso. Ad esempio, puoi creare dinamicamente elementi HTML per ogni messaggio. Ma al la frequenza di messaggi prevista, l'aggiornamento della pagina per ciascun messaggio potrebbe bloccare finestra del browser. Allocazioni frequenti della memoria derivanti da dinamicamente la creazione di elementi HTML prolunga la durata della garbage collection, diminuendo un'esperienza utente positiva. In breve, non vuoi chiamare document.createElement() per arriva ogni secondo ciascuno dei circa 2000 messaggi.

L'approccio di questo tutorial per la gestione di questo flusso denso di messaggi è il seguente:

  • Calcola e aggiorna continuamente un insieme di metriche relative ai flussi in tempo reale, mostrare la maggior parte delle informazioni sui messaggi osservati sotto forma di valori aggregati.
  • Utilizza una dashboard basata su browser per visualizzare un piccolo campione di i messaggi in base a una programmazione predefinita, che mostra solo gli eventi di consegna e ritiro in tempo reale.

La figura seguente mostra la dashboard creata nell'ambito di questa tutorial di Google Cloud.

Dashboard creata nella pagina web dal codice in questo tutorial

La figura mostra una latenza dell'ultimo messaggio di 24 millisecondi di quasi 2100 messaggi al secondo. Se i percorsi critici del codice per l'elaborazione ogni singolo messaggio non viene completato in tempo, il numero di messaggi osservati al secondo con l'aumento della latenza dell'ultimo messaggio. Il campionamento delle corse viene eseguito utilizzando l'API JavaScript setInterval impostata sul ciclo una volta ogni tre secondi, impedendo al frontend di creare un'enorme di elementi DOM nel corso della sua durata. (La stragrande maggioranza di questi sono comunque praticamente non osservabili con velocità superiori ai 10 al secondo.)

La dashboard inizia a elaborare gli eventi a metà dello stream, quindi già in corso vengono riconosciuti come nuovi dalla dashboard, a meno che non siano stati mai visti prima. Il codice utilizza un array associativo per memorizzare ogni corsa osservata, indicizzato dal valore ride_id e rimuove il riferimento a una determinata corsa quando il passeggero è stato fatto scendere. Corse in un percorso o "ritiro" stato aggiungere un riferimento a quell'array a meno che (nel caso di "inroute") la corsa non abbia osservato in precedenza.

Installare e configurare il server WebSocket

Per iniziare, crea un'istanza Compute Engine che utilizzerai come WebSocket. Dopo aver creato l'istanza, installerai al suo interno strumenti che di cui avrai bisogno in seguito.

  1. In Cloud Shell imposta la zona Compute Engine predefinita. L'esempio seguente mostra us-central1-a, ma puoi utilizzare qualsiasi zona che desiderato.

    gcloud config set compute/zone us-central1-a
    
  2. Crea un'istanza Compute Engine denominata websocket-server in la zona predefinita:

    gcloud compute instances create websocket-server --tags wss
    
  3. Aggiungi una regola firewall che consenta il traffico TCP sulla porta 8000 verso qualsiasi istanza con tag wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Se utilizzi un progetto esistente, assicurati che la porta TCP 22 sia e aprirli per consentire la connettività SSH all'istanza.

    Per impostazione predefinita, la regola firewall default-allow-ssh è abilitata per impostazione predefinita in ogni rete. Tuttavia, se tu o il tuo amministratore avete rimosso il valore predefinito in un progetto esistente, la porta TCP 22 potrebbe non essere aperta. (Se creato un nuovo progetto per questo tutorial, la regola è abilitata per impostazione predefinita, e non devi fare nulla.)

    Aggiungi una regola firewall che consenta il traffico TCP sulla porta 22 verso qualsiasi istanza con tag wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Connettiti all'istanza mediante SSH:

    gcloud compute ssh websocket-server
    
  6. Al comando del terminale dell'istanza, passa all'account root in modo che che puoi installare il software:

    sudo -s
    
  7. Installa gli strumenti git e unzip:

    apt-get install -y unzip git
    
  8. Installa il programma binario websocketd sull'istanza:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Installa Node.js e il codice del tutorial

  1. In un terminale sull'istanza, installa Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Scarica il repository di origine del tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Modifica le autorizzazioni su pulltop per consentire l'esecuzione:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Installa le dipendenze pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Verifica che il pulltop possa leggere i messaggi

  1. Nell'istanza, esegui pulltop sull'argomento pubblico:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Se pulltop funziona, vedrai uno stream di risultati come il seguente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Premi Ctrl+C per interrompere lo stream.

Stabilisci flusso di messaggi verso websocket

Ora che hai stabilito che pulltop può leggere Pub/Sub puoi avviare il processo websocketd per iniziare a inviare messaggi all' del browser.

Acquisire i messaggi degli argomenti in un file locale

Per questo tutorial, acquisirai lo stream di messaggi che ottieni da pulltop e la scrivi in un file locale. L'acquisizione del traffico dei messaggi verso un file locale aggiunge un di archiviazione, ma disaccoppia anche il funzionamento dell'websocketd dai messaggi di argomento Pub/Sub in modalità flusso. Acquisizione in corso... le informazioni a livello locale consentono a scenari in cui potresti voler interrompere il flusso di dati di Pub/Sub (ad esempio per regolare i parametri di controllo del flusso) ma non forzare il ripristino ai client WebSocket attualmente connessi. Quando il flusso di messaggi viene ristabilito, websocketd ripristina automaticamente il flusso di messaggi verso i client.

  1. Nell'istanza, esegui pulltop sull'argomento pubblico e reindirizza nel file taxi.json locale. Il comando nohup indica il sistema operativo per mantenere in esecuzione il processo pulltop se ti disconnetti o chiudi o nel terminale.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifica che i messaggi JSON vengano scritti nel file:

    tail /var/tmp/taxi.json
    

    Se i messaggi vengono scritti nel file taxi.json, l'output viene simile al seguente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Passa alla cartella web dell'app:

    cd ../web
    
  4. Avvia websocketd per avviare lo streaming dei contenuti del file locale usando WebSocket:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Il comando websocketd verrà eseguito in background. Lo strumento websocketd consuma l'output del comando tail e trasmette in flusso ogni elemento come Messaggio WebSocket.

  5. Controlla i contenuti di nohup.out per verificare che il server sia stato avviato correttamente:

    tail nohup.out
    

    Se tutto funziona correttamente, l'output è simile al seguente:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Visualizzazione dei messaggi

I singoli messaggi relativi alle corse pubblicati nell'argomento Pub/Sub hanno un struttura simile alla seguente:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

In base a questi valori, vengono calcolate diverse metriche per l'intestazione della dashboard. I calcoli vengono eseguiti una volta per ogni evento di corsa in entrata. I valori includono le seguenti:

  • Latenza dell'ultimo messaggio. Il numero di secondi tra il timestamp tra il timestamp dell'evento dell'ultima corsa osservata e l'ora corrente (derivata dall'orologio sul sistema che ospita il browser web).
  • Gite attive. Il numero di corse attualmente in corso. Questo numero può crescere rapidamente e il numero diminuisce quando un valore ride_status di dropoff.
  • Percentuale dei messaggi. Il numero medio di eventi di corsa elaborati al secondo.
  • Importo totale misurato. La somma dei metri di tutte le corse attive. Questo numero diminuisce quando le corse vengono ritirate.
  • Numero totale di passeggeri. Il numero di passeggeri in tutti corse. Questo numero diminuisce man mano che le corse vengono completate.
  • Numero medio di passeggeri per corsa. Il numero totale di corse, diviso per il numero totale di passeggeri.
  • Quantità media a consumo per passeggero. L'importo totale misurato diviso per il numero totale di passeggeri.

Oltre alle metriche e ai singoli campioni di corsa, quando un passeggero ricevuto o consegnato, la dashboard mostra una notifica di avviso sopra griglia di campioni delle corse.

  1. Ottieni l'indirizzo IP esterno dell'istanza attuale:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Copia l'indirizzo IP.

  3. Sul computer locale, apri un nuovo browser web e inserisci l'URL:

    http://$ip-address:8000.

    Vedrai una pagina che mostra la dashboard per questo tutorial:

    Dashboard creata tramite codice in questo tutorial, con il messaggio di benvenuto e prima della visualizzazione di tutti i dati.

  4. Fai clic sull'icona del taxi in alto per stabilire una connessione allo stream e iniziare nell'elaborazione dei messaggi.

    Vengono visualizzate le singole corse con un campione di nove corse attive il cui rendering viene eseguito ogni tre secondi:

    Pannello che mostra le corse attive.

    Puoi fare clic sull'icona del taxi in qualsiasi momento per avviare o arrestare WebSocket flusso di dati. Se la connessione WebSocket viene interrotta, l'icona diventa rossa e gli aggiornamenti delle metriche e le singole corse vengono interrotti. Per riconnetterti, fai clic sul di nuovo l'icona taxi.

Prestazioni

Il seguente screenshot mostra il monitoraggio delle prestazioni degli Strumenti per sviluppatori di Chrome mentre la scheda del browser elabora circa 2100 messaggi al secondo.

Riquadro del monitoraggio delle prestazioni del browser che mostra l'utilizzo della CPU, le dimensioni dell'heap, i nodi DOM e i ricollegamenti degli stili al secondo. I valori sono relativamente piatti.

Quando l'invio dei messaggi avviene con una latenza di circa 30 ms, l'utilizzo medio si attesta intorno all'80%. L'utilizzo della memoria è mostrato come minimo 29 MB, per un totale di 57 MB assegnati che crescono e si riducono liberamente.

Esegui la pulizia

Rimuovi regole firewall

Se hai utilizzato un progetto esistente per questo tutorial, puoi rimuovere il firewall le regole che hai creato. È buona norma ridurre al minimo le porte aperte.

  1. Elimina la regola firewall che hai creato per consentire il protocollo TCP sulla porta 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Se hai creato anche una regola firewall per consentire la connettività SSH, elimina il regola firewall per consentire il protocollo TCP sulla porta 22:

    gcloud compute firewall-rules delete wss-ssh
    

Elimina il progetto

Se non vuoi utilizzare di nuovo questo progetto, puoi eliminarlo.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Passaggi successivi