Resolução de problemas de DAGs

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página fornece passos de resolução de problemas e informações para problemas comuns de fluxo de trabalho.

Alguns problemas de execução de DAGs podem ser causados pelo agendador do Airflow que não está a funcionar corretamente ou de forma ideal. Siga as instruções de resolução de problemas do programador para resolver estes problemas.

Fluxo de trabalho de resolução de problemas

Para começar a resolver problemas:

  1. Verifique os registos do Airflow.

    Pode aumentar o nível de registo do Airflow substituindo a seguinte opção de configuração do Airflow.

    Secção Chave Valor
    logging (core no Airflow 1) logging_level O valor predefinido é INFO. Defina como DEBUG para obter mais detalhe nas mensagens de registo.
  2. Consulte o painel de controlo de monitorização.

  3. Reveja o Cloud Monitoring.

  4. Na Google Cloud consola, verifique se existem erros nas páginas dos componentes do seu ambiente.

  5. Na interface Web do Airflow, verifique na vista de gráfico do DAG se existem instâncias de tarefas com falhas.

    Secção Chave Valor
    webserver dag_orientation LR, TB, RL ou BT

Depurar falhas do operador

Para depurar uma falha do operador:

  1. Verifique se existem erros específicos da tarefa.
  2. Verifique os registos do Airflow.
  3. Reveja o Cloud Monitoring.
  4. Verifique os registos específicos do operador.
  5. Corrija os erros.
  6. Carregue o DAG para a pasta /dags.
  7. Na interface Web do Airflow, limpe os estados anteriores do DAG.
  8. Retome ou execute o DAG.

Resolução de problemas de execução de tarefas

O Airflow é um sistema distribuído com muitas entidades, como o agendador, o executor e os trabalhadores, que comunicam entre si através de uma fila de tarefas e da base de dados do Airflow, e enviam sinais (como SIGTERM). O diagrama seguinte mostra uma vista geral das interconexões entre os componentes do Airflow.

Interação entre componentes do Airflow
Figura 1. Interação entre os componentes do Airflow (clique para aumentar)

Num sistema distribuído como o Airflow, podem existir alguns problemas de conetividade de rede ou a infraestrutura subjacente pode ter problemas intermitentes. Isto pode levar a situações em que as tarefas podem falhar e ser reagendadas para execução, ou as tarefas podem não ser concluídas com êxito (por exemplo, tarefas zombie ou tarefas que ficaram bloqueadas na execução). O Airflow tem mecanismos para lidar com estas situações e retomar automaticamente o funcionamento normal. As secções seguintes explicam os problemas comuns que ocorrem durante a execução de tarefas pelo Airflow.

As tarefas falham sem emitir registos

A tarefa falha sem emitir registos devido a erros de análise DAG

Por vezes, podem existir erros DAG subtis que levam a uma situação em que o programador do Airflow pode agendar tarefas para execução, o processador DAG pode analisar o ficheiro DAG, mas, em seguida, o trabalhador do Airflow não consegue executar tarefas do DAG porque existem erros de programação no ficheiro DAG. Isto pode levar a uma situação em que uma tarefa do Airflow é marcada como Failede não existe um registo da sua execução.

Soluções:

  • Verifique nos registos do trabalhador do Airflow se existem erros gerados pelo trabalhador do Airflow relacionados com um DAG em falta ou erros de análise do DAG.

  • Aumente os parâmetros relacionados com a análise DAG:

    • Aumente [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] para, pelo menos, 120 segundos (ou mais, se necessário).

    • Aumente o valor de dag-file-processor-timeout para, pelo menos, 180 segundos (ou mais, se necessário). Este valor tem de ser superior a dagbag-import-timeout.

  • Veja também o artigo Resolução de problemas do processador DAG.

As tarefas são interrompidas abruptamente

Durante a execução de tarefas, os trabalhadores do Airflow podem ser terminados abruptamente devido a problemas não relacionados especificamente com a própria tarefa. Consulte as causas principais comuns para ver uma lista desses cenários e possíveis soluções. As secções seguintes abordam alguns sintomas adicionais que podem ter origem nessas causas principais:

Tarefas zombie

