Risoluzione dei problemi dei DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina fornisce passaggi per la risoluzione dei problemi e informazioni sui problemi comuni del flusso di lavoro.

Alcuni problemi di esecuzione dei DAG potrebbero essere causati dal mancato funzionamento corretto o ottimale dello scheduler Airflow. Segui le istruzioni per la risoluzione dei problemi dello strumento di pianificazione per risolvere questi problemi.

Risoluzione dei problemi del workflow

Per iniziare la risoluzione dei problemi:

  1. Controlla i log di Airflow.

    Puoi aumentare il livello di logging di Airflow eseguendo l'override dell'opzione di configurazione di Airflow seguente.

    Sezione Chiave Valore
    logging (core in Airflow 1) logging_level Il valore predefinito è INFO. Imposta su DEBUG per ottenere un livello di dettaglio maggiore nei messaggi di log.
  2. Controlla la dashboard di monitoraggio.

  3. Esamina Cloud Monitoring.

  4. Nella console Google Cloud , controlla la presenza di errori nelle pagine dei componenti del tuo ambiente.

  5. Nell'interfaccia web di Airflow, controlla nella visualizzazione del grafico del DAG le istanze di attività non riuscite.

    Sezione Chiave Valore
    webserver dag_orientation LR, TB, RL o BT

Debug degli errori dell'operatore

Per eseguire il debug di un errore dell'operatore:

  1. Controlla la presenza di errori specifici dell'attività.
  2. Controlla i log di Airflow.
  3. Esamina Cloud Monitoring.
  4. Controlla i log specifici dell'operatore.
  5. Correggi gli errori.
  6. Carica il DAG nella cartella /dags.
  7. Nell'interfaccia web di Airflow, cancella gli stati precedenti del DAG.
  8. Riprendi o esegui il DAG.

Risoluzione dei problemi di esecuzione delle attività

Airflow è un sistema distribuito con molte entità come scheduler, executor, worker che comunicano tra loro tramite una coda di attività e il database Airflow e inviano segnali (come SIGTERM). Il seguente diagramma mostra una panoramica delle interconnessioni tra i componenti di Airflow.

Interazione tra i componenti di Airflow
Figura 1. Interazione tra i componenti Airflow (fai clic per ingrandire)

