Gerar registros de mensagens do pipeline

É possível usar a infraestrutura de geração de registros integrada do SDK do Apache Beam para registrar informações durante a execução do pipeline. Use o Console do Google Cloud para monitorar as informações de geração de registros durante e após a execução do pipeline.

Como adicionar mensagens de registro ao canal

Java: SDK 2.x

Segundo o SDK do Apache Beam para Java, é recomendável que você registre mensagens do worker por meio da biblioteca de código aberto SLF4J (Simple Logging Facade for Java) [em inglês]. O SDK do Apache Beam para Java implementa a infraestrutura de geração de registros necessária para que o código Java só precise importar a API SLF4J. Em seguida, ele cria instâncias de um Logger para ativar a geração de registros de mensagens no código do pipeline.

Para código e/ou bibliotecas preexistentes, o SDK do Apache Beam para Java configura uma infraestrutura de registros extra. Isso ocorre ao executar no worker para capturar mensagens de log produzidas pelas seguintes bibliotecas de log para Java:

Python

O SDK do Apache Beam para Python fornece o pacote de bibliotecas logging, que permite que os workers do pipeline exibam mensagens de registro. Para usar as funções da biblioteca, é necessário importá-la:

import logging

Java: SDK 1.x

Exemplo de código de mensagem de registro do worker

Java: SDK 2.x

O exemplo WordCount do Apache Beam pode ser modificado para resultar em uma mensagem de registro quando a palavra “love” for localizada em uma linha do texto processado. O código adicionado está indicado abaixo em negrito. O restante do código foi incluído para contextualização.

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

É possível modificar o exemplo wordcount.py do Apache Beam para resultar em uma mensagem de registro quando a palavra “love” for localizada em uma linha do texto processado.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Java: SDK 1.x

Java: SDK 2.x

Se o pipeline WordCount modificado for executado localmente usando o DirectRunner padrão com a saída enviada para um arquivo local (--output=./local-wordcounts), a saída do console incluirá as mensagens de registro adicionadas:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

Por padrão, apenas linhas de registro marcadas como INFO e superiores são enviadas para o Cloud Logging. Para modificar esse comportamento, consulte Como configurar níveis de registro de worker do pipeline.

Python

Se o pipeline WordCount modificado for executado localmente usando o DirectRunner padrão com a saída enviada para um arquivo local (--output=./local-wordcounts), a saída do console incluirá as mensagens de registro adicionadas:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

Por padrão, apenas linhas de registros marcadas como INFO e superiores são enviadas para o Cloud Logging.

Java: SDK 1.x

Limites da geração de registros

As mensagens de registro do worker são limitadas a 15.000 mensagens a cada 30 segundos, por worker. Se esse limite for atingido, uma única mensagem de registro do worker será adicionada, dizendo que a geração de registros é limitada:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
Nenhuma outra mensagem é registrada até o fim do intervalo de 30 segundos. Esse limite é compartilhado por mensagens de registro geradas pelo SDK do Apache Beam e pelo código do usuário.

Como monitorar registros do canal

Ao executar o pipeline no serviço do Dataflow, use a interface de monitoramento para ver os registros emitidos pelo pipeline.

Exemplo de registro de worker do Dataflow

É possível executar o canal WordCount modificado na nuvem com as seguintes opções:

Java: SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Java: SDK 1.x

Como ver registros

Como o canal da nuvem do WordCount usa a execução de bloqueio, as mensagens do console são exibidas durante a execução do canal. Quando o job for iniciado, um link para a página do Console do Cloud será enviado ao console seguido pelo ID do job do pipeline:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

O URL do console leva à Interface de monitoramento do Dataflow com uma página de resumo para o job enviado. Ela mostra um gráfico de execução dinâmica à esquerda, com resumos à direita. Clique em no painel inferior para expandir o painel de registros.

O painel de registros exibe como padrão Registros de jobs, que informa o status do job como um todo. É possível filtrar as mensagens que aparecem no painel de registros clicando em Informações e Filtrar registros.

A seleção de uma etapa no gráfico modifica a visualização de Registros de etapas gerados pelo código e também o código de execução gerado na etapa de canal.

Para voltar aos Registros do job, clique fora do gráfico ou use o botão Desmarcar etapa no painel lateral à direita.

Clique no botão do link externo no painel de registros para acessar o Logging com um menu que permite selecionar vários tipos de registros.

O Logging também inclui outros registros de infraestrutura para o canal. Para saber mais sobre como explorar seus registros, consulte o guia do Explorador de registros.

Confira um resumo dos vários tipos de registro disponíveis para visualização na página do Logging:

  • Os registros job-message contêm mensagens no nível de job que vários componentes do Dataflow geram. Os exemplos incluem a configuração de escalonamento automático, o momento em que os workers são inicializados ou desligados, o progresso na etapa do job e erros no job. Os erros no nível do worker que são originados de falha no código de usuário e que estão presentes em registros dworker também se propagam para os registros jobs-message.
  • Os registros worker são produzidos por workers do Dataflow. Os workers fazem a maior parte do trabalho do pipeline. Por exemplo, aplicar ParDos aos dados. Os registros worker contêm mensagens registradas pelo seu código e pelo Dataflow.
  • Os registros worker-startup estão presentes na maioria dos jobs do Dataflow e podem capturar mensagens relacionadas ao processo de inicialização. Esse processo inclui o download de jars do job do Google Cloud Storage e a inicialização dos workers. Se houver um problema ao iniciar os workers, esses logs são um bom local para procurar.
  • Registros de shuffler contêm mensagens de workers que consolidam os resultados das operações paralelas do canal.
  • Os registros docker e kubelet contêm mensagens relacionadas a essas tecnologias públicas, que são usadas em workers do Dataflow.