O Airflow deteta dois tipos de incompatibilidade entre uma tarefa e um processo que executa a tarefa:

  • As tarefas zombie são tarefas que deveriam estar em execução, mas não estão. Isto pode acontecer se o processo da tarefa tiver sido terminado ou não estiver a responder, se o trabalhador do Airflow não tiver comunicado o estado de uma tarefa a tempo porque está sobrecarregado ou se a VM onde a tarefa é executada tiver sido encerrada. O Airflow encontra essas tarefas periodicamente e falha ou tenta novamente a tarefa, consoante as definições da tarefa.

    Descubra tarefas zombie

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • As tarefas mortas são tarefas que não deveriam estar em execução. O Airflow encontra essas tarefas periodicamente e termina-as.

Consulte as Causas principais comuns para ver informações adicionais sobre como resolver problemas de tarefas órfãs.

Sinais SIGTERM

Os sinais SIGTERM são usados pelo Linux, Kubernetes, programador do Airflow e Celery para terminar processos responsáveis pela execução de trabalhadores ou tarefas do Airflow.

Podem existir vários motivos pelos quais os sinais SIGTERM são enviados num ambiente:

  • Uma tarefa tornou-se uma tarefa fantasma e tem de ser interrompida.

  • O agendador detetou um duplicado de uma tarefa e envia sinais de instância de terminação e SIGTERM para a tarefa para a parar.

  • Na escala automática horizontal de pods, o plano de controlo do GKE envia sinais SIGTERM para remover os pods que já não são necessários.

  • O agendador pode enviar sinais SIGTERM para o processo DagFileProcessorManager. Estes sinais SIGTERM são usados pelo Scheduler para gerir o ciclo de vida do processo DagFileProcessorManager e podem ser ignorados em segurança.

    Exemplo:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Condição de corrida entre a chamada de retorno do heartbeat e as chamadas de retorno de saída no local_task_job, que monitoriza a execução da tarefa. Se o heartbeat detetar que uma tarefa foi marcada como bem-sucedida, não consegue distinguir se a tarefa em si foi bem-sucedida ou se foi dito ao Airflow para considerar a tarefa bem-sucedida. No entanto, termina um executor de tarefas sem esperar que este termine.

    Estes sinais SIGTERM podem ser ignorados com segurança. A tarefa já está no estado de êxito e a execução da execução do DAG como um todo não será afetada.

    A entrada de registo Received SIGTERM. é a única diferença entre a saída normal e a terminação da tarefa no estado de êxito.

    Condição de corrida entre o heartbeat e os callbacks de saída
    Figura 2. Condição de corrida entre o heartbeat e os callbacks de saída (clique para aumentar)
  • Um componente do Airflow usa mais recursos (CPU, memória) do que o permitido pelo nó do cluster.

  • O serviço GKE realiza operações de manutenção e envia sinais SIGTERM para os pods que são executados num nó que está prestes a ser atualizado.

    Quando uma instância de tarefa é terminada com SIGTERM, pode ver as seguintes entradas de registo nos registos de um trabalhador do Airflow que executou a tarefa:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

Possíveis soluções:

Este problema ocorre quando uma VM que executa a tarefa fica sem memória. Isto não está relacionado com as configurações do Airflow, mas sim com a quantidade de memória disponível para a VM.

  • No Cloud Composer 1, pode recriar o seu ambiente usando um tipo de máquina com melhor desempenho.

  • Pode diminuir o valor da opção de configuração do [celery]worker_concurrencyconcurrency Airflow. Esta opção determina quantas tarefas são executadas em simultâneo por um determinado worker do Airflow.

A tarefa do Airflow foi interrompida por Negsignal.SIGKILL

Por vezes, a sua tarefa pode estar a usar mais memória do que a atribuída ao trabalhador do Airflow. Nesta situação, pode ser interrompido por Negsignal.SIGKILL. O sistema envia este sinal para evitar um maior consumo de memória que possa afetar a execução de outras tarefas do Airflow. No registo do trabalhador do Airflow, pode ver a seguinte entrada de registo:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL também pode aparecer como o código -9.

Possíveis soluções:

  • Diminuição do worker_concurrency de trabalhadores do Airflow.

  • Atualize para um tipo de máquina maior usado no cluster do Cloud Composer.

  • Otimize as suas tarefas para usar menos memória.

A tarefa falha devido à pressão dos recursos

Sintoma: durante a execução de uma tarefa, o subprocesso do worker do Airflow responsável pela execução da tarefa do Airflow é interrompido abruptamente. O erro visível no registo do trabalhador do Airflow pode ser semelhante ao apresentado abaixo:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solução:

