Como processar registros em grande escala usando o Cloud Dataflow

O Google Cloud fornece a infraestrutura escalonável necessária para manipular operações grandes e variadas de análise de registros. Neste tutorial, mostramos como usar o Google Cloud para criar pipelines analíticos que processam entradas de registro de várias origens. Combine dados de registro para extrair informações significativas e manter insights derivados de dados, que podem ser usados para análise, revisão e geração de relatórios.

Visão geral

À medida que aumenta a complexidade do aplicativo, fica mais complicado conseguir insights sobre os dados capturados nos registros. Os registros provêm de um número cada vez maior de origens, portanto, pode ser difícil verificá-los e consultá-los em busca de informações úteis. Para criar, operar e manter sua própria infraestrutura visando analisar dados de registro em grande escala, talvez seja preciso ter ampla experiência com armazenamento e sistemas distribuídos. Esse tipo de infraestrutura dedicada geralmente representa uma despesa pontual de capital, resultando em capacidade fixa, o que dificulta o escalonamento além do investimento inicial. Essas limitações podem afetar os negócios porque atrasam a geração de insights relevantes e acionáveis a partir dos dados.

Nesta solução, veja como superar essas limitações usando os produtos do Google Cloud, conforme ilustrado no diagrama a seguir.

A solução usa vários componentes do GCP

Nesta solução, um exemplo de conjunto de microsserviços é executado no Google Kubernetes Engine (GKE) para implementar um site. O Cloud Logging coleta registros desses serviços e os salva em buckets do Cloud Storage. Em seguida, o Dataflow processa os registros extraindo metadados e calculando as agregações básicas. O pipeline do Dataflow foi projetado para processar os elementos de registro diariamente, a fim de gerar métricas agregadas de tempos de resposta do servidor, com base nos registros de cada dia. Por fim, a saída do Dataflow é carregada nas tabelas do BigQuery, onde pode ser analisada para fornecer inteligência de negócios. Nesta solução, também explicamos de como mudar o pipeline para que seja executado no modo de streaming, em prol de um processamento de registros assíncrono e de baixa latência.

Neste tutorial, fornecemos um exemplo de pipeline do Dataflow, um exemplo de aplicativo da Web, informações de etapas e passos para executar o exemplo.

Custos

plain

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Para mais informações, consulte Como fazer a limpeza.

Antes de começar

BigQuery, Cloud Storage, Pub/Sub, Dataflow, GKE and Logging storage-component.googleapis.com,pubsub.googleapis.com,bigquery.googleapis.com,dataflow.googleapis.com,container.googleapis.com,stackdriver.googleapis.com
  1. Faça login na sua Conta do Google.

    Se você ainda não tiver uma, inscreva-se.

  2. No Console do GCP, na página do seletor de projetos, selecione ou crie um projeto do GCP.

    Acesse a página do seletor de projetos

  3. Verifique se o faturamento foi ativado no projeto do Google Cloud Platform. Saiba como confirmar que o faturamento está ativado para seu projeto.

  4. Crie um espaço de trabalho do Cloud Logging. Para mais informações sobre espaços de trabalho, consulte Como gerenciar espaços de trabalho.

    Acessar o Logging

Como configurar o ambiente

Neste tutorial, você usa o Cloud Shell para inserir comandos. O Cloud Shell, além de fornecer acesso à linha de comando no Console do Cloud, inclui o SDK do Cloud e outras ferramentas que é preciso desenvolver no Google Cloud. O Cloud Shell aparece como uma janela na parte inferior do Console do Cloud. A inicialização leva vários minutos, mas a janela aparece imediatamente.

Para usar o Cloud Shell a fim de configurar o ambiente e clonar o repositório git usado neste tutorial, faça o seguinte:

  1. No Console do Cloud, abra o Cloud Shell.

    ABRIR o Cloud Shell

  2. Verifique se está trabalhando no projeto recém-criado. Substitua [YOUR_PROJECT_ID] pelo projeto do Google Cloud recém-criado.

    gcloud config set project [YOUR_PROJECT_ID]
        
  3. Defina a zona do Compute padrão. Para os fins deste tutorial, use us-east1. Se você estiver implantando em um ambiente de produção, faça isso em uma região de sua escolha.

    export REGION=us-east1
        gcloud config set compute/region $REGION
        

