Resolva problemas e depure pipelines do Dataflow

Esta página oferece dicas de resolução de problemas e estratégias de depuração que podem ser úteis se tiver problemas ao criar ou executar o seu pipeline do Dataflow. Estas informações podem ajudar a detetar uma falha no pipeline, determinar o motivo de uma execução do pipeline com falha e sugerir algumas medidas para corrigir o problema.

O diagrama seguinte mostra o fluxo de trabalho de resolução de problemas do Dataflow descrito nesta página.

Um diagrama que mostra o fluxo de trabalho de resolução de problemas.

O Dataflow fornece feedback em tempo real sobre a sua tarefa e existe um conjunto básico de passos que pode usar para verificar as mensagens de erro, os registos e as condições, como o progresso da tarefa ter parado.

Para orientações sobre erros comuns que pode encontrar ao executar a tarefa do Dataflow, consulte o artigo Resolva problemas de erros do Dataflow. Para monitorizar e resolver problemas de desempenho do pipeline, consulte o artigo Monitorize o desempenho do pipeline.

Práticas recomendadas para pipelines

Seguem-se as práticas recomendadas para pipelines Java, Python e Go.

Java

  • Para tarefas em lote, recomendamos que defina um tempo de vida (TTL) para a localização temporária.

  • Antes de configurar o TTL e como prática recomendada geral, certifique-se de que define a localização de preparação e a localização temporária para localizações diferentes.

  • Não elimine os objetos na localização de preparação, uma vez que estes objetos são reutilizados.

  • Se uma tarefa for concluída ou interrompida e os objetos temporários não forem limpos, remova manualmente estes ficheiros do contentor do Cloud Storage que é usado como uma localização temporária.

Python

As localizações temporárias e de teste têm o prefixo <job_name>.<time>.

  • Certifique-se de que define a localização de preparação e a localização temporária para localizações diferentes.

  • Se necessário, elimine os objetos na localização de preparação após a conclusão ou a paragem de uma tarefa. Além disso, os objetos preparados não são reutilizados em pipelines Python.

  • Se uma tarefa terminar e os objetos temporários não forem limpos, remova manualmente estes ficheiros do contentor do Cloud Storage que é usado como uma localização temporária.

  • Para tarefas em lote, recomendamos que defina um tempo de vida (TTL) para as localizações temporárias e de preparação.

Go

  • As localizações temporárias e de teste têm o prefixo <job_name>.<time>.

  • Certifique-se de que define a localização de preparação e a localização temporária para localizações diferentes.

  • Se necessário, elimine os objetos na localização de preparação após a conclusão ou a paragem de uma tarefa. Além disso, os objetos preparados não são reutilizados em pipelines Go.

  • Se uma tarefa terminar e os objetos temporários não forem limpos, remova manualmente estes ficheiros do contentor do Cloud Storage que é usado como uma localização temporária.

  • Para tarefas em lote, recomendamos que defina um tempo de vida (TTL) para as localizações temporárias e de preparação.

Verifique o estado do seu pipeline

Pode detetar erros nas execuções do pipeline através da interface de monitorização do Dataflow.

  1. Aceda à Google Cloud consola.
  2. Selecione o seu projeto da Google Cloud Platform na lista de projetos.
  3. No menu de navegação, em Big Data, clique em Dataflow. É apresentada uma lista de tarefas em execução no painel do lado direito.
  4. Selecione a tarefa de pipeline que quer ver. Pode ver o estado das tarefas rapidamente no campo Estado: "Em execução", "Concluído" ou "Com falhas".
Uma lista de tarefas do Dataflow na Developers Console com tarefas nos estados em execução, concluído e com falhas.
Figura 1: uma lista de tarefas do Dataflow na Developers Console com tarefas nos estados em execução, concluído e com falhas.

Encontre informações sobre falhas de pipelines

Se uma das tarefas do pipeline falhar, pode selecionar a tarefa para ver informações mais detalhadas sobre os erros e os resultados da execução. Quando seleciona uma tarefa, pode ver os principais gráficos do pipeline, o gráfico de execução, o painel Informações da tarefa e o painel Registos com os separadores Registos de tarefas, Registos de trabalhadores, Diagnósticos e Recomendações.

Verifique as mensagens de erro de tarefas

Para ver os registos de tarefas gerados pelo código do pipeline e pelo serviço Dataflow, no painel Registos, clique em Mostrar.

Pode filtrar as mensagens apresentadas nos Registos de tarefas clicando em Informações e Filtrar. Para apresentar apenas mensagens de erro, clique em Informações e selecione Erro.

Para expandir uma mensagem de erro, clique na secção expansível .