A tarefa falha devido à remoção do pod

Os pods do Google Kubernetes Engine estão sujeitos ao ciclo de vida dos pods do Kubernetes e à expulsão de pods. Os picos de tarefas e a co-programação de trabalhadores são duas das causas mais comuns para a remoção de pods no Cloud Composer.

A remoção de um pod pode ocorrer quando um determinado pod usa excessivamente os recursos de um nó, relativamente às expetativas de consumo de recursos configuradas para o nó. Por exemplo, a expulsão pode ocorrer quando várias tarefas com utilização intensiva de memória são executadas num pod e a respetiva carga combinada faz com que o nó onde este pod é executado exceda o limite de consumo de memória.

Se um pod de trabalho do Airflow for removido, todas as instâncias de tarefas em execução nesse pod são interrompidas e, posteriormente, são marcadas como falhadas pelo Airflow.

Os registos são colocados em buffer. Se um pod de trabalho for removido antes de o buffer ser esvaziado, os registos não são emitidos. A falha da tarefa sem registos é uma indicação de que os trabalhadores do Airflow foram reiniciados devido a falta de memória (OOM). Alguns registos podem estar presentes no Cloud Logging, mesmo que os registos do Airflow não tenham sido emitidos.

Para ver registos:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Registos.

  4. Veja os registos de trabalhadores individuais do Airflow em Todos os registos > Registos do Airflow > Trabalhadores.

Sintoma:

  1. Na Google Cloud consola, aceda à página Workloads.

    Aceda a Cargas de trabalho

  2. Se existirem airflow-workerpods que mostrem Evicted, clique em cada pod removido e procure a mensagem The node was low on resource: memory na parte superior da janela.

Solução:

  • Crie um novo ambiente do Cloud Composer 1 com um tipo de máquina maior do que o tipo de máquina atual.

  • Verifique os registos dos pods airflow-worker para possíveis causas de despejo. Para mais informações sobre como obter registos de Pods individuais, consulte o artigo Resolva problemas com cargas de trabalho implementadas.

  • Certifique-se de que as tarefas no DAG são idempotentes e podem ser repetidas.

  • Evite transferir ficheiros desnecessários para o sistema de ficheiros local dos trabalhadores do Airflow.

    Os trabalhadores do Airflow têm uma capacidade limitada do sistema de ficheiros local. Quando o espaço de armazenamento se esgota, o pod do trabalhador do Airflow é removido pelo plano de controlo do GKE. Esta ação falha todas as tarefas que o trabalhador removido estava a executar.

    Exemplos de operações problemáticas:

    • Transferir ficheiros ou objetos e armazená-los localmente num worker do Airflow. Em alternativa, armazene estes objetos diretamente num serviço adequado, como um contentor do Cloud Storage.
    • Aceder a objetos grandes na pasta /data a partir de um trabalhador do Airflow. O trabalhador do Airflow transfere o objeto para o respetivo sistema de ficheiros local. Em alternativa, implemente os seus DAGs para que os ficheiros grandes sejam processados fora do pod do trabalhador do Airflow.

Origens dos problemas comuns

O trabalhador do Airflow ficou sem memória

Cada trabalhador do Airflow pode executar até [celery]worker_concurrency instâncias de tarefas em simultâneo. Se o consumo de memória cumulativo dessas instâncias de tarefas exceder o limite de memória para um worker do Airflow, um processo aleatório no mesmo é terminado para libertar recursos.

Descubra eventos de falta de memória do worker do Airflow

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

Por vezes, a falta de memória num worker do Airflow pode levar ao envio de pacotes com formato incorreto durante uma sessão do SQL Alchemy para a base de dados, para um servidor DNS ou para qualquer outro serviço chamado por um DAG. Neste caso, a outra extremidade da ligação pode rejeitar ou interromper as ligações do trabalhador do Airflow. Por exemplo:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Soluções:

O trabalhador do Airflow foi removido

As remoções de pods são uma parte normal da execução de cargas de trabalho no Kubernetes. O GKE remove pods se ficarem sem armazenamento ou para libertar recursos para cargas de trabalho com uma prioridade mais elevada.

