Resolva problemas de tarefas de streaming lentas ou bloqueadas

Esta página explica como resolver problemas de causas comuns de tarefas de streaming do Dataflow lentas ou bloqueadas.

Se notar os seguintes sintomas, a tarefa de streaming do Dataflow pode estar a ser executada lentamente ou bloqueada:

Use as informações nas secções seguintes para identificar e diagnosticar o problema.

Identifique a causa principal

  1. Verifique as métricas de atualidade dos dados e bytes pendentes.

    • Se ambas as métricas estiverem a aumentar monotonicamente, significa que o pipeline está bloqueado e não está a progredir.
    • Se a atualidade dos dados estiver a aumentar, mas os bytes pendentes permanecerem normais, significa que um ou mais itens de trabalho estão bloqueados no pipeline.

    Procure as fases em que estas métricas estão a aumentar para identificar qualquer fase com problemas e as operações realizadas nessa fase.

  2. Consulte o gráfico de processamento paralelo para ver se alguma fase está bloqueada devido a paralelismo excessivo ou insuficiente. Consulte o artigo Resolva problemas de paralelismo.

  3. Verifique os registos de tarefas quanto a problemas, como limites de quota, problemas de rutura de stock ou esgotamento de endereços IP.

  4. Verifique se existem avisos e erros nos registos do trabalhador.

    • Se os registos do trabalhador contiverem erros, veja o rastreio da pilha. Investigue se o erro é causado por um erro no seu código.
    • Procure erros do Dataflow. Consulte o artigo Resolva problemas de erros do Dataflow.
    • Procure erros que mostrem que a tarefa excedeu um limite, como o tamanho máximo da mensagem do Pub/Sub.
    • Procure erros de falta de memória, que podem causar um pipeline bloqueado. Se vir erros de falta de memória, siga os passos em Resolva problemas de falta de memória do Dataflow.
    • Para identificar um passo lento ou bloqueado, verifique as mensagens Operation ongoing nos registos do trabalhador. Veja o rastreio de pilha para ver onde o passo está a gastar tempo. Para mais informações, consulte o artigo Processamento bloqueado ou operação em curso.
  5. Se um item de trabalho estiver bloqueado num trabalhador específico, reinicie a VM desse trabalhador.

  6. Se não estiver a usar o Streaming Engine, verifique os registos do misturador quanto a avisos e erros. Se vir um erro de limite de tempo de RPC na porta 12345 ou 12346, a sua tarefa pode não ter uma regra de firewall. Consulte o artigo Regras de firewall para o Dataflow.

  7. Verifique se existem teclas de atalho.

  8. Se o Runner v2 estiver ativado, verifique se existem erros nos registos de preparação. Para mais informações, consulte o artigo Resolva problemas do Runner v2.

Investigue falhas repetidas

Num trabalho de streaming, algumas falhas são repetidas indefinidamente. Estas novas tentativas impedem o progresso do pipeline. Para identificar falhas repetidas, verifique as exceções nos registos do trabalhador.

Identifique trabalhadores com problemas de saúde

Se os trabalhadores que processam a tarefa de streaming não estiverem em bom estado, a tarefa pode ser lenta ou parecer bloqueada. Para identificar trabalhadores com problemas:

Identifique elementos isolados

Um elemento atrasado é um item de trabalho que é lento em relação a outros itens de trabalho na fase. Para obter informações sobre como identificar e corrigir valores atípicos, consulte o artigo Resolva problemas de valores atípicos em tarefas de streaming.

Resolva problemas de paralelismo

Para escalabilidade e eficiência, o Dataflow executa as fases do seu pipeline em paralelo em vários trabalhadores. A unidade mais pequena de processamento paralelo no Dataflow é uma chave. As mensagens recebidas para cada fase combinada estão associadas a uma chave. A chave é definida de uma das seguintes formas:

  • A chave é definida implicitamente pelas propriedades da origem, como as partições do Kafka.
  • A chave é definida explicitamente pela lógica de agregação no pipeline, como GroupByKey.

