Quando você seleciona um job específico do Dataflow, a interface de monitoramento fornece uma representação gráfica do pipeline: o gráfico do job. A página do gráfico do job no console também fornece um resumo e um registro do job, além de informações sobre cada etapa do pipeline.
O gráfico do job de um pipeline representa cada transformação no pipeline como uma caixa. Cada caixa contém o nome da transformação e as informações sobre o status do job, que incluem o seguinte:
- Em execução: a etapa está em execução.
- Em fila: a etapa em que um job do FlexRS entra na fila.
- Com êxito: a etapa foi concluída com sucesso.
- Parado: a etapa foi interrompida porque o job parou.
- Desconhecido: a etapa falhou ao informar o status.
- Com falha: a etapa não foi concluída.
Por padrão, a página do gráfico de jobs exibe a Visualização de gráfico. Para acessar seu job gráfico como uma tabela, em Visualização das etapas do job, selecione Visualização da tabela. Visualização em tabela contém as mesmas informações em um formato diferente. A visualização em tabela útil nos seguintes cenários:
- Seu job tem muitos estágios, o que dificulta a navegação no gráfico.
- Você quer classificar as etapas do job por uma propriedade específica. Por exemplo, você pode classificar na tabela por tempo decorrido para identificar etapas lentas.
Gráfico básico do job
Código do pipeline:Java// Read the lines of the input text. p.apply("ReadLines", TextIO.read().from(options.getInputFile())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply("WriteCounts", TextIO.write().to(options.getOutput())); Python( pipeline # Read the lines of the input text. | 'ReadLines' >> beam.io.ReadFromText(args.input_file) # Count the words. | CountWords() # Write the formatted word counts to output. | 'WriteCounts' >> beam.io.WriteToText(args.output_path)) Go// Create the pipeline. p := beam.NewPipeline() s := p.Root() // Read the lines of the input text. lines := textio.Read(s, *input) // Count the words. counted := beam.ParDo(s, CountWords, lines) // Write the formatted word counts to output. textio.Write(s, *output, formatted) |
Gráfico do job:
|
Transformações compostas
No gráfico do job, as transformações compostas (aquelas que contêm várias subtransformações aninhadas) são expansíveis. Transformações compostas expansíveis são marcadas com uma seta no gráfico. Clique na seta para expandir a transformação e visualizar as subtransformações internas.
Código do pipeline:Java// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } Python# The CountWords Composite Transform inside the WordCount pipeline. @beam.ptransform_fn def CountWords(pcoll): return ( pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) Go// The CountWords Composite Transform inside the WordCount pipeline. func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) } |
Gráfico do job:
|
No código do pipeline, você pode ter invocado sua transformação composta, da seguinte forma:
result = transform.apply(input);
Transformações compostas invocadas dessa maneira omitem o aninhamento esperado e podem aparecer expandidas na Interface de monitoramento do Dataflow. O pipeline também gera avisos ou erros sobre nomes únicos estáveis no ambiente de execução do pipeline.
Para evitar esses problemas, chame a transformação usando o formato recomendado:
result = input.apply(transform);
Nomes de transformação
O Dataflow tem algumas maneiras diferentes para chegar ao nome da transformação mostrado no gráfico do job de monitoramento:
Java
- O Dataflow pode usar um nome já atribuído quando você aplica a transformação. O primeiro
argumento fornecido para o método
apply
é o nome da transformação. - O Dataflow pode inferir o nome da transformação, seja do nome da classe, se você criou uma transformação personalizada, ou do nome do objeto de função
DoFn
, se você estiver usando uma transformação básica, comoParDo
.
Python
- O Dataflow pode usar um nome já atribuído quando você aplica a transformação. Para definir o nome da transformação, especifique o argumento
label
dela. - O Dataflow pode inferir o nome da transformação, seja do nome da classe, se você criou uma transformação personalizada, ou do nome do objeto de função
DoFn
, se você estiver usando uma transformação básica, comoParDo
.
Go
- O Dataflow pode usar um nome já atribuído quando você aplica a transformação. É possível definir o nome de transformação especificando
Scope
. - O Dataflow pode inferir o nome de transformação, seja do nome da estrutura se você estiver usando um
DoFn
estrutural ou do nome da função se estiver usando umDoFn
funcional.
Compreender as métricas
Nesta seção, fornecemos detalhes sobre as métricas associadas ao gráfico do job.
Tempo decorrido
Quando você clica em uma etapa, a métrica de Tempo decorrido é exibida no painel Informações da etapa. Esta métrica fornece o tempo total aproximado gasto por meio de todas as linhas de execução em todos os workers nas ações a seguir:
- Inicialização da etapa
- Processamento dos dados
- Embaralhamento dos dados
- Finalização da etapa
Para etapas compostas, o tempo decorrido informa a soma do tempo gasto nas etapas do componente. Essa estimativa pode ajudar a identificar etapas lentas e diagnosticar qual parte do pipeline está demorando mais tempo do que o necessário.
Métricas de entrada secundária
As métricas de entrada secundária mostram como os algoritmos e os padrões de acesso de entradas secundárias afetam o desempenho do pipeline. Quando o pipeline usa uma entrada secundária, o Dataflow grava a coleção em uma camada permanente (como um disco) e as transformações são lidas nessa coleção permanente. Essas leituras e gravações afetam o tempo de execução do job.
A interface de monitoramento do Dataflow exibe métricas de entrada secundária quando você seleciona uma transformação que cria ou consome uma coleção de entrada secundária. É possível ver as métricas na seção Métricas de entrada secundária do painel Informações da etapa.
Transformações que criam uma entrada secundária
Se a transformação selecionada criar uma coleção de entrada secundária, a seção Métricas de entrada secundária exibirá o nome da coleção, junto com as seguintes métricas:
- Tempo gasto na gravação: o tempo gasto gravando a coleção de entrada secundária.
- Bytes gravados: o número total de bytes gravados na coleção de entrada secundária.
- Tempo de leitura da entrada secundária e bytes lidos: uma tabela que contém outras métricas para todas as transformações que consomem a coleção de entradas secundárias, chamadas de consumidores de entrada secundária.
A tabela Tempo de leitura da entrada secundária e bytes lidos exibe as seguintes informações para cada consumidor de entrada secundária:
- Consumidor de entrada secundária: o nome da transformação do consumidor de entrada secundária.
- Tempo gasto lendo: o tempo que esse consumidor gastou lendo a coleção de entrada secundária.
- Bytes lidos: o número de bytes que este consumidor leu da coleção de entrada secundária.
Se o pipeline tiver uma transformação composta que crie uma entrada secundária, expanda a transformação composta até que a subtransformação específica que cria a entrada secundária esteja visível. Depois, selecione essa subtransformação para visualizar a seção Métricas de entrada secundária.
A Figura 4 mostra as métricas de entrada secundária de uma transformação que cria uma coleção de entrada secundária.
Transformações que consomem uma ou mais entradas secundárias
Se a transformação selecionada consumir uma ou mais entradas secundárias, a seção Métricas de entrada secundária exibirá a tabela Tempo de leitura da entrada secundária e bytes lidos. Nesta tabela, as seguintes informações para cada coleção de entrada secundária são exibidas:
- Coleção de entrada secundária: o nome da coleção de entrada secundária.
- Tempo gasto na leitura: o tempo que a transformação gastou lendo a coleção de entrada secundária.
- Bytes lidos: o número de bytes que a transformação leu da coleção de entrada secundária.
Se o pipeline tiver uma transformação composta que crie uma entrada secundária, expanda a transformação composta até que a subtransformação específica que lê a entrada secundária esteja visível. Depois, selecione essa subtransformação para visualizar a seção Métricas de entrada secundária.
A Figura 5 mostra as métricas de entrada secundária de uma transformação que lê uma coleção de entrada secundária.
Identificar problemas de desempenho em entradas secundárias
A reiteração é um problema comum de desempenho da entrada secundária. Se a entrada secundária PCollection
for muito grande, os workers não poderão armazenar em cache a coleção inteira na memória.
Como resultado, os workers leem repetidamente a coleção de entrada secundária permanente.
Na figura 6, as métricas de entrada secundária mostram que o total de bytes lidos da coleção de entrada secundária é muito maior que o tamanho da coleção (total de bytes gravados).
Para melhorar o desempenho desse pipeline, recrie o algoritmo para evitar iterar ou refazer os dados de entrada secundária. Neste exemplo, o pipeline cria o produto cartesiano de duas coleções. O algoritmo itera toda a coleção de entrada secundária para cada elemento da coleção principal. Você pode melhorar o padrão de acesso do pipeline ao agrupar vários elementos da coleção principal. Essa alteração reduz o número de vezes que os workers releem a coleção de entrada secundária.
Outro problema comum de desempenho pode ocorrer se o pipeline executar uma mesclagem aplicando uma ParDo
com uma ou mais entradas secundárias grandes. Nesse caso,
os workers dedicam uma grande porcentagem do tempo de processamento para a operação de junção que leem as
coleções de entrada secundárias.
A Figura 7 mostra um exemplo de métricas de entrada secundária para esse problema:
Para melhorar o desempenho desse pipeline, use CoGroupByKey em vez de entradas secundárias.