Descubra as expulsões de trabalhadores do Airflow

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Soluções:

  • Se uma remoção for causada pela falta de armazenamento, pode reduzir a utilização de armazenamento ou remover ficheiros temporários assim que não forem necessários. Em alternativa, pode aumentar o armazenamento disponível ou executar cargas de trabalho num pod dedicado com KubernetesPodOperator.

O trabalhador do Airflow foi terminado

Os trabalhadores do Airflow podem ser removidos externamente. Se as tarefas em execução não terminarem durante um período de encerramento controlado, são interrompidas e podem acabar por ser detetadas como zombies.

Descubra as terminações de pods de trabalho do Airflow

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Possíveis cenários e soluções:

  • Os trabalhadores do Airflow são reiniciados durante as modificações do ambiente, como atualizações ou instalação de pacotes:

    Descubra modificações do ambiente do Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Pode realizar estas operações quando não estiverem a ser executadas tarefas críticas ou ativar as repetições de tarefas.

  • Vários componentes podem estar temporariamente indisponíveis durante as operações de manutenção.

    Descubra as operações de manutenção do GKE

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    Pode especificar períodos de manutenção para minimizar

    se sobrepõe à execução de tarefas críticas.

O trabalhador do Airflow estava sob carga elevada

A quantidade de recursos de CPU e memória disponíveis para um trabalhador do Airflow é limitada pela configuração do ambiente. Se a utilização de recursos se aproximar dos limites, pode causar uma contenção de recursos e atrasos desnecessários durante a execução da tarefa. Em situações extremas, quando faltam recursos durante períodos mais longos, isto pode causar tarefas zombie.

Soluções:

Consultas do Cloud Logging para descobrir os motivos dos reinícios ou despejos de pods

Os ambientes do Cloud Composer usam clusters do GKE como camada de infraestrutura de computação. Nesta secção, pode encontrar consultas úteis que podem ajudar a encontrar motivos para reinícios ou despejos de trabalhadores ou programadores do Airflow.

As consultas apresentadas posteriormente podem ser ajustadas da seguinte forma:

  • Pode especificar a cronologia necessária no Cloud Logging. Por exemplo, as últimas 6 horas, 3 dias ou pode definir o seu intervalo de tempo personalizado.

  • Tem de especificar o nome do cluster do seu ambiente em CLUSTER_NAME.

  • Pode limitar a pesquisa a um Pod específico adicionando o comando POD_NAME.

Descubra contentores reiniciados

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Consulta alternativa para limitar os resultados a um agrupamento específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Descubra contentores encerrados devido a um evento de falta de memória

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar os resultados a um agrupamento específico:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Descubra contentores que deixaram de ser executados

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar os resultados a um agrupamento específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

A base de dados do Airflow estava sob carga elevada

Uma base de dados é usada por vários componentes do Airflow para comunicarem entre si e, em particular, para armazenarem os sinais de pulsação de instâncias de tarefas. A escassez de recursos na base de dados leva a tempos de consulta mais longos e pode afetar a execução de tarefas.

Por vezes, os seguintes erros estão presentes nos registos de um trabalhador do Airflow:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Soluções:

A base de dados do Airflow esteve temporariamente indisponível

Um trabalhador do Airflow pode demorar algum tempo a detetar e processar corretamente erros intermitentes, como problemas de conetividade temporários. Pode exceder o limite de deteção de zombies predefinido.

Descubra os limites de tempo de pulsação do Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Soluções:

  • Aumente o limite de tempo para tarefas inativas e substitua o valor da [scheduler]scheduler_zombie_task_threshold opção de configuração do Airflow:

    Secção Chave Valor Notas
    scheduler scheduler_zombie_task_threshold Novo limite de tempo (em segundos) O valor predefinido é 300

As tarefas falham porque ocorreu um erro durante a execução

A terminar instância

O Airflow usa o mecanismo de instância de terminação para encerrar as tarefas do Airflow. Este mecanismo é usado nas seguintes situações:

  • Quando um agendador termina uma tarefa que não foi concluída a tempo.
  • Quando uma tarefa excede o tempo limite ou é executada durante demasiado tempo.

