Como fazer streaming de mensagens do Pub/Sub para WebSockets


Neste tutorial, ilustramos com um aplicativo de front-end, neste caso, uma página da Web, lida com grandes volumes de dados recebidos quando você usa o Google Cloud. O tutorial descreve alguns dos desafios dos streams de alto volume. Um aplicativo de exemplo é fornecido neste tutorial para ilustrar WebSockets para visualizar um stream denso de mensagens publicadas em um tópico do Pub/Sub, processando-as em tempo hábil para manter um front-end com bom desempenho.

Este tutorial é destinado a desenvolvedores familiarizados com a comunicação entre navegadores por HTTP e com a criação de aplicativos de front-end usando HTML, CSS e JavaScript. Para seguir o tutorial, é necessário ter alguma experiência com o Google Cloud e familiaridade com as ferramentas de linha de comando do Linux.

Objetivos

  • Criar e configurar uma instância de máquina virtual (VM) com os componentes necessários para transmitir os payloads de uma assinatura do Pub/Sub para clientes do navegador.
  • Configurar um processo na VM para assinar um tópico do Pub/Sub e enviar as mensagens individuais para um registro.
  • Instalar um servidor da Web para exibir conteúdo estático e transmitir a saída do comando do shell para clientes WebSocket.
  • Visualizar as agregações de stream do WebSocket e amostras de mensagens individuais em um navegador usando HTML, CSS e JavaScript.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  4. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  5. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  6. Abra o Cloud Shell para executar os comandos listados neste tutorial.

    ACESSAR o Cloud Shell

    Todos os comandos de terminal neste tutorial são executados pelo Cloud Shell.

  7. Ative a API Compute Engine e a API Pub/Sub:
    gcloud services enable compute pubsub

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Veja mais detalhes em Limpeza.

Introdução

À medida que mais aplicativos adotam modelos baseados em eventos, é importante que os aplicativos de front-end possam fazer conexões simples e de baixo atrito com os serviços de mensagens que formam a base dessas arquiteturas.

Há várias opções de streaming de dados para clientes de navegadores da Web, e o mais comum deles é o WebSockets. Neste tutorial, mostramos como instalar um processo que assina um stream de mensagens publicadas em um tópico do Pub/Sub e direciona essas mensagens para o servidor da Web, que as redireciona para clientes conectados por WebSockets.

Para este tutorial, você trabalha com o tópico do Pub/Sub disponível publicamente, usado no NYC Taxi Tycoon Google Dataflow CodeLab. Neste tópico, apresentamos um stream em tempo real de simulações de telemetria de táxi baseada nos dados históricos de viagens feitas na cidade de Nova York com base nos conjuntos de dados do registro de viagens de táxi e limusines.

Arquitetura

O diagrama a seguir mostra a arquitetura criada neste tutorial.

Arquitetura do tutorial

O diagrama mostra um editor de mensagens que está fora do projeto e contém o recurso do Compute Engine. O editor envia mensagens para um tópico do Pub/Sub. A instância do Compute Engine disponibiliza as mensagens por WebSockets para um navegador que executa um painel com base em HTML5 e JavaScript.

Neste tutorial, usamos uma combinação de ferramentas para conectar o Pub/Sub e os Websockets:

  • O pulltop é um programa Node.js que precisa ser instalado como parte deste tutorial. Essa ferramenta assina um tópico do Pub/Sub e transmite mensagens recebidas para a saída padrão.
  • websocketd é uma pequena ferramenta de linha de comando que une um programa de interface de linha de comando atual e permite que ele seja acessado usando um WebSocket.

Ao combinar pulltop e websocketd, é possível ativar o stream de mensagens recebidas do tópico do Pub/Sub para um navegador usando WebSockets.

Como ajustar a capacidade do tópico do Pub/Sub

O tópico público do Pub/Sub referente ao NYC Taxi Tycoon gera de 2.000 a 2.500 simulações de corridas de táxi por segundo (até 8 Mb ou mais por segundo). O controle de stream integrado no Pub/Sub diminui a taxa de mensagens de um assinante automaticamente quando o Pub/Sub detecta uma fila crescente de mensagens não confirmadas. Portanto, é possível que você veja uma alta variação de taxa de mensagens em diferentes estações de trabalho, conexões de rede e código de processamento de front-end.

Como processar mensagens do navegador de maneira eficaz