Como configurar níveis de registro de worker do pipeline

Java: SDK 2.x

O nível padrão de geração de registros SLF4J definido nos workers pelo SDK do Apache Beam para Java é INFO. Todas as mensagens de registro de INFO ou superior (INFO, WARN, ERROR) serão emitidas. Defina um nível de registro padrão diferente para dar suporte a níveis de geração de registros SLF4J (TRACE ou DEBUG) mais baixos ou crie diferentes níveis de registro para diferentes pacotes de classes no código.

São fornecidas duas opções de pipeline para criar níveis de registro de worker pela linha de comando ou de maneira programática:

  • --defaultWorkerLogLevel=<level>: use esta opção para definir todos os loggers no nível padrão especificado. Por exemplo, a seguinte opção de linha de comando substituirá o nível de registro padrão INFO do Dataflow e o configurará como DEBUG:
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: use esta opção para criar o nível de geração de registros para classes ou pacotes especificados. Por exemplo, para modificar o nível de registro do pipeline padrão para o pacote com.google.cloud.dataflow e defini-lo como TRACE:
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    ou para modificar o nível de registro do pipeline padrão para a classe com.google.cloud.Foo e defini-lo como DEBUG:
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    É possível realizar várias modificações fornecendo um mapa JSON:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).

O exemplo a seguir define de maneira programática as opções de geração de registros do pipeline com valores padrão que podem ser substituídas na linha de comando:

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

Esse recurso ainda não está disponível no SDK do Apache Beam para Python.

Java: SDK 1.x

Como visualizar o registro de jobs iniciados no BigQuery

Ao usar o BigQuery no pipeline do Dataflow, os jobs do BigQuery são iniciados para executar várias ações em seu nome, como carregar, exportar dados etc. Para fins de solução de problemas e monitoramento, a IU de monitoramento do Dataflow tem informações adicionais sobre esses jobs do BigQuery disponíveis no painel Registros.

As informações de jobs do BigQuery exibidas no painel Registros são armazenadas e carregadas em uma tabela do sistema do BigQuery. Portanto, um custo de faturamento é gerado quando a tabela subjacente do BigQuery é consultada.

Configurar o projeto

Para ver as informações de jobs do BigQuery, o pipeline precisa usar o Apache Beam 2.24.0 ou posterior. No entanto, até que ela seja lançada, é necessário usar uma versão do desenvolvedor do SDK do Apache Beam criada a partir da ramificação principal.

Java: SDK 2.x

  1. Adicione o seguinte perfil ao arquivo pom.xml do seu projeto.

    <profiles>
      <!-- Additional profiles listed here. -->
      <profile>
        <id>snapshot</id>
        <repositories>
          <repository>
            <id>apache.snapshots</id>
            <url>https://repository.apache.org/content/repositories/snapshots</url>
          </repository>
        </repositories>
      </profile>
    </profiles>
    
  2. Ao testar ou executar seu projeto, defina a opção de perfil como o valor id listado em pom.xml e defina a propriedade beam.version como 2.24.0-SNAPSHOT ou posterior. Exemplo:

    mvn test -Psnapshot -Dbeam.version=2.24.0-SNAPSHOT
    

    Para mais valores de snapshot, consulte o índice de snapshots.

Python

  1. Faça login no GitHub.

  2. Navegue até a lista de resultados para versões do Python SDK do Apache Beam concluídas com êxito.

  3. Clique em um job recém-concluído, criado a partir da ramificação principal (mestre).

  4. No painel lateral, clique em Listar arquivos no bucket do Google Cloud Storage.

  5. No painel principal, expanda o Arquivo de lista no bucket do Google Cloud Storage.

  6. Faça o download do arquivo ZIP a partir da lista de arquivos para um computador ou local onde você executa seu projeto Python.

    O nome do bucket do Cloud Storage é beam-wheels-staging. Portanto, inclua-o ao criar o URL de download. Exemplo:

    gsutil cp gs://beam-wheels-staging/master/02bf081d0e86f16395af415cebee2812620aff4b-207975627/apache-beam-2.25.0.dev0.zip <var>SAVE_TO_LOCATION</var>
    
  7. Instale o arquivo ZIP salvo.

    pip install apache-beam-2.25.0.dev0.zip
    
  8. Ao executar o pipeline do Apache Beam, transmita a sinalização --sdk_location e faça referência ao arquivo ZIP do SDK.

    --sdk_location=apache-beam-2.25.0.dev0.zip
    

Java: SDK 1.x

Como visualizar detalhes do job do BigQuery

Para listar os jobs do BigQuery, abra a guia Jobs do BigQuery e selecione o local desses jobs. Em seguida, clique em Carregar jobs do BigQuery e confirme a caixa de diálogo. Após a conclusão da consulta, a lista de jobs é exibida.

O botão &quot;Carregar jobs do BigQuery&quot; na tabela de informações dos jobs do BigQuery

São disponibilizadas informações básicas sobre cada job, incluindo código, tipo, duração e outras.

uma tabela que mostra os jobs do BigQuery que executados durante a execução atual do pipeline.

Para informações mais detalhadas sobre um job específico, clique em Linha de comando na coluna Mais informações.

Na janela modal da linha de comando, copie o comando bq jobs describe e execute-o localmente ou no Cloud Shell.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

O comando bq jobs describe gera JobStatistics, que fornece mais detalhes úteis para diagnosticar um job lento ou travado do BigQuery.

Como alternativa, ao usar o BigQueryIO com uma consulta SQL, um job de consulta é emitido. Clique em Visualizar consulta na coluna Mais informações para ver a consulta SQL usada pelo job.