Quando o Airflow termina instâncias de tarefas, pode ver as seguintes entradas de registo nos registos de um trabalhador do Airflow que executou a tarefa:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Terminating instance.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Possíveis soluções:

  • Verifique se o código da tarefa tem erros que possam fazer com que seja executado durante demasiado tempo.

  • Aumente o valor da opção de [celery_broker_transport_options]visibility_timeout configuração do Airflow.

    Como resultado, o agendador aguarda mais tempo que uma tarefa seja concluída, antes de a considerar uma tarefa zumbi. Esta opção é especialmente útil para tarefas demoradas que duram muitas horas. Se o valor for demasiado baixo (por exemplo, 3 horas), o programador considera as tarefas que são executadas durante 5 ou 6 horas como "suspensas" (tarefas zombi).

  • Aumente o valor da opção de configuração do [core]killed_task_cleanup_timeAirflow.

    Um valor mais longo dá mais tempo aos trabalhadores do Airflow para terminarem as respetivas tarefas de forma elegante. Se o valor for demasiado baixo, as tarefas do Airflow podem ser interrompidas abruptamente, sem tempo suficiente para terminar o respetivo trabalho de forma adequada.

A execução do DAG não termina dentro do tempo esperado

Sintoma:

Por vezes, uma execução de DAG não termina porque as tarefas do Airflow ficam bloqueadas e a execução de DAG dura mais tempo do que o esperado. Em condições normais, as tarefas do Airflow não permanecem indefinidamente no estado de fila ou em execução, porque o Airflow tem procedimentos de tempo limite e limpeza que ajudam a evitar esta situação.

Correção:

  • Use o parâmetro dagrun_timeout para os DAGs. Por exemplo: dagrun_timeout=timedelta(minutes=120). Como resultado, cada execução do DAG tem de ser concluída dentro do limite de tempo da execução do DAG. Para mais informações sobre os estados das tarefas do Airflow, consulte a documentação do Apache Airflow.

  • Use o parâmetro task execution timeout para definir um limite de tempo predefinido para tarefas executadas com base nos operadores do Apache Airflow.

A ligação ao servidor Postgres / MySQL foi perdida durante a execução da tarefa ou imediatamente após a mesma, quando é lançada uma exceção de consulta

As Lost connection to Postgres / MySQL server during queryexceções ocorrem frequentemente quando as seguintes condições são cumpridas:

  • O seu DAG usa PythonOperator ou um operador personalizado.
  • O seu DAG faz consultas à base de dados do Airflow.

Se forem feitas várias consultas a partir de uma função acionável, os rastreios podem apontar incorretamente para a linha self.refresh_from_db(lock_for_update=True) no código do Airflow. Trata-se da primeira consulta à base de dados após a execução da tarefa. A causa real da exceção ocorre antes, quando uma sessão do SQLAlchemy não é fechada corretamente.

As sessões do SQLAlchemy têm âmbito de um segmento e são criadas numa função executável. Posteriormente, a sessão pode ser continuada no código do Airflow. Se existirem atrasos significativos entre as consultas numa sessão, a ligação pode já ter sido fechada pelo servidor Postgres ou MySQL. O limite de tempo da ligação nos ambientes do Cloud Composer está definido para aproximadamente 10 minutos.

Solução:

  • Use o decorador airflow.utils.db.provide_session. Este decorator fornece uma sessão válida à base de dados do Airflow no parâmetro session e fecha corretamente a sessão no final da função.
  • Não use uma única função de execução prolongada. Em alternativa, mova todas as consultas de base de dados para funções separadas, para que existam várias funções com o decorador airflow.utils.db.provide_session. Neste caso, as sessões são fechadas automaticamente após a obtenção dos resultados da consulta.

Interrupções temporárias ao estabelecer ligação à base de dados de metadados do Airflow

O Cloud Composer é executado numa infraestrutura distribuída. Significa que, ocasionalmente, podem surgir alguns problemas temporários que podem interromper a execução das suas tarefas do Airflow.

Nestas situações, pode ver as seguintes mensagens de erro nos registos dos trabalhadores do Airflow:

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

ou

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Estes problemas intermitentes também podem ser causados por operações de manutenção realizadas para os seus ambientes do Cloud Composer.

Normalmente, estes erros são intermitentes e, se as suas tarefas do Airflow forem idempotentes e tiver repetições configuradas, não afetam o seu trabalho. Também pode considerar definir períodos de manutenção.

Outro motivo para estes erros pode ser a falta de recursos no cluster do seu ambiente. Nestes casos, pode aumentar a escala ou otimizar o seu ambiente, conforme descrito nas instruções Aumentar a escala dos ambientes ou Otimizar o seu ambiente.