Como clonar o repositório de exemplo

  • Clone o repositório que contém os scripts e a lógica do aplicativo que você usará neste tutorial.

    git clone https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow.git
        cd processing-logs-using-dataflow/services
        

Configurar variáveis de ambiente

# name your bucket
    export PROJECT_ID=[YOUR_PROJECT_ID]
    
# name your GKE cluster
    export CLUSTER_NAME=cluster-processing-logs-using-dataflow

    # name the bucket for this tutorial
    export BUCKET_NAME=${PROJECT_ID}-processing-logs-using-dataflow

    # name the logging sink for this tutorial
    export SINK_NAME=sink-processing-logs-using-dataflow

    # name the logging sink for this tutorial
    export DATASET_NAME=processing_logs_using_dataflow
    

Implantar o aplicativo de exemplo em um novo cluster do Google Kubernetes Engine

# create the cluster and deploy sample services
    ./cluster.sh $PROJECT_ID $CLUSTER_NAME up
    

Sobre a implantação do aplicativo de exemplo

O exemplo de implantação modela um aplicativo de compras. Nele, os usuários podem visitar a página inicial de um site de varejo, procurar produtos e depois tentar localizá-los em lojas físicas próximas. O aplicativo consiste em três microsserviços: HomeService, BrowseService e LocateService. Cada um deles está disponível a partir de um endpoint da API em um namespace compartilhado. Os usuários acessam os serviços acrescentando /home, /browse e /locate ao URL de base.

O aplicativo está configurado para registrar solicitações HTTP de entrada para stdout.

Como usar o Google Kubernetes Engine com o Cloud Logging

Neste exemplo, os microsserviços são executados em um cluster do Kubernetes Engine, que é um grupo de instâncias (ou nós) do Compute Engine executadas no Kubernetes (em inglês). Por padrão, o GKE configura cada nó para fornecer uma série de serviços, incluindo monitoramento, verificação de integridade e geração centralizada de registros. Nesta solução, foi aproveitada essa compatibilidade integrada com o Logging para enviar registros de cada microsserviço para o Cloud Storage. Uma alternativa para aplicativos que registram informações em arquivos não cobertos por esta solução é configurar a geração de registros no nível de cluster com o Kubernetes (em inglês).

Cada microsserviço é executado em um pod individual no cluster. Cada pod é executado em um nó, sendo exposto como um único endpoint HTTP ao usar os serviços do GKE.

Os microsserviços são executados em nós individuais.

Cada nó no cluster executa um agente do Cloud Logging que captura as mensagens de registro. Depois que os registros são disponibilizados no Logging, um script os exporta automaticamente para um bucket do Cloud Storage, aproveitando a compatibilidade com o Logging presente no SDK do Cloud.

Também é possível configurar os registros para serem exportados para o Cloud Storage usando o visualizador de registros. Nesta solução, foi usado o SDK do Cloud por ser necessário ao exportar vários registros.

Quando você usa o Cloud Storage como destino de exportação de registros, as entradas de registro do tipo LogEntry são salvas em lotes por hora em arquivos JSON individuais. Essas entradas estruturadas do Logging incluem metadados extras que especificam quando cada mensagem de registro foi criada, qual recurso ou instância a gerou, qual o nível de gravidade e assim por diante. No seguinte exemplo de uma entrada no Logging no elemento structPayload.log, é possível ver a mensagem de registro original gerada pelo microsserviço:

     {
        "insertId": "ugjuig3j77zdi",
        "labels": {
            "compute.googleapis.com/resource_name": "fluentd-gcp-v3.2.0-9q4tr",
            "container.googleapis.com/namespace_name": "default",
            "container.googleapis.com/pod_name": "browse-service-rm7v9",
            "container.googleapis.com/stream": "stdout"
        },
        "logName": "projects/processing-logs-at-scale/logs/browse-service",
        "receiveTimestamp": "2019-03-09T00:33:30.489218596Z",
        "resource": {
            "labels": {
                "cluster_name": "cluster-processing-logs-using-dataflow",
                "container_name": "browse-service",
                "instance_id": "640697565266753757",
                "namespace_id": "default",
                "pod_id": "browse-service-rm7v9",
                "project_id": "processing-logs-at-scale",
                "zone": "us-east1-d"
            },
            "type": "container"
        },
        "severity": "INFO",
        "textPayload": "[GIN] 2019/03/09 - 00:33:23 | 200 |     190.726µs |      10.142.0.6 | GET      /browse/product/1\n",
        "timestamp": "2019-03-09T00:33:23.743466177Z"
     }
    