O painel de registos mostra registos de tarefas com uma expansão de mensagem de erro realçada.

Em alternativa, pode clicar no separador Diagnósticos. Este separador mostra onde ocorreram erros ao longo da cronologia escolhida, uma contagem de todos os erros registados e possíveis recomendações para o seu pipeline.

Um separador de diagnósticos com dois erros comunicados.

Veja os registos de passos da sua tarefa

Quando seleciona um passo no gráfico do pipeline, o painel de registos alterna entre a apresentação dos registos de tarefas gerados pelo serviço Dataflow e a apresentação dos registos das instâncias do Compute Engine que executam o passo do pipeline.

Um passo do pipeline selecionado com os registos do trabalhador do passo realçados.

O Cloud Logging combina todos os registos recolhidos das instâncias do Compute Engine do seu projeto num único local. Consulte o artigo Registar mensagens de pipeline para mais informações sobre a utilização das várias capacidades de registo do Dataflow.

Resolva a rejeição de pipelines automatizados

Em alguns casos, o serviço Dataflow identifica que o seu pipeline pode acionar problemas conhecidos do SDK. Para evitar o envio de pipelines que provavelmente vão ter problemas, o Dataflow rejeita automaticamente o pipeline e apresenta a seguinte mensagem:

The workflow was automatically rejected by the service because it might trigger an
identified bug in the SDK (details below). If you think this identification is
in error, and would like to override this automated rejection, please re-submit
this workflow with the following override flag: [OVERRIDE FLAG].
Bug details: [BUG DETAILS].
Contact Google Cloud Support for further help.
Please use this identifier in your communication: [BUG ID].

Depois de ler as ressalvas nos detalhes do erro indicados no link, se quiser tentar executar o pipeline na mesma, pode substituir a rejeição automática. Adicione a flag --experiments=<override-flag> e reenvie o pipeline.

Determine a causa de uma falha no pipeline

Normalmente, a execução de um pipeline do Apache Beam com falhas pode ser atribuída a uma das seguintes causas:

  • Erros de construção de gráficos ou pipelines. Estes erros ocorrem quando o Dataflow encontra um problema ao criar o gráfico de passos que compõem o seu pipeline, conforme descrito pelo seu pipeline do Apache Beam.
  • Erros na validação de tarefas. O serviço Dataflow valida qualquer tarefa de pipeline que iniciar. Os erros no processo de validação podem impedir a criação ou a execução bem-sucedida da tarefa. Os erros de validação podem incluir problemas com o contentor do Cloud Storage do seu projeto ou com as autorizações do seu projeto. Google Cloud
  • Exceções no código do trabalhador. Estes erros ocorrem quando existem erros ou erros no código fornecido pelo utilizador que o Dataflow distribui para trabalhadores paralelos, como as instâncias de uma transformação DoFnParDo.
  • Erros causados por falhas temporárias noutros Google Cloud serviços. O seu pipeline pode falhar devido a uma indisponibilidade temporária ou a outro problema nos Google Cloud serviços dos quais o Dataflow depende, como o Compute Engine ou o Cloud Storage.

Detetar erros de construção de gráficos ou pipelines

Pode ocorrer um erro de construção de um gráfico quando o Dataflow está a criar o gráfico de execução para o seu pipeline a partir do código no seu programa do Dataflow. Durante o tempo de criação do gráfico, o Dataflow verifica a existência de operações ilegais.

Se o Dataflow detetar um erro na construção do gráfico, tenha em atenção que não é criado nenhum trabalho no serviço Dataflow. Assim, não vê feedback na interface de monitorização do Dataflow. Em alternativa, é apresentada uma mensagem de erro semelhante à seguinte na consola ou na janela do terminal onde executou o pipeline do Apache Beam:

Java

Por exemplo, se o seu pipeline tentar realizar uma agregação como GroupByKey num PCollection não acionado, sem limites e com janelas globais, é apresentada uma mensagem de erro semelhante à seguinte:

...
... Exception in thread "main" java.lang.IllegalStateException:
... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger.
... Use a Window.into or Window.triggering transform prior to GroupByKey
...

Python

Por exemplo, se o seu pipeline usar sugestões de tipo e o tipo de argumento numa das transformações não for o esperado, ocorre uma mensagem de erro semelhante à seguinte:

... in <module> run()
... in run | beam.Map('count', lambda (word, ones): (word, sum(ones))))
... in __or__ return self.pipeline.apply(ptransform, self)
... in apply transform.type_check_inputs(pvalueish)
... in type_check_inputs self.type_check_inputs_or_outputs(pvalueish, 'input')
... in type_check_inputs_or_outputs pvalue_.element_type))
google.cloud.dataflow.typehints.decorators.TypeCheckError: Input type hint violation at group: expected Tuple[TypeVariable[K], TypeVariable[V]], got <type 'str'>