No Dataflow, os threads de trabalho são responsáveis pelo processamento de conjuntos de trabalho (mensagens) para uma chave. O número de threads disponíveis para processar as chaves da tarefa é igual a num_of_workers * threads_per_worker. O número de threads por trabalhador é determinado com base no SDK (Java, Python ou Go) e no tipo de tarefa (em lote ou em streaming).

Se o pipeline não tiver chaves suficientes para uma determinada fase, limita o processamento paralelo. Essa fase pode tornar-se um gargalo.

Se o pipeline usar um número muito grande de chaves para uma determinada fase, pode limitar a taxa de transferência da fase e acumular atrasos nas fases anteriores, porque existe alguma sobrecarga por chave. As despesas gerais podem incluir a comunicação de back-end com os trabalhadores, RPCs externos para um destino, como o BigQuery, e outro processamento. Por exemplo, se o processamento de uma chave com uma mensagem demorar 100 ms, também pode demorar cerca de 100 ms a processar 1000 mensagens nesse conjunto de chaves.

Identifique fases com baixo paralelismo

Para identificar se a lentidão do pipeline é causada por um paralelismo baixo, veja as métricas de utilização da CPU. Se a utilização da CPU for baixa, mas estiver distribuída uniformemente pelos trabalhadores, a sua tarefa pode ter um paralelismo insuficiente. Se o seu trabalho estiver a usar o Streaming Engine, para ver se uma fase tem um paralelismo baixo, no separador Job Metrics, veja as métricas de paralelismo. Para mitigar este problema:

Identifique fases com elevado paralelismo

Uma combinação de baixa latência do sistema, atualidade crescente dos dados e aumento da fila de espera e das CPUs de processamento subutilizadas sugere que o pipeline está a ser limitado devido a um grande número de chaves. Consulte o gráfico de processamento paralelo para identificar fases com um grande número de chaves.

As transformações, como Reshuffle, podem gerar milhões de chaves se não especificar explicitamente withNumBuckets. Um grande número de chaves pode levar à criação de vários conjuntos de trabalho mais pequenos, cada um dos quais requer uma thread de trabalho dedicada para processamento. Uma vez que as threads de trabalho disponíveis são limitadas, pode ocorrer um atraso significativo no processamento de chaves, o que causa atrasos à medida que aguardam recursos. Consequentemente, os threads de trabalho não são usados de forma eficiente.

Recomendamos que limite o número de chaves definindo a opção withNumBuckets na transformação Reshuffle. O valor não deve exceder o número total de threads em todos os trabalhadores. A segmentação de chaves (threads_per_worker * max_workers) no pipeline pode não ser ideal. Por vezes, é possível ter menos chaves e conjuntos maiores, que são processados de forma mais eficiente pelo Dataflow devido à utilização de menos trabalhadores. Um número menor de chaves cria conjuntos de trabalho maiores, o que usa os threads de trabalho de forma eficiente e aumenta o débito da fase.

Se existirem vários passos Reshuffle no pipeline, divida o número total de threads pela contagem de passos Reshuffle para calcular withNumBuckets.

Verifique se existem teclas de atalho

Se as tarefas estiverem distribuídas de forma desigual entre os trabalhadores e a utilização dos trabalhadores for muito desigual, a sua conduta pode ter uma tecla frequente. Uma tecla de atalho é uma tecla que tem muito mais elementos a processar em comparação com outras teclas.

Verifique se existem teclas de atalho através do seguinte filtro de registo:

  resource.type="dataflow_step"
  resource.labels.job_id=JOB_ID
  jsonPayload.line:"hot_key_logger"

Substitua JOB_ID pelo ID da sua tarefa.

Para resolver este problema, siga um ou mais dos seguintes passos:

  • Volte a introduzir a chave dos seus dados. Para gerar novos pares de chave-valor, aplique uma transformação ParDo. Para mais informações, consulte a página de transformação do Java ParDo ou a página de transformação do Python ParDo na documentação do Apache Beam.
  • Use .withFanout nas transformações de combinação. Para mais informações, consulte a classe Combine.PerKey no SDK Java ou a operação with_hot_key_fanout no SDK Python.
  • Se tiver um pipeline Java que processe dados ilimitados de grande volume PCollections, recomendamos que faça o seguinte:
    • Use Combine.Globally.withFanout em vez de Combine.Globally.
    • Use Combine.PerKey.withHotKeyFanout em vez de Count.PerKey.