Configurar a geração de registros

Depois que o cluster estiver em execução e os serviços forem implantados, será possível configurar a geração de registros para o aplicativo.

Primeiro, consiga as credenciais do cluster, já que kubectl é usado para receber os nomes de serviços para a configuração dos coletores de exportação do Cloud Logging.

gcloud container clusters get-credentials  $CLUSTER_NAME --region $REGION
    

No repositório do código, services/logging.sh configura os componentes necessários para o modo de lote ou de streaming. O script aceita estes parâmetros:

    logging.sh [YOUR_PROJECT_ID] [BUCKET_NAME] [streaming|batch] [up|down]
    

Para fins deste tutorial, inicie a geração de registros em lote:

./logging.sh $PROJECT_ID $BUCKET_NAME batch up
    

As etapas abaixo dão exemplos de comandos executados no modo de lote:

  1. Crie um bucket do Cloud Storage.

    gsutil -q mb gs://[BUCKET_NAME]

  2. Permita o acesso do Cloud Logging ao bucket.

    gsutil -q acl ch -g cloud-logs@google.com:O gs://[BUCKET_NAME]

  3. Para cada microsserviço, configure as exportações do Cloud Logging usando um coletor.

    gcloud logging sinks create [SINK_NAME] \ storage.googleapis.com/[BUCKET_NAME] \ --log-filter="kubernetes.home_service..." --project=[YOUR_PROJECT_ID]

Atualizar permissões de destino

As permissões de destino, que, neste caso, é o bucket do Cloud Storage, não são modificadas quando você cria um coletor. É preciso alterar as configurações de permissão do seu bucket do Cloud Storage para conceder permissão de gravação ao coletor.

Para atualizar as permissões no seu bucket do Cloud Storage:

  1. Identifique a Identidade do gravador do seu coletor:

    1. Acesse a página Visualizador de registros

      Acessar a página "Visualizador de registros"

    2. Selecione Exportações no menu à esquerda para ver um resumo dos seus coletores, incluindo a Identidade do gravador.

    3. IMPORTANTE: para cada um dos três coletores, há um e-mail de conta de serviço individual que precisa receber permissões no bucket do Cloud Storage.

  2. No Console do Cloud, clique em Armazenamento > Navegador:

    Acessar o navegador

  3. Para abrir a visualização detalhada, clique no nome do seu bucket.

  4. Selecione Permissões e clique em Adicionar membros.

  5. Defina o Papel como Storage Object Creator e insira a identidade do gravador do coletor.

Consulte Permissões de destino para mais informações.

É possível verificar os caminhos dos objetos de registro com o comando a seguir:

gsutil ls gs://$BUCKET_NAME | grep service
    

Quando a saída contiver todas as três entradas, será possível prosseguir com as etapas para executar o pipeline de dados:

     gs://$BUCKET_NAME/browse-service/
     gs://$BUCKET_NAME/home-service/
     gs://$BUCKET_NAME/locate-service/
    

Criar o conjunto de dados do BigQuery

bq mk $DATASET_NAME
    

Gerar uma carga nos serviços de aplicativos

Instalar os utilitários do servidor HTTP Apache

Use a ferramenta de comparativo de mercado do servidor HTTP Apache (ab) (em inglês) para gerar a carga nos serviços.

