Como processar registros em escala usando o Cloud Dataflow

O Google Cloud Platform fornece a infraestrutura escalonável que você precisa para lidar com operações grandes e diversas de análise de registros. Nesta solução, você aprenderá a usar o Cloud Platform para criar pipelines analíticos que processam entradas de registro de várias origens. Você combinará os dados do registro de maneiras que o ajudem a extrair informações significativas e manter informações obtidas dos dados, que podem ser usados para análise, revisão e relatórios.

Visão geral

À medida que seu aplicativo fica mais complexo, conseguir informações sobre os dados capturados nos registros torna-se mais desafiador. Os registros provêm de um número cada vez maior de origens, então podem ser difíceis de identificar e consultar para encontrar informações úteis. Criar, operar e manter sua própria infraestrutura para analisar os dados do registro em escala pode exigir uma ampla experiência na execução de sistemas e armazenamento distribuídos. Esse tipo de infraestrutura dedicada geralmente representa uma despesa única de capital, resultando em capacidade fixa, o que dificulta a escala além do investimento inicial. Essas limitações podem afetar os negócios porque levam à desaceleração na geração de insights relevantes e viáveis a partir de dados.

Esta solução permite ultrapassar essas limitações usando o Cloud Platform. Nesta solução, um conjunto de microsserviços de amostra é executado no Kubernetes Engine para implementar um site. O Stackdriver Logging coleta registros desses serviços e os salva em intervalos do Google Cloud Storage. O Google Cloud Dataflow processa os registros extraindo metadados e calculando agregações básicas. O pipeline do Cloud Dataflow foi projetado para processar os elementos do registro diariamente a fim de gerar métricas agregadas para os tempos de resposta do servidor, com base nos registros de cada dia. Por fim, o resultado do Cloud Dataflow é carregado nas tabelas do Google BigQuery, nas quais pode ser analisado para fornecer inteligência de negócios. Esta solução também explica como você pode mudar o pipeline para ser executado no modo de streaming, para processamento de registro assíncrono de baixa latência.

A solução usa vários componentes do Cloud Platform

O tutorial incluído fornece um pipeline de exemplo do Cloud Dataflow, um aplicativo de amostra da Web, informações de configuração e etapas para executar a amostra.

Sobre o aplicativo

A implantação de amostra exibe um aplicativo de compras. Neste exemplo, os usuários podem visitar a página inicial de um site de varejo, procurar produtos e depois tentar localizá-los em lojas físicas tradicionais próximas. O aplicativo consiste em três microsserviços: HomeService, BrowseService e LocateService. Cada um deles está disponível a partir de um ponto de extremidade da API em um namespace compartilhado. Os usuários acessam os serviços adicionando /home, /browse e /locate ao URL de base.

O aplicativo é configurado para registrar solicitações HTTP recebidas para stdout.

Como usar o Kubernetes Engine com o Stackdriver Logging

Neste exemplo, os microsserviços são executados em um cluster do Kubernetes Engine, que é um grupo de instâncias ou nós do Google Compute Engine para a execução do Kubernetes. Por padrão, o Kubernetes Engine configura cada node para fornecer uma série de serviços, incluindo monitoramento, verificação de integridade e registro centralizado. Essa solução usa o suporte interno do Stackdriver Logging para enviar registros de cada microsserviço para o Google Cloud Storage. Como uma alternativa para aplicativos que registram informações em arquivos, não cobertos por esta solução, configure a geração de registros no nível de cluster com Kubernetes.

Cada microsserviço é executado em um pod individual no cluster. Cada pod é executado em um node e é exposto como um único ponto de extremidade HTTP usando os serviços do Kubernetes Engine.

Os microsserviços são executados em nodes individuais

Cada node do cluster executa um agente do Stackdriver Logging que captura as mensagens de registro. Depois que os registros são disponibilizados no Stackdriver Logging, eles são exportados automaticamente por um script para um intervalo do Cloud Storage usando o suporte do Stackdriver Logging disponível do SDK do Cloud. Em logging.sh, comandos semelhantes a estes são executados pela amostra:

# Create a Cloud Storage Bucket
gsutil -q mb gs://BUCKET_NAME