In un sistema distribuito come Airflow potrebbero verificarsi problemi di connettività di rete o l'infrastruttura sottostante potrebbe presentare problemi intermittenti. Ciò può portare a situazioni in cui le attività possono non riuscire e vengono riprogrammate per l'esecuzione oppure le attività potrebbero non essere completate correttamente (ad esempio, attività zombie o attività bloccate durante l'esecuzione). Airflow dispone di meccanismi per gestire queste situazioni e ripristinare automaticamente il normale funzionamento. Le sezioni seguenti spiegano i problemi comuni che si verificano durante l'esecuzione delle attività da parte di Airflow.

Le attività non riescono senza generare log

L'attività non riesce a emettere log a causa di errori di analisi DAG

A volte potrebbero verificarsi errori DAG sottili che portano a una situazione in cui lo scheduler Airflow può pianificare l'esecuzione delle attività, il processore DAG può analizzare il file DAG, ma poi il worker Airflow non riesce a eseguire le attività dal DAG perché ci sono errori di programmazione nel file DAG. Ciò potrebbe portare a una situazione in cui un'attività Airflow viene contrassegnata come Failed e non è presente alcun log della sua esecuzione.

Soluzioni:

  • Verifica nei log worker di Airflow che non siano presenti errori generati dal worker di Airflow correlati a un DAG mancante o a errori di analisi DAG.

  • Aumenta i parametri relativi all'analisi DAG:

    • Aumenta [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] ad almeno 120 secondi (o più, se necessario).

    • Aumenta dag-file-processor-timeout ad almeno 180 secondi (o più, se necessario). Questo valore deve essere superiore a dagbag-import-timeout.

  • Vedi anche Risoluzione dei problemi del processore DAG.

Le attività vengono interrotte bruscamente

Durante l'esecuzione delle attività, i worker Airflow possono terminare bruscamente a causa di problemi non specificamente correlati all'attività stessa. Consulta Cause principali comuni per un elenco di questi scenari e delle possibili soluzioni. Le seguenti sezioni trattano alcuni sintomi aggiuntivi che potrebbero derivare da queste cause principali:

Attività zombie

Airflow rileva due tipi di mancata corrispondenza tra un'attività e un processo che la esegue:

  • Le attività zombie sono attività che dovrebbero essere in esecuzione, ma non lo sono. Ciò può accadere se il processo dell'attività è stato terminato o non risponde, se il worker Airflow non ha segnalato lo stato di un'attività in tempo perché è sovraccarico o se la VM in cui viene eseguita l'attività è stata arrestata. Airflow trova periodicamente queste attività e le interrompe o le riprova, a seconda delle impostazioni dell'attività.

    Scoprire le attività zombie

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Le attività non riuscite sono attività che non dovrebbero essere in esecuzione. Airflow trova periodicamente queste attività e le termina.

Per ulteriori informazioni su come risolvere i problemi relativi alle attività zombie, consulta la sezione Principali cause comuni.

Segnali SIGTERM

I segnali SIGTERM vengono utilizzati da Linux, Kubernetes, dallo scheduler di Airflow e da Celery per terminare i processi responsabili dell'esecuzione dei worker o delle attività di Airflow.

Esistono diversi motivi per cui i segnali SIGTERM vengono inviati in un ambiente:

  • Un'attività è diventata un'attività zombie e deve essere interrotta.

  • Lo scheduler ha rilevato un duplicato di un'attività e invia i segnali Terminating instance e SIGTERM all'attività per interromperla.

  • Nella scalabilità automatica pod orizzontale, il control plane GKE invia segnali SIGTERM per rimuovere i pod non più necessari.

  • Lo scheduler può inviare segnali SIGTERM al processo DagFileProcessorManager. Questi segnali SIGTERM vengono utilizzati dallo scheduler per gestire il ciclo di vita del processo DagFileProcessorManager e possono essere ignorati in sicurezza.

    Esempio:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Condizione di competizione tra il callback heartbeat e i callback di uscita in local_task_job, che monitora l'esecuzione dell'attività. Se il battito cardiaco rileva che un'attività è stata contrassegnata come riuscita, non può distinguere se l'attività stessa è riuscita o se è stato comunicato ad Airflow di considerare l'attività riuscita. Tuttavia, terminerà un task runner senza attendere che esca.

    Questi segnali SIGTERM possono essere ignorati in sicurezza. L'attività è già nello stato Riuscito e l'esecuzione dell'intera esecuzione del DAG non sarà interessata.

    La voce di log Received SIGTERM. è l'unica differenza tra l'uscita normale e l'interruzione dell'attività nello stato di esecuzione corretta.

    Condizione di competizione tra i callback heartbeat e exit
    Figura 2. Condizione di competizione tra i callback heartbeat e di uscita (fai clic per ingrandire)
  • Un componente Airflow utilizza più risorse (CPU, memoria) di quelle consentite dal nodo del cluster.

  • Il servizio GKE esegue operazioni di manutenzione e invia segnali SIGTERM ai pod eseguiti su un nodo di cui verrà eseguito l'upgrade.

    Quando un'istanza di attività viene terminata con SIGTERM, puoi visualizzare le seguenti voci di log nei log di un worker Airflow che ha eseguito l'attività:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

Possibili soluzioni:

Questo problema si verifica quando la memoria di una VM che esegue l'attività è esaurita. Questo non è correlato alle configurazioni di Airflow, ma alla quantità di memoria disponibile per la VM.

  • In Cloud Composer 1, puoi ricreare l'ambiente utilizzando un tipo di macchina con prestazioni migliori.

  • Puoi ridurre il valore dell'opzione di configurazione di Airflow [celery]worker_concurrency concurrency. Questa opzione determina il numero di attività eseguite contemporaneamente da un determinato worker Airflow.

L'attività Airflow è stata interrotta da Negsignal.SIGKILL

A volte l'attività potrebbe utilizzare più memoria di quella allocata al worker Airflow. In questa situazione, potrebbe essere interrotta da Negsignal.SIGKILL. Il sistema invia questo segnale per evitare un ulteriore consumo di memoria che potrebbe influire sull'esecuzione di altre attività Airflow. Nel log del worker Airflow potresti vedere la seguente voce di log:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL potrebbe anche essere visualizzato come codice -9.

Possibili soluzioni:

  • worker_concurrency inferiore dei worker Airflow.

  • Esegui l'upgrade a un tipo di macchina più grande utilizzato nel cluster Cloud Composer.

  • Ottimizza le attività per utilizzare meno memoria.

L'attività non riesce a causa della pressione delle risorse

Sintomo: durante l'esecuzione di un'attività, il sottoprocesso del worker di Airflow responsabile dell'esecuzione dell'attività di Airflow viene interrotto bruscamente. L'errore visibile nel log del worker Airflow potrebbe essere simile a quello riportato di seguito:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Soluzione:

L'attività non riesce a causa dell'eliminazione del pod

I pod Google Kubernetes Engine sono soggetti al ciclo di vita dei pod Kubernetes e all'espulsione dei pod. I picchi di attività e la co-pianificazione dei worker sono due delle cause più comuni di espulsione dei pod in Cloud Composer.

L'eliminazione dei pod può verificarsi quando un determinato pod utilizza eccessivamente le risorse di un nodo, rispetto alle aspettative di consumo delle risorse configurate per il nodo. Ad esempio, l'espulsione potrebbe verificarsi quando in un pod vengono eseguite diverse attività che richiedono molta memoria e il loro carico combinato fa sì che il nodo in cui viene eseguito questo pod superi il limite di consumo di memoria.

Se un pod worker di Airflow viene rimosso, tutte le istanze di attività in esecuzione su quel pod vengono interrotte e successivamente contrassegnate come non riuscite da Airflow.

I log vengono memorizzati nel buffer. Se un pod di lavoro viene rimosso prima dello svuotamento del buffer, i log non vengono emessi. L'errore dell'attività senza log indica che i worker di Airflow vengono riavviati a causa dell'esaurimento della memoria (OOM). Alcuni log potrebbero essere presenti in Cloud Logging anche se i log di Airflow non sono stati emessi.

Per visualizzare i log:

  1. Nella console Google Cloud , vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  3. Vai alla scheda Log.

  4. Visualizza i log dei singoli worker Airflow in Tutti i log > Log di Airflow > Worker.

Sintomo:

  1. Nella console Google Cloud , vai alla pagina Workload.

    Vai a Carichi di lavoro

  2. Se sono presenti airflow-worker pod che mostrano Evicted, fai clic su ogni pod espulso e cerca il messaggio The node was low on resource: memory nella parte superiore della finestra.

Soluzione:

  • Crea un nuovo ambiente Cloud Composer 1 con un tipo di macchina più grande di quello attuale.

  • Controlla i log dei pod airflow-worker per possibili cause di espulsione. Per ulteriori informazioni sul recupero dei log dai singoli pod, consulta Risoluzione dei problemi relativi ai carichi di lavoro di cui è stato eseguito il deployment.

  • Assicurati che le attività nel DAG siano idempotenti e riprovabili.

  • Evita di scaricare file non necessari nel file system locale dei worker Airflow.

    I worker Airflow hanno una capacità limitata del file system locale. Quando lo spazio di archiviazione si esaurisce, il pod worker Airflow viene espulso dal control plane GKE. Vengono interrotte tutte le attività che il worker espulso stava eseguendo.

    Esempi di operazioni problematiche:

    • Download di file o oggetti e memorizzazione locale in un worker Airflow. Archivia invece questi oggetti direttamente in un servizio adatto, ad esempio un bucket Cloud Storage.
    • Accesso a oggetti di grandi dimensioni nella cartella /data da un worker Airflow. Il worker Airflow scarica l'oggetto nel file system locale. Implementa invece i DAG in modo che i file di grandi dimensioni vengano elaborati al di fuori del pod worker di Airflow.

Principali cause comuni

Il worker Airflow ha esaurito la memoria

Ogni worker Airflow può eseguire fino a [celery]worker_concurrency istanze di attività contemporaneamente. Se il consumo cumulativo di memoria di queste istanze di attività supera il limite di memoria per un worker Airflow, un processo casuale viene terminato per liberare risorse.

Scopri gli eventi di esaurimento memoria dei worker di Airflow

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

A volte, la carenza di memoria in un worker Airflow può comportare l'invio di pacchetti malformati durante una sessione SQL Alchemy al database, a un server DNS o a qualsiasi altro servizio chiamato da un DAG. In questo caso, l'altra estremità della connessione potrebbe rifiutare o interrompere le connessioni dal worker Airflow. Ad esempio:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Soluzioni:

Il worker Airflow è stato rimosso

L'espulsione dei pod è una parte normale dell'esecuzione dei carichi di lavoro su Kubernetes. GKE elimina i pod se lo spazio di archiviazione è esaurito o per liberare risorse per i carichi di lavoro con una priorità più elevata.

Scopri le eliminazioni dei worker Airflow

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Soluzioni:

Il worker Airflow è stato terminato

I worker di Airflow potrebbero essere rimossi esternamente. Se le attività attualmente in esecuzione non terminano durante un periodo di interruzione controllata, vengono interrotte e potrebbero essere rilevate come zombie.

Scopri le terminazioni dei pod worker Airflow

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Possibili scenari e soluzioni:

  • I worker Airflow vengono riavviati durante le modifiche all'ambiente, ad esempio aggiornamenti o installazione di pacchetti:

    Scopri le modifiche all'ambiente Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Puoi eseguire queste operazioni quando non sono in esecuzione attività critiche o attivare i nuovi tentativi di esecuzione delle attività.

  • Durante le operazioni di manutenzione, vari componenti potrebbero essere temporaneamente non disponibili.

    Scopri le operazioni di manutenzione di GKE

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    Puoi specificare i periodi di manutenzione per ridurre al minimo

    si sovrappone all'esecuzione delle attività critiche.

Il worker Airflow era molto carico

La quantità di risorse di CPU e memoria disponibili per un worker Airflow è limitata dalla configurazione dell'ambiente. Se l'utilizzo delle risorse si avvicina ai limiti, potrebbe causare una contesa delle risorse e ritardi inutili durante l'esecuzione dell'attività. In situazioni estreme, quando le risorse scarseggiano per periodi di tempo più lunghi, potrebbero verificarsi attività zombie.

Soluzioni:

Query Cloud Logging per scoprire i motivi dei riavvii o delle eliminazioni dei pod

Gli ambienti Cloud Composer utilizzano i cluster GKE come livello di infrastruttura di calcolo. In questa sezione puoi trovare query utili che possono aiutarti a trovare i motivi per cui il worker Airflow o lo scheduler Airflow vengono riavviati o rimossi.

Le query presentate ulteriormente possono essere modificate nel seguente modo:

  • Puoi specificare la sequenza temporale richiesta in Cloud Logging. Ad esempio, le ultime 6 ore, 3 giorni oppure puoi definire un intervallo di tempo personalizzato.

  • Devi specificare il nome del cluster del tuo ambiente in CLUSTER_NAME.

  • Puoi limitare la ricerca a un pod specifico aggiungendo POD_NAME.

Scopri i container riavviati

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Query alternativa per limitare i risultati a un pod specifico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Scopri i container arrestati a seguito di un evento di esaurimento della memoria

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Query alternativa per limitare i risultati a un pod specifico:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Scopri i container che hanno interrotto l'esecuzione

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Query alternativa per limitare i risultati a un pod specifico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Il database Airflow era molto carico

Un database viene utilizzato da vari componenti di Airflow per comunicare tra loro e, in particolare, per archiviare i battiti del cuore delle istanze di attività. La carenza di risorse nel database comporta tempi di query più lunghi e potrebbe influire sull'esecuzione delle attività.

A volte, nei log di un worker Airflow sono presenti i seguenti errori:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Soluzioni:

Il database Airflow non era temporaneamente disponibile

Un worker Airflow potrebbe impiegare del tempo per rilevare e gestire correttamente gli errori intermittenti, come i problemi di connettività temporanei. Potrebbe superare la soglia predefinita di rilevamento dei zombie.

Scopri i timeout heartbeat di Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Soluzioni:

  • Aumenta il timeout per le attività zombie ed esegui l'override del valore dell'opzione di configurazione di Airflow [scheduler]scheduler_zombie_task_threshold:

    Sezione Chiave Valore Note
    scheduler scheduler_zombie_task_threshold Nuovo timeout (in secondi) Il valore predefinito è 300

Le attività non riescono perché si è verificato un errore durante l'esecuzione

Terminazione dell'istanza in corso…

Airflow utilizza il meccanismo di istanza di terminazione per arrestare le attività di Airflow. Questo meccanismo viene utilizzato nelle seguenti situazioni:

  • Quando uno scheduler termina un'attività che non è stata completata in tempo.
  • Quando un'attività scade o viene eseguita per troppo tempo.

Quando Airflow termina le istanze di attività, nei log di un worker di Airflow che ha eseguito l'attività puoi visualizzare le seguenti voci di log:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Terminating instance.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Possibili soluzioni:

  • Controlla il codice dell'attività per rilevare errori che potrebbero causarne l'esecuzione troppo a lungo.

  • Aumenta il valore dell'opzione di configurazione Airflow [celery_broker_transport_options]visibility_timeout.

    Di conseguenza, lo scheduler attende più a lungo il completamento di un'attività, prima di considerarla un'attività zombie. Questa opzione è particolarmente utile per le attività che richiedono molto tempo e durano molte ore. Se il valore è troppo basso (ad esempio, 3 ore), lo scheduler considera le attività che vengono eseguite per 5 o 6 ore come "bloccate" (attività zombie).

  • Aumenta il valore dell'opzione di configurazione di Airflow [core]killed_task_cleanup_time.

    Un valore più lungo offre più tempo ai worker Airflow per completare le loro attività in modo controllato. Se il valore è troppo basso, le attività Airflow potrebbero essere interrotte improvvisamente, senza avere il tempo sufficiente per terminare il lavoro in modo controllato.

L'esecuzione del DAG non termina entro il tempo previsto

Sintomo:

A volte un'esecuzione DAG non termina perché le attività Airflow si bloccano e l'esecuzione DAG dura più del previsto. In condizioni normali, le attività Airflow non rimangono indefinitamente nello stato In coda o In esecuzione, perché Airflow dispone di procedure di timeout e pulizia che aiutano a evitare questa situazione.

Correzione:

  • Utilizza il parametro dagrun_timeout per i DAG. Ad esempio: dagrun_timeout=timedelta(minutes=120). Di conseguenza, ogni esecuzione del DAG deve essere completata entro il timeout di esecuzione del DAG. Per saperne di più sugli stati delle attività Airflow, consulta la documentazione di Apache Airflow.

  • Utilizza il parametro Timeout di esecuzione dell'attività per definire un timeout predefinito per le attività eseguite in base agli operatori Apache Airflow.

La connessione al server Postgres / MySQL è stata interrotta durante l'esecuzione dell'attività o subito dopo

Le eccezioni Lost connection to Postgres / MySQL server during query si verificano spesso quando sono soddisfatte le seguenti condizioni:

  • Il tuo DAG utilizza PythonOperator o un operatore personalizzato.
  • Il DAG esegue query sul database Airflow.

Se vengono eseguite diverse query da una funzione chiamabile, le tracebacks potrebbero puntare in modo errato alla riga self.refresh_from_db(lock_for_update=True) nel codice Airflow; si tratta della prima query del database dopo l'esecuzione dell'attività. La causa effettiva dell'eccezione si verifica prima, quando una sessione SQLAlchemy non viene chiusa correttamente.

Le sessioni SQLAlchemy sono limitate a un thread e create in una funzione chiamabile. La sessione può essere continuata in un secondo momento all'interno del codice Airflow. Se si verificano ritardi significativi tra le query all'interno di una sessione, la connessione potrebbe essere già chiusa dal server Postgres o MySQL. Il timeout della connessione negli ambienti Cloud Composer è impostato su circa 10 minuti.

Soluzione:

  • Utilizza il decoratore airflow.utils.db.provide_session. Questo decoratore fornisce una sessione valida al database Airflow nel parametro session e chiude correttamente la sessione alla fine della funzione.
  • Non utilizzare una singola funzione a lunga esecuzione. Sposta invece tutte le query del database in funzioni separate, in modo che ci siano più funzioni con il decoratore airflow.utils.db.provide_session. In questo caso, le sessioni vengono chiuse automaticamente dopo il recupero dei risultati della query.

Interruzioni temporanee durante la connessione al database dei metadati Airflow

Cloud Composer viene eseguito su un'infrastruttura distribuita. Ciò significa che di tanto in tanto potrebbero verificarsi alcuni problemi temporanei che potrebbero interrompere l'esecuzione delle attività Airflow.

In queste situazioni, nei log dei worker Airflow potresti visualizzare i seguenti messaggi di errore:

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

o

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Questi problemi intermittenti potrebbero essere causati anche dalle operazioni di manutenzione eseguite per gli ambienti Cloud Composer.

In genere, questi errori sono intermittenti e, se le attività Airflow sono idempotenti e hai configurato i tentativi, non ti riguardano. Puoi anche valutare la possibilità di definire periodi di manutenzione.

Un altro motivo di questi errori potrebbe essere la mancanza di risorse nel cluster del tuo ambiente. In questi casi, puoi scalare o ottimizzare il tuo ambiente come descritto nelle istruzioni Scalare gli ambienti o Ottimizzare l'ambiente.

Un'esecuzione di DAG è contrassegnata come riuscita, ma non ha attività eseguite

Se un'esecuzione di DAG execution_date è precedente a start_date del DAG, potresti visualizzare esecuzioni di DAG senza esecuzioni di attività, ma comunque contrassegnate come riuscite.

Un&#39;esecuzione di DAG riuscita senza attività eseguite
Figura 3. Esecuzione di DAG riuscita senza attività eseguite (fai clic per ingrandire)

Causa

Questa situazione può verificarsi in uno dei seguenti casi:

  • La mancata corrispondenza è causata dalla differenza di fuso orario tra execution_date e start_date del DAG. Ad esempio, potrebbe verificarsi quando utilizzi pendulum.parse(...) per impostare start_date.

  • Il valore start_date del DAG è impostato su un valore dinamico, ad esempio airflow.utils.dates.days_ago(1)

Soluzione

  • Assicurati che execution_date e start_date utilizzino lo stesso fuso orario.

  • Specifica un valore statico per start_date e combinalo con catchup=False per evitare di eseguire DAG con date di inizio passate.

Best practice

Impatto delle operazioni di aggiornamento o upgrade sulle esecuzioni dei task Airflow

Le operazioni di aggiornamento o upgrade interrompono le attività Airflow attualmente in esecuzione, a meno che un'attività non venga eseguita in modalità differibile.

Ti consigliamo di eseguire queste operazioni quando prevedi un impatto minimo sulle esecuzioni delle attività Airflow e di configurare meccanismi di ripetizione appropriati nei DAG e nelle attività.

Non programmare i DAG generati in modo programmatico contemporaneamente

La generazione programmatica di oggetti DAG da un file DAG è un metodo efficiente per creare molti DAG simili che presentano solo piccole differenze.

È importante non pianificare l'esecuzione immediata di tutti questi DAG. Esiste un'alta probabilità che i worker di Airflow non dispongano di risorse CPU e memoria sufficienti per eseguire tutte le attività pianificate contemporaneamente.

Per evitare problemi con la pianificazione dei DAG programmatici:

  • Aumenta la concorrenza dei worker e fai lo scale up del tuo ambiente in modo che possa eseguire più attività contemporaneamente.
  • Genera DAG in modo da distribuire le pianificazioni in modo uniforme nel tempo, per evitare di pianificare centinaia di attività contemporaneamente, in modo che i worker Airflow abbiano il tempo di eseguire tutte le attività pianificate.

Controllare il tempo di esecuzione di DAG, attività ed esecuzioni parallele dello stesso DAG

Se vuoi controllare la durata di una singola esecuzione DAG per un determinato DAG, puoi utilizzare il parametro DAG dagrun_timeout. Ad esempio, se prevedi che una singola esecuzione DAG (indipendentemente dal fatto che l'esecuzione termini con esito positivo o negativo) non debba durare più di 1 ora, imposta questo parametro su 3600 secondi.

Puoi anche controllare la durata di una singola attività Airflow. Per farlo, puoi utilizzare execution_timeout.

Se vuoi controllare il numero di esecuzioni DAG attive per un DAG specifico, puoi utilizzare l'[core]max-active-runs-per-dag opzione di configurazione di Airflow.

Se vuoi che venga eseguita una sola istanza di un DAG in un determinato momento, imposta il parametro max-active-runs-per-dag su 1.

Evita l'aumento del traffico di rete da e verso il database Airflow

La quantità di traffico di rete tra il cluster GKE del tuo ambiente e il database Airflow dipende dal numero di DAG, dal numero di attività nei DAG e dal modo in cui i DAG accedono ai dati nel database Airflow. I seguenti fattori potrebbero influire sull'utilizzo della rete:

  • Query al database Airflow. Se i tuoi DAG eseguono molte query, generano grandi quantità di traffico. Esempi: controllo dello stato delle attività prima di procedere con altre attività, query della tabella XCom, dump dei contenuti del database Airflow.

  • Numero elevato di attività. Più attività ci sono da pianificare, più traffico di rete viene generato. Questa considerazione si applica sia al numero totale di attività nei DAG sia alla frequenza di pianificazione. Quando lo scheduler Airflow pianifica le esecuzioni dei DAG, esegue query sul database Airflow e genera traffico.

  • L'interfaccia web di Airflow genera traffico di rete perché invia query al database Airflow. L'utilizzo intensivo di pagine con grafici, attività e diagrammi può generare grandi volumi di traffico di rete.

Passaggi successivi