Dado o grande volume de mensagens enviadas pelo stream do WebSocket, é necessário ter cuidado ao escrever o código de front-end que processa esse stream. Por exemplo, é possível criar dinamicamente elementos HTML para cada mensagem. No entanto, com a taxa de mensagens esperada, a atualização da página para cada mensagem pode bloquear a janela do navegador. As alocações de memória frequentes resultantes da criação dinâmica de elementos HTML também estendem a duração da coleta de lixo, prejudicando a experiência do usuário. Ou seja: não é recomendável chamar document.createElement() para cada uma das aproximadamente 2.000 mensagens que chegam a cada segundo.

A abordagem usada por este tutorial para gerenciar esse stream denso de mensagens é a seguinte:

  • Calcular e atualizar continuamente um conjunto de métricas de stream em tempo real, exibindo a maioria das informações sobre as mensagens observadas como valores agregados.
  • Usar um painel baseado no navegador para visualizar uma pequena amostra de mensagens individuais em uma programação predefinida, mostrando apenas eventos de entrega e retirada em tempo real.

A figura a seguir mostra o painel criado neste tutorial.

Painel criado na página da Web pelo código neste tutorial

A figura mostra uma latência de última mensagem de 24 milissegundos a uma taxa de quase 2.100 mensagens por segundo. Se os caminhos de código mais importantes para o processamento de cada mensagem individual não forem concluídos a tempo, o número de mensagens observadas por segundo diminuirá conforme a latência da última mensagem aumentar. A amostragem de viagem é feita usando a API JavaScript setInterval definida para rotacionar a cada três segundos, o que impede que o front-end crie um enorme número de elementos DOM ao longo da vida útil. De qualquer forma, a maioria desses problemas é praticamente irreversível com taxas superiores a 10 por segundo.

O painel começa a processar eventos no meio do stream. Portanto, as viagens que já estão em andamento são reconhecidas como novas pelo painel, a menos que tenham sido vistas antes. O código usa uma matriz associativa para armazenar cada viagem observada, indexada pelo valor ride_id, e remove a referência a uma viagem específica quando o passageiro chegou ao destino. As viagens no estado "a caminho" ou "embarque" adicionam uma referência a essa matriz, a menos que, no caso de "embarque", a viagem tenha sido observada anteriormente.

Instalar e configurar o servidor WebSocket

Para começar, crie uma instância do Compute Engine que será usada como o servidor WebSocket. Depois de criar a instância, instale as ferramentas necessárias mais tarde.

  1. No Cloud Shell, defina a zona padrão do Compute Engine. O exemplo a seguir mostra us-central1-a, mas use a zona que quiser.

    gcloud config set compute/zone us-central1-a
    
  2. Crie uma instância do Compute Engine chamada websocket-server na zona padrão:

    gcloud compute instances create websocket-server --tags wss
    
  3. Adicione uma regra de firewall que permita o tráfego TCP na porta 8000 para qualquer instância marcada como wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Se você estiver usando um projeto existente, verifique se a porta TCP 22 está aberta para permitir a conectividade SSH com a instância.

    Por padrão, a regra de firewall default-allow-ssh está ativada na rede padrão. No entanto, se você ou seu administrador tiver removido a regra padrão em um projeto existente, a porta TCP 22 poderá não estar aberta. Se você tiver criado um novo projeto para este tutorial, a regra será ativada por padrão e você não precisará fazer nada.

    Adicione uma regra de firewall que permita o tráfego TCP na porta 22 para qualquer instância marcada como wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Conectar-se à instância usando SSH

    gcloud compute ssh websocket-server
    
  6. No comando do terminal da instância, mude as contas para root para instalar o software:

    sudo -s
    
  7. Instale as ferramentas git e unzip:

    apt-get install -y unzip git
    
  8. Instale o binário websocketd na instância:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Instalar o Node.js e o código do tutorial

  1. Em um terminal na instância, instale o Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Faça o download do repositório de origem do tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Altere as permissões em pulltop para permitir a execução:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Instale as dependências pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Testar se o pulltop pode ler mensagens

  1. Na instância, execute pulltop no tópico público:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Se o pulltop estiver funcionando, você verá um stream de resultados como este:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Pressione Ctrl+C para interromper o stream.

Estabelecer stream de mensagens para websocketd

Agora que você estabeleceu que pulltop consegue ler o tópico do Pub/Sub, inicie o processo websocketd para começar a enviar mensagens ao navegador.

Capturar mensagens de tópico em um arquivo local