Verifique se tem quota insuficiente

Certifique-se de que tem quota suficiente para a origem e o destino. Por exemplo, se o seu pipeline ler entradas do Pub/Sub ou do BigQuery, o seu projeto da Google Cloud Platform pode ter uma quota insuficiente. Para mais informações sobre os limites de quota destes serviços, consulte a quota do Pub/Sub ou a quota do BigQuery.

Se a sua tarefa estiver a gerar um número elevado de erros 429 (Rate Limit Exceeded), pode ter uma quota insuficiente. Para verificar se existem erros, experimente os seguintes passos:

  1. Aceda à Google Cloud consola.
  2. No painel de navegação, clique em APIs e serviços.
  3. No menu, clique em Biblioteca.
  4. Use a caixa de pesquisa para pesquisar Pub/Sub.
  5. Clique em API Cloud Pub/Sub.
  6. Clique em Gerir.
  7. No gráfico Tráfego por código de resposta, procure códigos de erro do cliente (4xx).

Também pode usar o Explorador de métricas para verificar a utilização da quota. Se o seu pipeline usar uma origem ou um destino do BigQuery, para resolver problemas de quotas, use as métricas da API BigQuery Storage. Por exemplo, para criar um gráfico que mostre a quantidade de ligações simultâneas do BigQuery, siga estes passos:

  1. Na Google Cloud consola, selecione Monitorização:

    Aceder a Monitorização

  2. No painel de navegação, selecione Explorador de métricas.

  3. No painel Selecionar uma métrica, para Métrica, filtre por Projeto do BigQuery > Escrever > contagem de ligações simultâneas.

Para ver instruções sobre como ver métricas do Pub/Sub, consulte o artigo Monitorize a utilização da quota em "Monitorize o Pub/Sub no Cloud Monitoring". Para ver instruções sobre como ver as métricas do BigQuery, consulte o artigo Veja a utilização e os limites de quotas em "Crie painéis de controlo, gráficos e alertas".

Ferramentas para depuração

Quando tem um pipeline lento ou bloqueado, as seguintes ferramentas podem ajudar a diagnosticar o problema.

  • Para correlacionar incidentes e identificar gargalos, use o Cloud Monitoring para o Dataflow.
  • Para monitorizar o desempenho do pipeline, use o Cloud Profiler.
  • Algumas transformações são mais adequadas para pipelines de grande volume do que outras. As mensagens de registo podem identificar uma transformação de utilizador bloqueada em pipelines de processamento em lote ou de streaming.
  • Para saber mais sobre uma tarefa bloqueada, use as métricas de tarefas do Dataflow. A lista seguinte inclui métricas úteis:
    • A métrica Bytes pendentes (backlog_bytes) mede a quantidade de entrada não processada em bytes por fase. Use esta métrica para encontrar um passo fundido que não tenha débito. Da mesma forma, a métrica de elementos pendentes (backlog_elements) mede o número de elementos de entrada não processados para uma fase.
    • A métrica Chaves de paralelismo de processamento (processing_parallelism_keys) mede o número de chaves de processamento paralelo para uma fase específica do pipeline nos últimos cinco minutos. Use esta métrica para investigar das seguintes formas:
      • Restrinja o problema a fases específicas e confirme os avisos de teclas de atalho, como A hot key ... was detected.
      • Encontre restrições de débito causadas por paralelismo insuficiente. Estes gargalos podem resultar em pipelines lentos ou bloqueados.
    • A métrica Atraso do sistema (system_lag) e a métrica de atraso do sistema por fase (per_stage_system_lag) medem a quantidade máxima de tempo que um item de dados está a ser processado ou a aguardar processamento. Use estas métricas para identificar fases ineficientes e gargalos das origens de dados.

Para métricas adicionais que não estão incluídas na interface Web de monitorização do Dataflow, consulte a lista completa de métricas do Dataflow nas métricas da Google Cloud Platform.