Go

Por exemplo, se o seu pipeline usar um `DoFn` que não receba nenhuma entrada, ocorre uma mensagem de erro semelhante à seguinte:

... panic: Method ProcessElement in DoFn main.extractFn is missing all inputs. A main input is required.
... Full error:
...     inserting ParDo in scope root/CountWords
...     graph.AsDoFn: for Fn named main.extractFn
... ProcessElement method has no main inputs

... goroutine 1 [running]:
... github.com/apache/beam/sdks/v2/go/pkg/beam.MustN(...)
... (more stacktrace)

Se encontrar um erro deste tipo, verifique o código do pipeline para garantir que as operações do pipeline são legais.

Detete erros na validação de tarefas do Dataflow

Assim que o serviço Dataflow receber o gráfico do seu pipeline, o serviço tenta validar a tarefa. Esta validação inclui o seguinte:

  • Certificar-se de que o serviço pode aceder aos contentores do Cloud Storage associados ao seu trabalho para preparação de ficheiros e saída temporária.
  • Verificar as autorizações necessárias no seu Google Cloud projeto.
  • Garantir que o serviço consegue aceder a origens de entrada e saída, como ficheiros.

Se a tarefa falhar no processo de validação, é apresentada uma mensagem de erro na interface de monitorização do Dataflow, bem como na janela da consola ou do terminal se estiver a usar a execução de bloqueio. A mensagem de erro é semelhante à seguinte:

Java

INFO: To access the Dataflow monitoring console, please navigate to
  https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798
Submitted job: 2016-03-08_18_59_25-16868399470801620798
...
... Starting 3 workers...
... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write
... Executing BigQuery import job "dataflow_job_16868399470801619475".
... Stopping worker pool...
... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed.
Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "my_project" failed.
Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404
... Worker pool stopped.
... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
INFO: Job finished with status FAILED
Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException:
  Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:155)
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:56)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)

Python

INFO:root:Created job with id: [2016-03-08_14_12_01-2117248033993412477]
... Checking required Cloud APIs are enabled.
... Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_RUNNING.
... Combiner lifting skipped for step group: GroupByKey not followed by a combiner.
... Expanding GroupByKey operations into optimizable parts.
... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
... Annotating graph with Autotuner information.
... Fusing adjacent ParDo, Read, Write, and Flatten operations
... Fusing consumer split into read
...
... Starting 1 workers...
...
... Executing operation read+split+pair_with_one+group/Reify+group/Write
... Executing failure step failure14
... Workflow failed.
Causes: ... read+split+pair_with_one+group/Reify+group/Write failed.
Causes: ... Unable to view metadata for files: gs://dataflow-samples/shakespeare/missing.txt.
... Cleaning up.
... Tearing down pending resources...
INFO:root:Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_FAILED.

Go

A validação de tarefas descrita nesta secção não é atualmente suportada para o Go. Os erros devido a estes problemas aparecem como exceções de trabalhadores.

Detete uma exceção no código do trabalhador

Enquanto a tarefa está em execução, pode encontrar erros ou exceções no código do trabalhador. Geralmente, estes erros significam que os DoFns no código da pipeline geraram exceções não processadas, o que resulta em tarefas falhadas no trabalho do Dataflow.

As exceções no código do utilizador (por exemplo, as suas instâncias DoFn) são comunicadas na interface de monitorização do fluxo de dados. Se executar o pipeline com a execução de bloqueio, as mensagens de erro são apresentadas na janela da consola ou do terminal, como as seguintes:

Java

INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/example_project/dataflow/job/2017-05-23_14_02_46-1117850763061203461
Submitted job: 2017-05-23_14_02_46-1117850763061203461
...
... To cancel the job using the 'gcloud' tool, run: gcloud beta dataflow jobs --project=example_project cancel 2017-05-23_14_02_46-1117850763061203461
... Autoscaling is enabled for job 2017-05-23_14_02_46-1117850763061203461.
... The number of workers will be between 1 and 15.
... Autoscaling was automatically enabled for job 2017-05-23_14_02_46-1117850763061203461.
...
... Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoads/GetTempFilePrefix+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Reify+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Write+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
... Workers have started successfully.
...
... org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2017-05-23T21:06:33.711Z: (c14bab21d699a182): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ArithmeticException: / by zero
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:146)
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
        at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowAndCombineFn.closeWindow(BatchGroupAlsoByWindowAndCombineFn.java:191)
...
... Cleaning up.
... Stopping worker pool...
... Worker pool stopped.

Python

INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
...
INFO:root:... Expanding GroupByKey operations into optimizable parts.
INFO:root:... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:... Annotating graph with Autotuner information.
INFO:root:... Fusing adjacent ParDo, Read, Write, and Flatten operations
...
INFO:root:...: Starting 1 workers...
INFO:root:...: Executing operation group/Create
INFO:root:...: Value "group/Session" materialized.
INFO:root:...: Executing operation read+split+pair_with_one+group/Reify+group/Write
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: ...: Workers have started successfully.
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: Traceback (most recent call last):
  File ".../dataflow_worker/batchworker.py", line 384, in do_work self.current_executor.execute(work_item.map_task)
  ...
  File ".../apache_beam/examples/wordcount.runfiles/py/apache_beam/examples/wordcount.py", line 73, in <lambda>
ValueError: invalid literal for int() with base 10: 'www'

Go

... 2022-05-26T18:32:52.752315397Zprocess bundle failed for instruction
...     process_bundle-4031463614776698457-2 using plan s02-6 : while executing
...     Process for Plan[s02-6] failed: Oh no! This is an error message!

Considere proteger-se contra erros no seu código adicionando processadores de exceções. Por exemplo, se quiser ignorar elementos que falham alguma validação de entrada personalizada feita num ParDo, processe a exceção no seu DoFn e ignore o elemento.

Também pode acompanhar os elementos com falhas de várias formas:

  • Pode registar os elementos com falhas e verificar o resultado através do Cloud Logging.
  • Pode verificar os registos de início do worker e do worker do Dataflow para ver avisos ou erros seguindo as instruções em Ver registos.
  • Pode fazer com que o ParDo escreva os elementos com falhas num output adicional para inspeção posterior.

Para acompanhar as propriedades de um pipeline em execução, pode usar a classe Metrics, conforme mostrado no exemplo seguinte:

Java

final Counter counter = Metrics.counter("stats", "even-items");
PCollection<Integer> input = pipeline.apply(...);
...
input.apply(ParDo.of(new DoFn<Integer, Integer>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element() % 2 == 0) {
      counter.inc();
    }
});

Python

class FilterTextFn(beam.DoFn):
      """A DoFn that filters for a specific key based on a regex."""

      def __init__(self, pattern):
        self.pattern = pattern
        # A custom metric can track values in your pipeline as it runs. Create
        # custom metrics to count unmatched words, and know the distribution of
        # word lengths in the input PCollection.
        self.word_len_dist = Metrics.distribution(self.__class__,
                                                  'word_len_dist')
        self.unmatched_words = Metrics.counter(self.__class__,
                                               'unmatched_words')

      def process(self, element):
        word = element
        self.word_len_dist.update(len(word))
        if re.match(self.pattern, word):
          yield element
        else:
          self.unmatched_words.inc()

    filtered_words = (
        words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))

Go

func addMetricDoFnToPipeline(s beam.Scope, input beam.PCollection) beam.PCollection {
    return beam.ParDo(s, &MyMetricsDoFn{}, input)
}

func executePipelineAndGetMetrics(ctx context.Context, p *beam.Pipeline) (metrics.QueryResults, error) {
    pr, err := beam.Run(ctx, runner, p)
    if err != nil {
        return metrics.QueryResults{}, err
    }

    // Request the metric called "counter1" in namespace called "namespace"
    ms := pr.Metrics().Query(func(r beam.MetricResult) bool {
        return r.Namespace() == "namespace" && r.Name() == "counter1"
    })

    // Print the metric value - there should be only one line because there is
    // only one metric called "counter1" in the namespace called "namespace"
    for _, c := range ms.Counters() {
        fmt.Println(c.Namespace(), "-", c.Name(), ":", c.Committed)
    }
    return ms, nil
}

type MyMetricsDoFn struct {
    counter beam.Counter
}

func init() {
    beam.RegisterType(reflect.TypeOf((*MyMetricsDoFn)(nil)))
}

func (fn *MyMetricsDoFn) Setup() {
    // While metrics can be defined in package scope or dynamically
    // it's most efficient to include them in the DoFn.
    fn.counter = beam.NewCounter("namespace", "counter1")
}

func (fn *MyMetricsDoFn) ProcessElement(ctx context.Context, v beam.V, emit func(beam.V)) {
    // count the elements
    fn.counter.Inc(ctx, 1)
    emit(v)
}

Resolva problemas de pipelines lentos ou falta de saída

Consulte as seguintes páginas:

Erros comuns e cursos de ação

Quando souber o erro que causou a falha do pipeline, consulte a página Resolva problemas de erros do Dataflow para obter orientações sobre a resolução de problemas de erros.