Trabalhar com registros do pipeline

É possível usar a infraestrutura de geração de registros integrada do SDK do Apache Beam para registrar informações ao executar o pipeline. Use o Console do Google Cloud para monitorar as informações de geração de registros durante e após a execução do pipeline.

Adicionar mensagens de registro ao pipeline

Java

O SDK do Apache Beam para Java recomenda gerar registros de mensagens do worker por meio da biblioteca de código aberto Simple Logging Facade for Java (SLF4J). O SDK do Apache Beam para Java implementa a infraestrutura de geração de registros obrigatória. Dessa maneira, o código Java só precisa importar a API SLF4J. Em seguida, ele cria instâncias de um Logger para ativar a geração de registros de mensagens no código do pipeline.

Para código e/ou bibliotecas preexistentes, o SDK do Apache Beam para Java configura uma infraestrutura de registros extra. As mensagens de registro produzidas pelas seguintes bibliotecas de geração de registros para Java são capturadas:

Python

O SDK do Apache Beam para Python fornece o pacote de biblioteca logging, que permite que os workers do pipeline enviem mensagens de registro. Para usar as funções da biblioteca, é necessário importá-la:

import logging

Go

O SDK do Apache Beam para Go fornece o pacote de biblioteca log, que permite que os workers do pipeline enviem mensagens de registro. Para usar as funções da biblioteca, é necessário importá-la:

import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"

Exemplo de código de mensagem de registro do worker

Java

O exemplo a seguir usa o SLF4J para geração de registros do Dataflow. Para saber mais sobre como configurar o SLF4J para geração de registros do Dataflow, consulte o artigo Dicas de Java.

O exemplo WordCount do Apache Beam pode ser modificado para resultar em uma mensagem de registro quando a palavra “love” for localizada em uma linha do texto processado. O código adicionado está indicado em negrito no exemplo a seguir. O código entre parênteses foi incluído para contextualização.

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

É possível modificar o exemplo wordcount.py do Apache Beam para resultar em uma mensagem de registro quando a palavra “love” for localizada em uma linha do texto processado.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Go

É possível modificar o exemplo wordcount.go do Apache Beam para resultar em uma mensagem de registro quando a palavra “love” for localizada em uma linha do texto processado.

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        // increment the counter for small words if length of words is
        // less than small_word_length
        if strings.ToLower(word) == "love" {
            log.Infof(ctx, "Found : %s", strings.ToLower(word))
        }

        emit(word)
    }
}

// Remaining Wordcount example

Java

Se o pipeline WordCount modificado for executado localmente usando o DirectRunner padrão com a saída enviada para um arquivo local (--output=./local-wordcounts), a saída do console incluirá as mensagens de registro adicionadas:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

Por padrão, apenas linhas de registros marcadas como INFO e superiores são enviadas para o Cloud Logging. Se você quiser alterar esse comportamento, consulte Como configurar níveis de registro do trabalhador do canal.

Python

Se o pipeline WordCount modificado for executado localmente usando o DirectRunner padrão com a saída enviada para um arquivo local (--output=./local-wordcounts), a saída do console incluirá as mensagens de registro adicionadas:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

Por padrão, apenas linhas de registros marcadas como INFO e superiores são enviadas para o Cloud Logging.

Go

Se o pipeline WordCount modificado for executado localmente usando o DirectRunner padrão com a saída enviada para um arquivo local (--output=./local-wordcounts), a saída do console incluirá as mensagens de registro adicionadas:

2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love

Por padrão, apenas linhas de registros marcadas como INFO e superiores são enviadas para o Cloud Logging.

Controlar o volume de registros

Você também pode reduzir o volume de registros gerados alterando os níveis de registro do pipeline. Se você não quiser continuar ingerindo alguns ou todos os registros do Dataflow, adicione uma exclusão do Logging para excluir registros do Dataflow. Em seguida, exporte os registros para um destino diferente, como BigQuery, Cloud Storage ou Pub/Sub. Para mais informações, consulte Controlar a ingestão de registro do Dataflow.

Limites da geração de registros

As mensagens de registro do worker são limitadas a 15.000 mensagens a cada 30 segundos, por worker. Se o limite for atingido, uma única mensagem de registro do worker será adicionada informando que a geração de registros está limitada:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
Nenhuma outra mensagem será registrada até que o intervalo de 30 segundos acabe. Esse limite é compartilhado por mensagens de registro geradas pelo SDK do Apache Beam e pelo código do usuário.

Armazenamento e retenção de registros

Os registros operacionais são armazenados no bucket de registro _Default. O nome do serviço da API de registro é dataflow.googleapis.com. Para mais informações sobre os tipos de recursos monitorados e os serviços do Google Cloud usados no Cloud Logging, consulte Recursos e serviços monitorados.

Para saber detalhes sobre o período de retenção das entradas de registro no Logging, consulte as informações sobre retenção em Cotas e limites: períodos de armazenamento de registros.

Para informações sobre como visualizar registros operacionais, consulte Monitorar e visualizar registros de pipeline.

