Gráficos do job do Dataflow

Quando você seleciona um job específico do Dataflow, a interface de monitoramento fornece uma representação gráfica do job: o gráfico do job. A página do gráfico do job no console também fornece um resumo e um registro do job, além de informações sobre cada etapa do pipeline.

O gráfico do job de um pipeline representa cada transformação no pipeline como uma caixa. Cada caixa contém o nome da transformação e as informações sobre o status do job, que incluem o seguinte:

  • Em execução: a etapa está sendo executada.
  • Em fila: a etapa em que um job do FlexRS entra na fila.
  • Com êxito: a etapa foi concluída.
  • Parado: a etapa foi interrompida porque o job parou.
  • Desconhecido: a etapa falhou ao informar o status.
  • Falha: a etapa não foi concluída.

Por padrão, a página do gráfico de jobs exibe a Visualização de gráfico. Para acessar seu job gráfico como uma tabela, em Visualização das etapas do job, selecione Visualização da tabela. Visualização em tabela contém as mesmas informações em um formato diferente. A visualização em tabela útil nos seguintes cenários:

  • Seu job tem muitos estágios, o que dificulta a navegação no gráfico.
  • Você quer classificar as etapas do job por uma propriedade específica. Por exemplo, você pode classificar na tabela por tempo decorrido para identificar etapas lentas.

Gráfico básico do job

Código do pipeline:

Java

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python

(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go

  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)
Gráfico do job:

O gráfico de execução para um pipeline WordCount conforme mostrado na interface de monitoramento do
              Dataflow.

Figura 1: o código de um pipeline WordCount mostrado com o gráfico de execução resultante na interface de monitoramento do Dataflow.

Transformações compostas

No gráfico do job, as transformações compostas (aquelas que contêm várias subtransformações aninhadas) são expansíveis. Transformações compostas expansíveis são marcadas com uma seta no gráfico. Para expandir a transformação e conferir as subtransformações, clique na seta.

Código do pipeline:

Java

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python

# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go

  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
Gráfico do job:

O gráfico do job de um pipeline WordCount com CountWords expandida para mostrar as transformações dos componentes.

Figura 2: o código do pipeline para as subetapas da transformação CountWords. Mostrado com o gráfico do job expandido para todo o pipeline.

No código do pipeline, use o seguinte código para invocar a transformação composta:

result = transform.apply(input);

Transformações compostas invocadas dessa maneira omitem o aninhamento esperado e podem aparecer expandidas na interface de monitoramento do Dataflow. O pipeline também gera avisos ou erros sobre nomes únicos estáveis no ambiente de execução do pipeline.

Para evitar esses problemas, invoque as transformações usando o formato recomendado:

result = input.apply(transform);

Nomes de transformação

O Dataflow tem algumas maneiras diferentes para chegar ao nome da transformação mostrado no gráfico do job de monitoramento: Os nomes de transformação são usados em locais visíveis publicamente, incluindo a interface de monitoramento do Dataflow, os arquivos de registro e as ferramentas de depuração. Não use nomes de transformação que incluam informações de identificação pessoal, como nomes de usuário ou nomes de organizações.

Java

  • O Dataflow pode usar um nome já atribuído quando você aplica a transformação. O primeiro argumento fornecido para o método apply é o nome da transformação.
  • O Dataflow pode inferir o nome da transformação, seja do nome da classe, se você criou uma transformação personalizada, ou do nome do objeto de função DoFn, se você usa uma transformação básica, como ParDo.

Python

  • O Dataflow pode usar um nome já atribuído quando você aplica a transformação. Para definir o nome da transformação, especifique o argumento label dela.
  • O Dataflow pode inferir o nome da transformação, seja do nome da classe, se você criou uma transformação personalizada, ou do nome do objeto de função DoFn, se você usa uma transformação básica, como ParDo.

Go

  • O Dataflow pode usar um nome já atribuído quando você aplica a transformação. É possível definir o nome de transformação especificando Scope.
  • O Dataflow pode inferir o nome de transformação, seja do nome da estrutura se você estiver usando um DoFn estrutural ou do nome da função se estiver usando um DoFn funcional.

Compreender as métricas

Nesta seção, fornecemos detalhes sobre as métricas associadas ao gráfico do job.

Tempo decorrido

Quando você clica em uma etapa, a métrica de Tempo decorrido é exibida no painel Informações da etapa. Esta métrica fornece o tempo total aproximado gasto por meio de todas as linhas de execução em todos os workers nas ações a seguir:

  • Inicialização da etapa
  • Processamento dos dados
  • Embaralhamento dos dados
  • Finalização da etapa

Para etapas compostas, o tempo decorrido informa a soma do tempo gasto nas etapas do componente. Essa estimativa pode ajudar a identificar etapas lentas e diagnosticar qual parte do pipeline está demorando mais tempo do que o necessário.

É possível ver o tempo que leva para uma etapa ser executada no pipeline.
Figura 3: a métrica Tempo decorrido ajuda a garantir que o pipeline esteja funcionando de maneira eficiente.

Métricas de entrada secundária

As métricas de entrada secundária mostram como os algoritmos e os padrões de acesso de entradas secundárias afetam o desempenho do pipeline. Quando o pipeline usa uma entrada secundária, o Dataflow grava a coleção em uma camada permanente (como um disco) e as transformações são lidas nessa coleção permanente. Essas leituras e gravações afetam o tempo de execução do job.