# Allow Stackdriver Logging access to the bucket
gsutil -q acl ch -g cloud-logs@google.com:O gs://BUCKET_NAME

# For each microservice, set up Stackdriver Logging exports
gcloud beta logging sinks create SINK_NAME \
  storage.googleapis.com/BUCKET_NAME \
  --log=”kubernetes.home_service…” --project=PROJECT_ID

Observe que você também pode configurar os registros para serem exportados para o Google Cloud Storage usando o Visualizador de registros. Essa solução usa o SDK porque é necessário ao exportar vários registros.

Quando você usa o Cloud Storage como destino da exportação de registros, as entradas de registro do tipo LogEntry são salvas em lotes por hora, em arquivos JSON individuais. Essas entradas estruturadas do Cloud Logging incluem metadados adicionais que especificam quando cada mensagem de registro foi criada, qual recurso ou instância a gerou, qual o nível de gravidade e assim por diante. No exemplo de uma entrada do Stackdriver Logging a seguir, o elemento structPayload.log exibe a mensagem do registro original que o microsserviço gerou:

{
  "metadata": {
    "projectId": "...",
    "serviceName": "compute.googleapis.com",
    "zone": "us-central1-f",
    "labels": {
      "compute.googleapis.com/resource_id": "4154944251817867710",
      "compute.googleapis.com/resource_type": "instance"
    },
    "timestamp": "2015-09-22T20:01:13Z"
  },
  "insertId": "2015-09-22|13:01:17.636360-07|10.106.196.103|1124257302",
  "log": "kubernetes.browse-service-iaus6_default_sample-browse-service",
  "structPayload": {
    "stream": "stdout",
    "log": "2015/09/22 - 20:01:13 | 404 | 176ns | 10.160.0.1:34790 | GET /browse/46"
  }
}

Criar o pipeline do Cloud Dataflow

O Cloud Dataflow é um sistema simples e poderoso que pode ser usado para vários tipos de tarefas de processamento de dados. O SDK do Cloud Dataflow fornece um modelo de dados unificado que representa um conjunto de dados de qualquer tamanho, incluindo um conjunto de dados ilimitado ou infinito de uma origem de dados de atualização contínua. Ele é bem adequado para o trabalho com os dados de registro nesta solução. O serviço gerenciado do Cloud Dataflow pode executar tarefas em lote e de streaming. Isso significa que você pode usar uma única base de código para o processamento de dados assíncrono ou síncrono, em tempo real e baseado em eventos.

O SDK do Dataflow fornece representações de dados simples por meio de uma classe de coleção especializada denominada PCollection O SDK fornece transformações de dados internas e personalizadas por meio da classe PTransform. No Cloud Dataflow, as transformações representam a lógica de processamento de um pipeline. As transformações podem ser usadas para diversas operações de processamento, tais como unir dados, calcular valores matematicamente, filtrar a saída de dados ou converter dados de um formato para outro. Para mais informações sobre pipelines, PCollections, transformações, e origens e coletores de E/S, consulte Modelo de programação de fluxo de dados.

O diagrama a seguir mostra as operações do pipeline para os dados de registro armazenados no Google Cloud Storage:

O pipeline do Cloud Dataflow tem várias etapas

O diagrama pode parecer complexo, mas o Cloud Dataflow facilita a criação e o uso do pipeline. As seções a seguir descrevem as operações específicas em cada etapa do pipeline.

Receber os dados

O pipeline começa consumindo a entrada dos intervalos do Google Cloud Storage que contêm os registros dos três microsserviços. Cada coleção de registros se torna uma PCollection de elementos String, na qual cada elemento corresponde a um único objeto LogEntry. No snippet a seguir, homeLogs, browseLogs e locateLogs são do tipo PCollection<String>:

homeLogs = p.apply(TextIO.Read.named("homeLogsTextRead").from(options.getHomeLogSource()));
browseLogs = p.apply(TextIO.Read.named("browseLogsTextRead").from(options.getBrowseLogSource()));
locateLogs = p.apply(TextIO.Read.named("locateLogsTextRead").from(options.getLocateLogSource()));