Monitorar e visualizar registros de pipeline

Ao executar o pipeline no serviço do Dataflow, use a interface de monitoramento para ver os registros emitidos pelo pipeline.

Exemplo de registro de worker do Dataflow

É possível executar o canal WordCount modificado na nuvem com as seguintes opções:

Java

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Go

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Ver registros

Como o canal da nuvem do WordCount usa a execução de bloqueio, as mensagens do console são exibidas durante a execução do canal. Quando o job for iniciado, um link para a página do Console do Google Cloud será enviado ao console seguido pelo ID do job do pipeline:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

O URL do console leva à Interface de monitoramento do Dataflow com uma página de resumo para o job enviado. Ela mostra um gráfico de execução dinâmica à esquerda, com resumos à direita. Clique em no painel inferior para expandir o painel de registros.

O painel de registros exibe como padrão Registros de jobs, que informa o status do job como um todo. É possível filtrar as mensagens que aparecem no painel de registros clicando em Informações e Filtrar registros.

A seleção de uma etapa no gráfico modifica a visualização de Registros de etapas gerados pelo código e também o código de execução gerado na etapa de canal.

Para voltar aos Registros do job, clique fora do gráfico ou use o botão Desmarcar etapa no painel lateral à direita.

Acesse a Análise de registros

Para abrir a Análise de registros e selecionar diferentes tipos, no painel de registros, Clique em Ver na Análise de registros (o botão do link externo).

Na Análise de registros, para mostrar o painel com diferentes tipos de registro, clique na opção Campos de registro.

Na página Análise de registros, a consulta pode filtrar os registros por etapa do job ou por tipo de registro. Para remover filtros, clique no botão de alternância Mostrar consulta e edite a consulta.

Para acessar todos os registros disponíveis para um job, siga estas etapas:

  1. No campo Consulta, digite a seguinte consulta:

    resource.type="dataflow_step"
    resource.labels.job_id="JOB_ID"
    

    Substitua JOB_ID pelo ID do job.

  2. Clique em Executar consulta.

  3. Se você usar essa consulta e não encontrar os registros do job, clique em Editar horário.

  4. Ajuste o horário de início e de término e clique em Aplicar.

Tipos de registro

A Análise de registros também inclui registros de infraestrutura para seu pipeline.

Este é um resumo dos diferentes tipos de registro disponíveis para visualização na página Análise de registros:

  • Os registros job-message contêm mensagens no nível de job que vários componentes do Dataflow geram. Os exemplos incluem a configuração de escalonamento automático, o momento em que os workers são inicializados ou desligados, o progresso na etapa do job e erros no job. Os erros no nível do worker que são originados de falha no código de usuário e que estão presentes em registros dworker também se propagam para os registros jobs-message.
  • Os registros worker são produzidos por workers do Dataflow. Os workers fazem a maior parte do trabalho do pipeline. Por exemplo, aplicar ParDos aos dados. Os registros worker contêm mensagens registradas pelo seu código e pelo Dataflow.
  • Os registros worker-startup estão presentes na maioria dos jobs do Dataflow e podem capturar mensagens relacionadas ao processo de inicialização. O processo de inicialização inclui o download dos jars do job do Cloud Storage e a inicialização dos workers. Se houver um problema ao iniciar os workers, esses logs são um bom local para procurar.
  • Registros de shuffler contêm mensagens de workers que consolidam os resultados das operações paralelas do canal.
  • Registros system contêm mensagens dos sistemas operacionais do host das VMs de worker. Em alguns cenários, eles podem capturar falhas de processos ou eventos de falta de memória (OOM).
  • Os registros docker e kubelet contêm mensagens relacionadas a essas tecnologias públicas, que são usadas em workers do Dataflow.
  • Os registros nvidia-mps contêm mensagens sobre operações do NVIDIA Multi-Process Service (MPS).

Definir níveis de registro do worker do pipeline

Java

O nível padrão de geração de registros SLF4J definido nos workers pelo SDK do Apache Beam para Java é INFO. Todas as mensagens de registro de INFO ou superior (INFO, WARN, ERROR) serão emitidas. Defina um nível de registro padrão diferente para dar suporte a níveis de geração de registros SLF4J (TRACE ou DEBUG) mais baixos ou crie diferentes níveis de registro para diferentes pacotes de classes no código.

As opções de pipeline a seguir são fornecidas para permitir a definição de níveis de registro do worker na linha de comando ou de maneira programática:

  • --defaultSdkHarnessLogLevel=<level>: use esta opção para definir todos os loggers no nível padrão especificado. Por exemplo, a seguinte opção de linha de comando substituirá o nível de registro padrão INFO do Dataflow e o configurará como DEBUG:
    --defaultSdkHarnessLogLevel=DEBUG
  • --sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}: use esta opção para criar o nível de geração de registros para classes ou pacotes especificados. Por exemplo, para substituir o nível de registro de pipeline padrão para o pacote org.apache.beam.runners.dataflow e defini-lo como TRACE:
    --sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
    Para fazer várias substituições, forneça um mapa JSON:
    (--sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).
  • As opções de pipeline defaultSdkHarnessLogLevel e sdkHarnessLogLevelOverrides não têm suporte nos pipelines que usam o SDK do Apache Beam versão 2.50.0 e versões anteriores sem o Runner v2. Nesse caso, use as opções de pipeline --defaultWorkerLogLevel=<level> e --workerLogLevelOverrides={"<package or class>":"<level>"}. Para usar várias substituições, forneça um mapa JSON:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})