sudo apt-get update

    sudo apt-get install -y apache2-utils
    

O script de shell load.sh gera carga nos microsserviços ao solicitar respostas de HomeService, BrowseService e LocateService.

Um único conjunto de carga é composto por uma solicitação para o serviço inicial e 20 solicitações para cada um dos serviços de localização e procura.

A opção abaixo gerará mil conjuntos de carga, com a simultaneidade configurada para três solicitações.

cd ../services
    ./load.sh 1000 3
    

Aguarde alguns minutos para que seja criada uma quantidade suficiente de registros.

Iniciar o pipeline do Dataflow

Depois de permitir que uma quantidade suficiente de tráfego atinja os serviços, inicie o pipeline do Dataflow.

Para este tutorial, o pipeline do Dataflow é executado no modo em lote. O script de shell pipeline.sh inicia o pipeline manualmente.

cd ../dataflow
    ./pipeline.sh $PROJECT_ID $DATASET_NAME $BUCKET_NAME run
    

Como entender o pipeline do Dataflow

O Dataflow pode ser usado para muitos tipos de tarefas de processamento de dados. O SDK do Cloud Dataflow fornece um modelo de dados unificado que representa um conjunto de dados de qualquer tamanho, incluindo um ilimitado ou infinito de uma origem de dados de atualização contínua. Ele é ideal para trabalhar com os dados de registro nesta solução. O serviço gerenciado do Dataflow pode executar jobs em lote e de streaming. Isso significa que é possível usar uma única codebase para o processamento de dados assíncrono ou síncrono, em tempo real e baseado em eventos.

O SDK do Dataflow fornece representações de dados simples por meio de uma classe de coleção especializada denominada PCollection. O SDK realiza transformações de dados personalizadas por meio da classe PTransform. No Dataflow, as transformações representam a lógica de processamento de um pipeline. As transformações podem ser usadas para diversas operações de processamento, como mesclagem de dados, cálculo matemático de valores, filtragem da saída de dados ou conversão de dados de um formato em outro. Para mais informações sobre pipelines, PCollections, transformações e origens e coletores de E/S, consulte Modelo de programação do Cloud Dataflow.

O diagrama a seguir mostra as operações do pipeline para os dados de registro armazenados no Cloud Storage:

As etapas de operações do pipeline.

O diagrama pode parecer complexo, mas o Dataflow facilita a criação e o uso do pipeline. Nas seções a seguir, descrevemos as operações específicas em cada estágio do pipeline.

Como receber os dados

O pipeline começa consumindo a entrada dos buckets do Cloud Storage que contém os registros dos três microsserviços. Cada conjunto de registros se torna uma PCollection de elementos String, em que cada elemento corresponde a um único objeto LogEntry. No snippet a seguir, homeLogs, browseLogs e locateLogs são do tipo PCollection<String>:

homeLogs = p.apply("homeLogsTextRead", TextIO.read().from(options.getHomeLogSource()));
    browseLogs = p.apply("browseLogsTextRead", TextIO.read().from(options.getBrowseLogSource()));
    locateLogs = p.apply("locateLogsTextRead", TextIO.read().from(options.getLocateLogSource()));

Para lidar com os desafios de um conjunto de dados de atualização contínua, o SDK do Dataflow usa uma técnica chamada gestão de janelas. Ela subdivide de maneira lógica os dados em uma PCollection, de acordo com os carimbos de data/hora de cada elemento. Como o tipo de origem é TextIO neste caso, todos os objetos são inicialmente lidos em uma única janela global, o que é o comportamento padrão.

Como coletar os dados em objetos

Na próxima etapa, as PCollections de microsserviços individuais são combinadas em uma única PCollection por meio da operação Flatten.

PCollection<String> allLogs = PCollectionList
      .of(homeLogs)
      .and(browseLogs)
      .and(locateLogs)
      .apply(Flatten.<String>pCollections());