A interface de monitoramento do Dataflow exibe métricas de entrada secundária quando você seleciona uma transformação que cria ou consome uma coleção de entrada secundária. É possível ver as métricas na seção Métricas de entrada secundária do painel Informações da etapa.

Transformações que criam uma entrada secundária

Se a transformação selecionada criar uma coleção de entrada secundária, a seção Métricas de entrada secundária exibirá o nome da coleção, junto com as seguintes métricas:

  • Tempo gasto na gravação: o tempo gasto gravando a coleção de entrada secundária.
  • Bytes gravados: o número total de bytes gravados na coleção de entrada secundária.
  • Tempo de leitura da entrada secundária e bytes lidos: uma tabela que contém outras métricas para todas as transformações que consomem a coleção de entradas secundárias, chamadas de consumidores de entrada secundária.

A tabela Tempo de leitura da entrada secundária e bytes lidos exibe as seguintes informações para cada consumidor de entrada secundária:

  • Consumidor de entrada secundária: o nome da transformação do consumidor de entrada secundária.
  • Tempo gasto lendo: o tempo que esse consumidor gastou lendo a coleção de entrada secundária.
  • Bytes lidos: o número de bytes que este consumidor leu da coleção de entrada secundária.

Se o pipeline tiver uma transformação composta que crie uma entrada secundária, expanda a transformação composta até que a subtransformação específica que cria a entrada secundária esteja visível. Depois, selecione essa subtransformação para visualizar a seção Métricas de entrada secundária.

A Figura 4 mostra as métricas de entrada secundária de uma transformação que cria uma coleção de entrada secundária.

Selecione a subtransformação. Assim, as métricas de entrada secundária estarão
          visíveis no painel lateral de informações da etapa.
Figura 4: o gráfico do job tem uma transformação composta expandida (MakeMapView). A subtransformação que cria a entrada secundária (CreateDataflowView) é selecionada, e as métricas de entrada secundária podem ser vistas no painel lateral Informações da etapa.

Transformações que consomem uma ou mais entradas secundárias

Se a transformação selecionada consumir uma ou mais entradas secundárias, a seção Métricas de entrada secundária exibirá a tabela Tempo de leitura da entrada secundária e bytes lidos. Nesta tabela, as seguintes informações para cada coleção de entrada secundária são exibidas:

  • Coleção de entrada secundária: o nome da coleção de entrada secundária.
  • Tempo gasto na leitura: o tempo que a transformação gastou lendo a coleção de entrada secundária.
  • Bytes lidos: o número de bytes que a transformação leu da coleção de entrada secundária.

Se o pipeline tiver uma transformação composta que crie uma entrada secundária, expanda a transformação composta até que a subtransformação específica que lê a entrada secundária esteja visível. Depois, selecione essa subtransformação para visualizar a seção Métricas de entrada secundária.

A Figura 5 mostra as métricas de entrada secundária de uma transformação que lê uma coleção de entrada secundária.

Selecione a transformação. Assim, as métricas de entrada secundária estarão
         visíveis no painel lateral de informações da etapa.
Figura 5: a transformação JoinBothCollections faz leituras em uma coleção de entradas secundárias. JoinBothCollections está selecionada no gráfico do job e as métricas de entrada secundária podem ser visualizadas no painel lateral Informações da etapa.

Identificar problemas de desempenho em entradas secundárias

A reiteração é um problema comum de desempenho da entrada secundária. Se a entrada secundária PCollection for muito grande, os workers não poderão armazenar em cache a coleção inteira na memória. Como resultado, os workers leem repetidamente a coleção de entrada secundária permanente.

Na figura 6, as métricas de entrada secundária mostram que o total de bytes lidos da coleção de entrada secundária é muito maior que o tamanho da coleção (total de bytes gravados).

Selecione a transformação. Assim, as métricas de entrada secundária estarão
         visíveis no painel lateral de informações da etapa.
Figura 6: um exemplo de reiteração. A coleção de entradas secundárias é de 563 MB e a soma dos bytes lidos por transformações de consumo é de quase 12 GB.

Para melhorar o desempenho desse pipeline, recrie o algoritmo para evitar iterar ou refazer os dados de entrada secundária. Neste exemplo, o pipeline cria o produto cartesiano de duas coleções. O algoritmo itera toda a coleção de entrada secundária para cada elemento da coleção principal. Você pode melhorar o padrão de acesso do pipeline ao agrupar vários elementos da coleção principal. Essa alteração reduz o número de vezes que os workers releem a coleção de entrada secundária.

Outro problema comum de desempenho pode ocorrer se o pipeline executar uma mesclagem aplicando uma ParDo com uma ou mais entradas secundárias grandes. Nesse caso, os workers dedicam uma grande porcentagem do tempo de processamento para a operação de junção que leem as coleções de entrada secundárias.

A Figura 7 mostra um exemplo de métricas de entrada secundária para esse problema:

Selecione a transformação. Assim, as métricas de entrada secundária estarão
         visíveis no painel lateral de informações da etapa.
Figura 7: a transformação JoinBothCollections tem um tempo de processamento total de 18 minutos e 31 segundos. Os workers passam a maior parte desse tempo (10 minutos e 3 segundos) na leitura da coleção de entrada secundária de 10 GB.

Para melhorar o desempenho desse pipeline, use CoGroupByKey em vez de entradas secundárias.