Neste tutorial, você captura o stream de mensagens recebido de pulltop e o grava em um arquivo local. Capturar o tráfego de mensagens para um arquivo local adiciona um requisito de armazenamento, mas também separa a operação do processo websocketd das mensagens de tópico do Pub/Sub do stream. Capturar as informações localmente permite cenários em que é possível interromper temporariamente o stream do Pub/Sub (talvez para ajustar os parâmetros de controle de stream), mas não forçar uma redefinição de clientes WebSocket conectados. Quando o stream de mensagens é restabelecido, o websocketd retoma automaticamente o streaming de mensagens para os clientes.

  1. Na instância, execute pulltop no tópico público e redirecione a saída da mensagem para o arquivo taxi.json local. O comando nohup instrui o SO a manter o processo pulltop em execução se você sair ou fechar o terminal.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifique se as mensagens JSON estão sendo gravadas no arquivo:

    tail /var/tmp/taxi.json
    

    Se as mensagens estiverem sendo gravadas no arquivo taxi.json, a saída será semelhante a esta:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Altere para a pasta da Web do seu aplicativo:

    cd ../web
    
  4. Inicie websocketd para começar a transmitir o conteúdo do arquivo local usando WebSockets:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Isso executa o comando websocketd em segundo plano. A ferramenta websocketd consome a saída do comando tail e transmite cada elemento como uma mensagem WebSocket.

  5. Verifique o conteúdo de nohup.out para confirmar se o servidor foi iniciado corretamente:

    tail nohup.out
    

    Se tudo estiver funcionando corretamente, a saída será semelhante a esta:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Como visualizar mensagens

As mensagens de viagem individuais publicadas no tópico do Pub/Sub têm uma estrutura como esta:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

Com base nesses valores, você calcula várias métricas para o cabeçalho do painel. Os cálculos são executados uma vez por cada evento de viagem de entrada. Os valores incluem o seguinte:

  • Latência da última mensagem. O número de segundos entre o carimbo de data/hora do último evento de viagem e o horário atual (derivado do relógio no sistema que hospeda o navegador da Web).
  • Viagens ativas. O número de viagens em andamento. Esse número pode crescer rapidamente e diminui quando é observado um valor ride_status de dropoff.
  • Taxa de mensagens. O número médio de eventos de viagem processados por segundo.
  • Valor total medido. A soma dos metros de todas as viagens ativas. Esse número diminui à medida que as viagens são finalizadas.
  • Número total de passageiros. O número de passageiros em todas as viagens. Esse número diminui à medida que as viagens são concluídas.
  • Número médio de passageiros por viagem. O número total de viagens, dividido pelo número total de passageiros.
  • Valor medido médio por passageiro. O valor total medido dividido pelo número total de passageiros.

Além das métricas e amostras de viagens individuais, quando um passageiro é pego ou entregue, o painel mostra uma notificação de alerta acima da grade de amostras de viagem.

  1. Consiga o endereço IP externo da instância atual:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Copie o endereço IP.

  3. Na máquina local, abra um novo navegador da Web e insira o URL:

    http://$ip-address:8000.

    Você verá uma página que mostra o painel deste tutorial:

    Painel criado por código neste tutorial, com mensagem de boas-vindas e antes da exibição dos dados.

  4. Clique no ícone de táxi na parte superior para abrir uma conexão com o stream e começar a processar as mensagens.

    As viagens individuais são visualizadas com uma amostra de nove viagens ativas sendo renderizadas a cada três segundos:

    Painel mostrando viagens ativas.

    Clique no ícone de táxi a qualquer momento para iniciar ou interromper o fluxo do WebSocket. Se a conexão do WebSocket for cortada, o ícone ficará vermelho e as atualizações das métricas e viagens individuais serão interrompidas. Para se reconectar, clique no ícone de táxi novamente.

Desempenho

A captura de tela a seguir mostra o monitor de desempenho das Ferramentas para Desenvolvedores do Chrome enquanto a guia do navegador processa cerca de 2.100 mensagens por segundo.

Painel do monitor de desempenho do navegador que mostra o uso da CPU, o tamanho do heap, os nós DOM e os recálculos de estilo por segundo. Os valores são relativamente planos.

Com o envio de mensagens a uma latência de aproximadamente 30 ms, a utilização da CPU segue uma média de cerca de 80%. A utilização da memória é mostrada como sendo de no mínimo 29 MB, com 57 MB alocados no total, aumentando e diminuindo livremente.

Limpar

Remover regras de firewall

Se você usou um projeto existente para este tutorial, pode remover as regras de firewall criadas. É recomendável minimizar as portas abertas.

  1. Exclua a regra de firewall que você criou para permitir o TCP na porta 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Se você também criou uma regra de firewall para permitir a conectividade SSH, exclua essa regra para permitir o TCP na porta 22:

    gcloud compute firewall-rules delete wss-ssh
    

Exclua o projeto

Se não quiser usar este projeto novamente, exclua-o.

  1. No Console do Google Cloud, acesse a página Gerenciar recursos.

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

A seguir