Uma execução de DAG está marcada como bem-sucedida, mas não tem tarefas executadas

Se uma execução de DAG execution_date for anterior à start_date do DAG, pode ver execuções de DAG que não têm execuções de tarefas, mas que ainda estão marcadas como bem-sucedidas.

Uma execução de DAG bem-sucedida sem tarefas executadas
Figura 3. Uma execução de DAG bem-sucedida sem tarefas executadas (clique para aumentar)

Causa

Esta situação pode ocorrer num dos seguintes casos:

  • A não correspondência é causada pela diferença de fuso horário entre o DAG execution_date e o start_date. Por exemplo, isto pode acontecer quando usa pendulum.parse(...) para definir start_date.

  • O start_date do DAG está definido como um valor dinâmico, por exemplo, airflow.utils.dates.days_ago(1)

Solução

  • Certifique-se de que execution_date e start_date estão a usar o mesmo fuso horário.

  • Especifique um start_date estático e combine-o com catchup=False para evitar a execução de DAGs com datas de início no passado.

Práticas recomendadas

Impacto das operações de atualização ou atualização no desempenho das tarefas do Airflow

As operações de atualização ou atualização interrompem as tarefas do Airflow em execução, a menos que uma tarefa seja executada no modo adiável.

Recomendamos que realize estas operações quando esperar um impacto mínimo nas execuções de tarefas do Airflow e configure mecanismos de repetição adequados nos seus DAGs e tarefas.

Não agende DAGs gerados programaticamente para o mesmo horário

A geração de objetos DAG de forma programática a partir de um ficheiro DAG é um método eficiente para criar muitos DAGs semelhantes que só têm pequenas diferenças.

É importante não agendar todos esses DAGs para execução imediata. Existe uma grande probabilidade de os trabalhadores do Airflow não terem recursos de CPU e memória suficientes para executar todas as tarefas agendadas ao mesmo tempo.

Para evitar problemas com o agendamento de DAGs programáticos:

  • Aumente a simultaneidade dos trabalhadores e expanda o seu ambiente para que possa executar mais tarefas em simultâneo.
  • Gere DAGs de forma a distribuir as respetivas agendas uniformemente ao longo do tempo, para evitar agendar centenas de tarefas ao mesmo tempo, de modo que os trabalhadores do Airflow tenham tempo para executar todas as tarefas agendadas.

Controlar o tempo de execução de DAGs, tarefas e execuções paralelas do mesmo DAG

Se quiser controlar a duração de uma única execução de DAG para um DAG específico, pode usar o parâmetro dagrun_timeout do DAG. Por exemplo, se esperar que uma única execução de DAG (independentemente de a execução terminar com êxito ou falha) não dure mais de 1 hora, defina este parâmetro para 3600 segundos.

Também pode controlar a duração de uma única tarefa do Airflow. Para o fazer, pode usar execution_timeout.

Se quiser controlar o número de execuções de DAGs ativas que quer ter para um DAG específico, pode usar a [core]max-active-runs-per-dag opção de configuração do Airflow para o fazer.

Se quiser ter apenas uma única instância de um DAG em execução num determinado momento, defina o parâmetro max-active-runs-per-dag como 1.

Evite o aumento do tráfego de rede para e a partir da base de dados do Airflow

A quantidade de tráfego de rede entre o cluster do GKE do seu ambiente e a base de dados do Airflow depende do número de DAGs, do número de tarefas nos DAGs e da forma como os DAGs acedem aos dados na base de dados do Airflow. Os seguintes fatores podem influenciar a utilização da rede:

  • Consultas à base de dados do Airflow. Se os DAGs fizerem muitas consultas, geram grandes quantidades de tráfego. Exemplos: verificar o estado das tarefas antes de avançar com outras tarefas, consultar a tabela XCom, transferir o conteúdo da base de dados do Airflow.

  • Grande número de tarefas. Quanto mais tarefas houver para agendar, mais tráfego de rede é gerado. Esta consideração aplica-se tanto ao número total de tarefas nos seus DAGs como à frequência de agendamento. Quando o programador do Airflow agenda execuções de DAGs, faz consultas à base de dados do Airflow e gera tráfego.

  • A interface Web do Airflow gera tráfego de rede porque faz consultas à base de dados do Airflow. A utilização intensiva de páginas com gráficos, tarefas e diagramas pode gerar grandes volumes de tráfego de rede.

O que se segue?