O exemplo a seguir define de maneira programática as opções de geração de registros do pipeline com valores padrão que podem ser substituídas na linha de comando:

 PipelineOptions options = ...
 SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE);
 // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher.
 loggingOptions.getSdkHarnessLogLevelOverrides()
     .addOverrideForClass(Foo.class, LogLevel.WARN)
     .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);

Python

O nível de geração de registros padrão definido nos workers pelo SDK do Apache Beam para Python é INFO. Todas as mensagens de registro de INFO ou superior (INFO, WARNING, ERROR, CRITICAL) serão emitidas. Você pode definir um nível de registro padrão diferente para dar suporte a níveis de registro menores (DEBUG) ou definir diferentes níveis de registro para diferentes módulos no código.

São fornecidas duas opções de pipeline para criar níveis de registro de worker pela linha de comando ou de maneira programática:

  • --default_sdk_harness_log_level=<level>: use esta opção para definir todos os loggers no nível padrão especificado. Por exemplo, a opção de linha de comando a seguir substitui o nível de registro INFO padrão do Dataflow e o define como DEBUG:
    --default_sdk_harness_log_level=DEBUG
  • --sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}: use esta opção para definir o nível de geração de registros para módulos especificados. Por exemplo, para substituir o nível de registro do canal padrão para o módulo apache_beam.runners.dataflow e defini-lo como DEBUG:
    --sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
    Para fazer várias substituições, forneça um mapa JSON:
    (--sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...}).

No exemplo a seguir, a classe WorkerOptions é usada para definir programaticamente as opções de geração de registros do pipeline que podem ser modificadas na linha de comando:

  from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

  pipeline_args = [
    '--project=PROJECT_NAME',
    '--job_name=JOB_NAME',
    '--staging_location=gs://STORAGE_BUCKET/staging/',
    '--temp_location=gs://STORAGE_BUCKET/tmp/',
    '--region=DATAFLOW_REGION',
    '--runner=DataflowRunner'
  ]

  pipeline_options = PipelineOptions(pipeline_args)
  worker_options = pipeline_options.view_as(WorkerOptions)
  worker_options.default_sdk_harness_log_level = 'WARNING'

  # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}']
  worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"}

  # Pass in pipeline options during pipeline creation.
  with beam.Pipeline(options=pipeline_options) as pipeline:

Substitua:

  • PROJECT_NAME: o nome do projeto.
  • JOB_NAME: o nome do job
  • STORAGE_BUCKET: o nome do Cloud Storage
  • DATAFLOW_REGION: a região onde você quer implantar o job do Dataflow

    A sinalização --region substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

Go

Esse recurso não está disponível no SDK do Apache Beam para Go.

Ver o registro de jobs iniciados do BigQuery

Ao usar o BigQuery no pipeline do Dataflow, os jobs do BigQuery são iniciados para executar várias ações em seu nome. Essas ações podem incluir o carregamento e a exportação de dados, entre outros. Para fins de solução de problemas e monitoramento, a interface de monitoramento do Dataflow tem informações adicionais sobre esses jobs do BigQuery disponíveis no painel Registros.

As informações de jobs do BigQuery exibidas no painel Registros são armazenadas e carregadas em uma tabela do sistema do BigQuery. Portanto, um custo de faturamento é gerado quando a tabela subjacente do BigQuery é consultada.

Ver os detalhes do job do BigQuery

Para ver as informações dos jobs do BigQuery, o pipeline precisa usar o Apache Beam 2.24.0 ou posterior.

Para listar os jobs do BigQuery, abra a guia Jobs do BigQuery e selecione o local desses jobs. Em seguida, clique em Carregar jobs do BigQuery e confirme a caixa de diálogo. Após a conclusão da consulta, a lista de jobs é exibida.

O botão &quot;Carregar jobs do BigQuery&quot; na tabela de informações dos jobs do BigQuery

São disponibilizadas informações básicas sobre cada job, incluindo ID, tipo, duração e outras.

uma tabela que mostra os jobs do BigQuery que executados durante a execução atual do pipeline.

Para informações mais detalhadas sobre um job específico, clique em Linha de comando na coluna Mais informações.

Na janela modal da linha de comando, copie o comando bq jobs describe e execute-o localmente ou no Cloud Shell.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

O comando bq jobs describe gera JobStatistics, que fornece mais detalhes úteis para diagnosticar um job lento ou travado do BigQuery.

Como alternativa, ao usar o BigQueryIO com uma consulta SQL, um job de consulta é emitido. Para ver a consulta SQL usada pelo job, clique em Visualizar consulta na coluna Mais informações.