Essa operação é útil porque cada PCollection de origem contém o mesmo tipo de dados e usa a mesma estratégia global de gestão de janelas. As origens e a estrutura de cada registro são iguais nesta solução, mas essa abordagem também pode ser adotada quando a origem e a estrutura são diferentes.

Com uma única PCollection criada, agora é possível processar cada elemento String usando uma transformação personalizada que executa várias etapas na entrada de registro. Veja essas etapas ilustradas no diagrama a seguir:

Uma transformação processa mensagens de string para criar mensagens de registro.

  • Desserializar a string JSON em um objeto Java LogEntry do Cloud Logging.
  • Extrair o carimbo de data/hora dos metadados LogEntry.
  • Extrair os seguintes campos individuais da mensagem de registro usando expressões regulares: timestamp, responseTime, httpStatusCode, httpMethod, endereço IP source e endpoint destination. Use esses campos para criar um objeto personalizado LogMessage com carimbo de data/hora.
  • Produzir os objetos LogMessage em um novo PCollection.

O código a seguir executa os passos:

PCollection<LogMessage> allLogMessages = allLogs
      .apply("allLogsToLogMessage", ParDo.of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

Como agregar os dados por dias

Lembre-se de que o objetivo é processar os elementos diariamente para gerar métricas agregadas com base nos registros de cada dia. Para ser alcançada, essa agregação requer uma função de gerenciamento de janelas que subdivide os dados por dia. Isso é possível porque cada LogMessage na PCollection tem um carimbo de data/hora. Após o Dataflow particionar a PCollection em limites diários, as operações compatíveis com as PCollections com janelas respeitarão o esquema de gerenciamento de janelas.

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
      .apply("allLogMessageToDaily", Window.<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

Com uma PCollection única e com janelas, agora é possível calcular métricas diárias agregadas em todas as três origens de registros de vários dias executando um único job do Cloud Dataflow.

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
      .apply(Max.<String>doublesPerKey());
     // .apply(Combine.<String,Double,Double>perKey(new Max.doublesPerKey()));

    PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
      .apply(Mean.<String,Double>perKey());

Primeiro, uma transformação usa objetos LogMessage como entrada e, em seguida, gera uma saída de PCollection de pares de chave-valor que mapeiam os endpoints de destino como chaves para os valores de tempo de resposta, conforme ilustrado no diagrama a seguir.

Cálculo de métricas diárias agregadas.

Usando essa PCollection, é possível calcular duas métricas agregadas: tempo máximo de resposta por destino e tempo médio de resposta por destino. Como a PCollection ainda é particionada por dia, o resultado de cada cálculo representará os dados de registro de um único dia. Isso significa que você terá duas PCollections finais: uma contendo o tempo máximo de resposta por destino por dia e outra contendo o tempo médio de resposta por destino por dia.

Como carregar os dados no BigQuery

O passo final no pipeline produz as PCollections resultantes no BigQuery, para análise posterior e armazenamento de dados.

Primeiro, o pipeline transforma a PCollection que contém objetos LogMessage de todas as origens de registros em uma PCollection de objetos TableRow do BigQuery. Essa etapa é necessária para aproveitar a compatibilidade integrada do Cloud Dataflow para usar o BigQuery como coletor de um pipeline.

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
      .apply("logMessageToTableRow", ParDo.of(new LogMessageTableRowFn()));

As tabelas do BigQuery precisam de esquemas definidos. Nesta solução, os esquemas são definidos em LogAnalyticsPipelineOptions.java usando uma anotação de valor padrão. Por exemplo, o esquema da tabela de tempo máximo de resposta é definido da seguinte maneira:

@Default.String("destination:STRING,aggResponseTime:FLOAT")

Uma operação nas PCollections que contêm os valores de tempo de resposta agregados os converte em PCollections de objetos TableRow, aplicando os esquemas apropriados e criando as tabelas, se estiverem ausentes.

logsAsTableRows.apply("allLogsToBigQuery", BigQueryIO.writeTableRows()
      .to(options.getAllLogsTableName())
      .withSchema(allLogsTableSchema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Essa solução sempre anexa novos dados aos dados existentes. Essa escolha é apropriada porque este pipeline é executado periodicamente para analisar novos dados de registro. No entanto, se fizer mais sentido em um cenário diferente, também será possível truncar os dados da tabela atual ou apenas gravar na tabela se ela estiver vazia.

Como consultar os dados do BigQuery

É possível usar o Console do BigQuery para executar consultas nos dados de saída e se conectar a ferramentas de inteligência de negócios de terceiros, como o Tableau e o QlikView (links em inglês), para fazer mais análises.

  1. No Console do Cloud, abra o BigQuery.

    ABRIR o BigQuery

  2. Clique no projeto processing-logs-at-scale e, em seguida, clique no conjunto de dados processing_logs_using_dataflow.

  3. Selecione all_logs_table e, no painel de dados, selecione Visualizar para ver um exemplo dos dados na tabela todos os registros.

  4. No Editor de consultas, insira a seguinte consulta:

    SELECT *
        FROM `processing_logs_using_dataflow.max_response_time_table`
        ORDER BY aggResponseTime DESC
        LIMIT 100;
        
  5. Para executar a consulta, clique em Executar.

    O console do BigQuery executa uma consulta nos dados do registro

Como usar um pipeline de streaming

No exemplo, é possível executar o pipeline em modo de lote ou streaming. Bastam apenas algumas etapas para mudar o pipeline de lote para streaming. Primeiro, a configuração do Cloud Logging exporta informações da geração de registros para o Pub/Sub, não para o Cloud Storage. A próxima etapa é mudar as origens de entrada no pipeline do Dataflow do Cloud Storage para assinaturas em tópicos do Pub/Sub. É necessário ter uma assinatura para cada origem de entrada.

O pipeline Pub/Sub usa assinaturas.

Você vê os comandos do SDK em uso em logging.sh.

As PCollections criadas a partir dos dados de entrada do Pub/Sub usam uma janela global ilimitada. No entanto, as entradas individuais já incluem carimbos de data/hora. Isso significa que não é necessário extrair dados de carimbo de data/hora do objeto LogEntry do Cloud Logging, basta extrair os carimbos de data/hora de registros para criar os objetos LogMessage personalizados.

Quando se usa o pipeline do Pub/Sub, é possível extrair carimbos de data/hora dos registros

O restante do pipeline permanece como está, incluindo operações posteriores de achatamento, transformação, agregação e saída.

Como monitorar o pipeline

Ao executar o job do Dataflow, use o Console do Google Cloud para monitorar o progresso e visualizar informações sobre cada estágio no pipeline.

Na imagem a seguir, veja o Console do Cloud durante a execução de um pipeline de exemplo:

O Console do Cloud mostra um job do Dataflow em execução.

Como fazer a limpeza

Excluir o projeto

no_tutorial_note
  1. No Console do GCP, acesse a página Gerenciar recursos.

    Acessar a página Gerenciar recursos

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Excluir todos os componentes

Confirme se determinadas variáveis de ambiente ainda estão definidas com os valores usados durante a configuração.

  1. Exclua o conjunto de dados do BigQuery:

    bq rm $DATASET_NAME
        
  2. Desative as exportações do Cloud Logging. Esta etapa exclui as exportações e o bucket do Cloud Storage especificado:

    cd ../services
        ./logging.sh $PROJECT_ID $BUCKET_NAME batch down
        
  3. Exclua o cluster do Compute Engine usado para executar os exemplos de aplicativos da Web:

    /cluster.sh $PROJECT_ID $CLUSTER_NAME down
        

Como estender a solução

O pipeline e o conjunto de operações descritos nesta solução podem ser estendidos de diversas maneiras. As extensões mais óbvias seriam a realização agregações extras nos dados LogMessage. Por exemplo, se a sessão ou as informações anônimas do usuário fossem incluídas na saída do registro, seria possível criar agregações em torno da atividade do usuário. Também é possível usar a transformação ApproximateQuantiles (em inglês) para gerar uma distribuição de tempos de resposta.

A seguir