Para lidar com os desafios de um conjunto de dados de atualização contínua, o SDK do Cloud Dataflow usa uma técnica chamada janelamento (windowing). A gestão de janelas subdivide de maneira lógica os dados em uma PCollection de acordo com os carimbos de data/hora dos elementos individuais. Neste caso, como o tipo da origem é TextIO, todos os objetos são inicialmente lidos em uma janela global, que é o comportamento padrão.

Coletar os dados em objetos

A etapa seguinte combina as PCollections de microsserviços individuais em uma única PCollection usando a operação Flatten.

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

Essa operação é útil porque cada PCollection de origem contém o mesmo tipo de dados e usa a mesma estratégia de janelamento global. As origens e a estrutura de cada registro são iguais nesta solução, mas é possível estender essa abordagem para uma na qual a origem e a estrutura sejam diferentes.

Com uma única PCollection criada, agora você pode processar os elementos String individuais usando uma transformação personalizada que executa várias etapas na entrada de registro. Estas são as etapas:

Uma transformação processa mensagens de string para criar mensagens de registro

  • Desserialize a string JSON em um objeto Java LogEntry do Stackdriver Logging.
  • Extraia o carimbo de data/hora dos metadados LogEntry.
  • Extraia os seguintes campos individuais da mensagem de registro usando expressões regulares: timestamp, responseTime, httpStatusCode, httpMethod, endereço IP source e ponto de extremidade destination. Use esses campos para criar um objeto personalizado LogMessage com carimbo de data/hora.
  • Produza os objetos LogMessage em uma nova PCollection.

O código a seguir executa as etapas:

PCollection<LogMessage> allLogMessages = allLogs
  .apply(ParDo.named("allLogsToLogMessage").of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

Agregar os dados por dias

Lembre-se de que o objetivo é processar os elementos em uma base diária para gerar métricas agregadas com base em registros de cada dia. Para ser alcançada, essa agregação requer uma função de janelamento que subdivide os dados por dia. Isso é possível porque cada LogMessage no PCollection tem um carimbo de data/hora. Após o Cloud Dataflow particionar a PCollection pelos limites diários, as operações compatíveis com as PCollections com janelas respeitarão o esquema de janelamento.

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply(Window.named("allLogMessageToDaily").<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

Com uma única PCollection com janelas, agora você pode calcular métricas diárias agregadas em todas as três origens de registros de vários dias executando um único job do Cloud Dataflow.

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Combine.<String,Double,Double>perKey(new Max.MaxDoubleFn()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

Primeiro, uma transformação pega os objetos LogMessage como entrada e, em seguida, produz uma PCollection de pares de valores-chave que mapeiam os pontos de extremidade de destino como chaves para valores de tempo de resposta. Usando essa PCollection, você pode calcular duas métricas agregadas: tempo máximo de resposta por destino e tempo médio de resposta por destino. Como a PCollection ainda é particionada por dia, a saída de cada cálculo representará os dados de registro de um único dia. Isso significa que você terá duas PCollections finais: uma contendo o tempo máximo de resposta por destino por dia e outra contendo o tempo médio de resposta por destino por dia.

Calcular métricas diárias agregadas

Carregar os dados no BigQuery

A etapa final no pipeline produz as PCollections resultantes para o BigQuery, para análise subsequente e armazenamento de dados.

Primeiro, o pipeline transforma a PCollection que contém objetos LogMessage para todas as origens de registro em uma PCollection de objetos TableRow do BigQuery. Essa etapa é necessária para utilizar o suporte interno no Cloud Dataflow para usar o BigQuery como um coletor de um pipeline.

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply(ParDo.named("logMessageToTableRow").of(new LogMessageTableRowFn()));

As tabelas do BigQuery requerem esquemas definidos. Nesta solução, os esquemas são definidos em LogAnalyticsPipelineOptions.java usando uma anotação de valor padrão. Por exemplo, o esquema da tabela de tempo máximo de resposta é definido da seguinte maneira:

@Default.String("destination:STRING,aggResponseTime:FLOAT")

Uma operação na PCollections que contém os valores de tempo de resposta agregados os converte em PCollections de objetos TableRow, aplicando os esquemas apropriados e criando as tabelas, se estiverem faltando.

logsAsTableRows.apply(BigQueryIO.Write
  .named("allLogsToBigQuery")
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Essa solução sempre anexa novos dados aos dados existentes. Essa é uma escolha apropriada porque este pipeline é executado periodicamente para analisar novos dados de registro. No entanto, é possível truncar os dados de tabela existentes ou apenas gravar na tabela se estiver vazia, caso uma dessas opções tenha mais sentido em um cenário diferente.

Consultar os dados

Para fazer mais análises, no Console do BigQuery, é possível executar consultas nos dados de saída e se conectar a ferramentas de business intelligence de terceiros, como Tableau e QlikView.

O console do BigQuery executa uma consulta nos dados do registro

Usar um pipeline de streaming

O exemplo inclui suporte para executar o pipeline em modo de lote ou streaming. Bastam apenas algumas etapas para mudar o pipeline de lote para streaming. Primeiro, a configuração do Stackdriver Logging exporta informações da geração de registros para o Cloud Pub/Sub em vez de para o Google Cloud Storage. A próxima etapa é mudar as origens de entrada no pipeline do Cloud Dataflow do Google Cloud Storage para inscrições em tópicos do Cloud Pub/Sub. É necessário ter uma inscrição para cada origem de entrada.

O pipeline Cloud Pub/Sub usa assinaturas

Veja os comandos do SDK em uso no logging.sh.

As PCollections criadas a partir dos dados de entrada do Cloud Pub/Sub usam uma janela global ilimitada. No entanto, as entradas individuais já incluem carimbos de data/hora. Isso significa que não é necessário extrair os dados de carimbo de data/hora do objeto LogEntry do Stackdriver Logging. Basta extrair os carimbos de data/hora do registro para criar os objetos LogMessage personalizados.

Ao usar o pipeline Cloud Pub/Sub, você pode extrair carimbos de data/hora dos registros

O restante do pipeline permanece como está, incluindo operações subjacentes niveladas, de transformação, de agregação e de saída.

Executar o pipeline

As etapas para configurar, criar e implantar o pipeline do Cloud Dataflow, juntamente com as etapas necessárias para implantar os microsserviços e configurar as exportações do Stackdriver Logging, podem ser encontradas no tutorial. Quando você executa o job do Cloud Dataflow, pode usar o Console do Google Cloud Platform para monitorar o andamento e visualizar informações de cada etapa do pipeline.

A imagem a seguir mostra a interface do usuário do console durante a execução de um exemplo de pipeline:

O console do GCP mostra um job do Cloud Dataflow em execução

Estender a solução

O pipeline e o conjunto de operações descrito nesta solução podem ser estendidos de diversas maneiras. As extensões mais óbvias seria a realização de agregações adicionais nos dados LogMessage. Por exemplo, se a sessão ou as informações anônimas do usuário fossem incluídas na saída do registro, você poderia criar agregações em torno da atividade do usuário. Você também pode usar ApproximateQuantiles para gerar uma distribuição de tempos de resposta.

Recursos e custo

Este tutorial usa vários componentes faturáveis do Google Cloud Platform, como:

  • Kubernetes Engine para implantação de microsserviços;
  • Stackdriver Logging para receber e exportar registros;
  • Google Cloud Storage para armazenar os registros exportados em modo de lote;
  • Cloud Pub/Sub para fazer o streaming de registros exportados no modo de streaming;
  • Cloud Dataflow para processar os dados de registro;
  • BigQuery para armazenar a saída do processamento e dar suporte a consultas avançadas nesse resultado.

O custo de execução deste tutorial varia de acordo com o tempo de execução. Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. Os novos usuários do Cloud Platform podem estar qualificados para uma avaliação gratuita.

Tutorial

O conteúdo completo do tutorial, incluindo as instruções e o código-fonte, estão disponíveis no GitHub, em https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow.

Próximas etapas

  • Conheça outros recursos do Google Cloud Platform. Veja nossos tutoriais.
  • Aprenda a usar os produtos do Google Cloud Platform para criar soluções completas.
Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…