Risoluzione dei problemi dei DAG

Cloud Composer 1 | Cloud Composer 2

Questa pagina fornisce passaggi per la risoluzione dei problemi e informazioni per problemi comuni relativi al flusso di lavoro.

Molti problemi di esecuzione dei DAG sono causati da prestazioni dell'ambiente non ottimali. Puoi ottimizzare il tuo ambiente Cloud Composer 2 seguendo la guida Ottimizzare prestazioni e costi dell'ambiente.

Alcuni problemi di esecuzione di DAG potrebbero essere causati da uno scheduler di Airflow non funzionante in modo corretto o in modo ottimale. Segui le istruzioni per la risoluzione dei problemi dello strumento di pianificazione per risolvere questi problemi.

Flusso di lavoro per la risoluzione dei problemi

Per iniziare la risoluzione dei problemi:

  1. Controlla i log di Airflow.

    Puoi aumentare il livello di logging di Airflow eseguendo l'override della seguente opzione di configurazione di Airflow.

    Flusso d'aria 2

    Sezione Chiave Valore
    logging logging_level Il valore predefinito è INFO. Imposta su DEBUG per aumentare il livello di dettaglio nei messaggi di log.

    Flusso d'aria 1

    Sezione Chiave Valore
    core logging_level Il valore predefinito è INFO. Imposta su DEBUG per aumentare il livello di dettaglio nei messaggi di log.
  2. Controlla la dashboard di Monitoring.

  3. Consulta Cloud Monitoring.

  4. Nella console Google Cloud, verifica la presenza di errori nelle pagine relative ai componenti del tuo ambiente.

  5. Nell'interfaccia web di Airflow, controlla nella visualizzazione grafico del DAG le istanze di attività non riuscite.

    Sezione Chiave Valore
    webserver dag_orientation LR, TB, RL o BT

Debug degli errori degli operatori

Per eseguire il debug di un errore relativo a un operatore:

  1. Verifica la presenza di errori specifici per le attività.
  2. Controlla i log di Airflow.
  3. Consulta Cloud Monitoring.
  4. Controlla i log specifici dell'operatore.
  5. Correggi gli errori.
  6. Carica il DAG nella cartella dags/.
  7. Nell'interfaccia web di Airflow, cancella gli stati passati del DAG.
  8. Riprendi o esegui il DAG.

Risoluzione dei problemi di esecuzione delle attività

Airflow è un sistema distribuito con molte entità come scheduler, esecutore e worker che comunicano tra loro tramite una coda di attività e il database Airflow e inviano segnali (come SIGTERM). Il seguente diagramma mostra una panoramica delle interconnessioni tra i componenti di Airflow.

Interazione tra i componenti Airflow
Figura 1. Interazione tra i componenti Airflow (fai clic per ingrandire)

In un sistema distribuito come Airflow, potrebbe verificarsi qualche problema di connettività di rete oppure l'infrastruttura sottostante potrebbe presentare problemi intermittenti; ciò può portare a situazioni in cui le attività possono non riuscire e essere riprogrammate per l'esecuzione, oppure le attività potrebbero non essere completate correttamente (ad esempio, attività di Zombie o attività che si bloccano in esecuzione). Airflow dispone di meccanismi per affrontare queste situazioni e riprendere automaticamente il normale funzionamento. Le seguenti sezioni spiegano i problemi comuni che si verificano durante l'esecuzione delle attività da parte di Airflow: attività zombie, pillole velenose e indicatori SIGTERM.

Risoluzione dei problemi relativi alle attività Zombie

Airflow rileva due tipi di mancata corrispondenza tra un'attività e un processo che la esegue:

  • Le attività zombie sono attività che dovrebbero essere in esecuzione ma non in esecuzione. Questo può accadere se il processo dell'attività è stato terminato o non risponde, se il worker Airflow non ha segnalato uno stato dell'attività in tempo perché è sovraccarico o se la VM in cui viene eseguita l'attività è stata arrestata. Airflow trova queste attività periodicamente e non riesce o riprova a eseguire l'attività, a seconda delle impostazioni dell'attività.

  • Le attività non morte sono attività che non dovrebbero essere in esecuzione. Airflow trova queste attività periodicamente e le termina.

Il motivo più comune delle attività zombie è la carenza di risorse di CPU e memoria nel cluster del tuo ambiente. Di conseguenza, un worker Airflow potrebbe non essere in grado di segnalare lo stato di un'attività o un sensore potrebbe essere interrotto bruscamente. In questo caso, lo scheduler contrassegna una determinata attività come attività Zombie. Per evitare le attività di Zombie, assegna più risorse al tuo ambiente.

Per ulteriori informazioni sulla scalabilità del tuo ambiente Cloud Composer, consulta la guida per l'ambiente di scalabilità. Se svolgi attività Zombie, una possibile soluzione è aumentare il timeout per le attività Zombie. Di conseguenza, lo scheduler attende più a lungo prima di considerare un'attività come uno zombie. In questo modo, un worker Airflow ha più tempo per segnalare lo stato dell'attività.

Per aumentare il timeout per le attività Zombie, esegui l'override del valore dell'opzione di configurazione di Airflow [scheduler]scheduler_zombie_task_threshold:

Sezione Chiave Valore Note
scheduler scheduler_zombie_task_threshold Nuovo timeout (in secondi) Il valore predefinito è 300

Risoluzione dei problemi con pillola velenosa

Poison Pill è un meccanismo utilizzato da Airflow per arrestare le attività di Airflow.

Airflow utilizza Poison Pill nelle seguenti situazioni:

  • Quando uno scheduler termina un'attività non completata in tempo.
  • Quando un'attività scade o viene eseguita troppo a lungo.

Quando Airflow utilizza Poison Pill, puoi visualizzare le seguenti voci di log nei log di un worker Airflow che ha eseguito l'attività:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Possibili soluzioni:

  • Controlla che nel codice dell'attività non siano presenti errori che potrebbero determinarne un'esecuzione troppo lunga.
  • (Cloud Composer 2) Aumenta la CPU e la memoria per i worker Airflow, in modo che le attività vengano eseguite più rapidamente.
  • Aumenta il valore dell'opzione di configurazione di Airflow [celery_broker_transport_options]visibility-timeout.

    Di conseguenza, lo scheduler attende più a lungo il completamento di un'attività, prima di considerare l'attività come un'attività Zombie. Questa opzione è particolarmente utile per le attività dispendiose in termini di tempo che durano molte ore. Se il valore è troppo basso (ad esempio, 3 ore), lo scheduler considera le attività eseguite per 5 o 6 ore come "bloccate" (attività zombie).

  • Aumenta il valore dell'opzione di configurazione Airflow [core]killed_task_cleanup_time.

    Un valore più lungo fornisce ai worker Airflow più tempo per completare le attività in modo controllato. Se il valore è troppo basso, le attività di Airflow potrebbero essere interrotte bruscamente, senza tempo sufficiente per completare correttamente il lavoro.

Risoluzione dei problemi relativi agli indicatori SIGTERM

Gli indicatori SIGTERM vengono utilizzati da Linux, Kubernetes, lo scheduler Airflow e Celery per terminare i processi responsabili dell'esecuzione dei worker o delle attività di Airflow.

I motivi per cui gli indicatori SIGTERM vengono inviati in un ambiente possono essere diversi:

  • Un'attività è diventata un'attività Zombie e deve essere interrotta.

  • Lo scheduler ha rilevato un duplicato di un'attività e invia segnali di veleno e SIGTERM all'attività per interromperla.

  • In Scalabilità automatica pod orizzontale, il piano di controllo GKE invia indicatori SIGTERM per rimuovere i pod non più necessari.

  • Lo scheduler può inviare indicatori SIGTERM al processo DagFileProcessorManager. Questi indicatori SIGTERM vengono utilizzati dallo scheduler per gestire il ciclo di vita del processo DagFileProcessorManager e possono essere ignorati in sicurezza.

    Esempio:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • La condizione di gara tra il callback heartbeat e i callback di uscita in local_task_job, che monitora l'esecuzione dell'attività. Se l'heartbeat rileva che un'attività è stata contrassegnata come riuscita, non può distinguere se l'attività è andata a buon fine o se Airflow è stato invitato a considerarla come riuscita. Tuttavia, l'esecuzione dell'attività verrà chiusa senza attendere che l'esecuzione venga chiusa.

    Questi indicatori SIGTERM possono essere tranquillamente ignorati. L'attività è già in stato riuscito e l'esecuzione dell'esecuzione del DAG nel suo insieme non sarà interessata.

    La voce di log Received SIGTERM. è l'unica differenza tra l'uscita standard e la terminazione dell'attività in stato riuscito.

    Condizione di gara tra i callback di heartbeat e di uscita
    Figura 2. Condizione di gara tra i callback di heartbeat e di uscita (fai clic per ingrandire)
  • Un componente Airflow utilizza più risorse (CPU, memoria) di quelle consentite dal nodo cluster.

  • Il servizio GKE esegue operazioni di manutenzione e invia segnali SIGTERM ai pod in esecuzione su un nodo in procinto di eseguire l'upgrade. Quando un'istanza di attività viene terminata con SIGTERM, puoi visualizzare le seguenti voci di log nei log di un worker Airflow che ha eseguito l'attività:

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

Possibili soluzioni:

Questo problema si verifica quando una VM che esegue l'attività ha esaurito la memoria. Non è correlato alle configurazioni di Airflow, ma alla quantità di memoria disponibile per la VM.

L'aumento della memoria dipende dalla versione di Cloud Composer che utilizzi. Ad esempio:

  • In Cloud Composer 2, puoi assegnare più risorse di CPU e memoria ai worker Airflow.

  • Nel caso di Cloud Composer 1, puoi ricreare l'ambiente utilizzando un tipo di macchina con prestazioni migliori.

  • In entrambe le versioni di Cloud Composer, puoi ridurre il valore dell'opzione di configurazione di Airflow per la contemporaneità di [celery]worker_concurrency. Questa opzione determina il numero di attività eseguite contemporaneamente da un determinato worker Airflow.

Per ulteriori informazioni sull'ottimizzazione dell'ambiente Cloud Composer 2, consulta Ottimizzare le prestazioni e i costi dell'ambiente

Query di Cloud Logging per scoprire i motivi di riavvii o rimozioni dei pod

Gli ambienti di Cloud Composer utilizzano i cluster GKE come livello dell'infrastruttura di calcolo. In questa sezione troverai query utili che possono aiutare a trovare i motivi per i riavvii o le rimozioni dei worker o dello scheduler Airflow.

Le query presentate di seguito possono essere ottimizzate nel seguente modo:

  • puoi specificare la sequenza temporale che preferisci in Cloud Logging, ad esempio le ultime 6 ore o 3 giorni, oppure definire un intervallo di tempo personalizzato

  • devi specificare il valore CLUSTER_NAME di Cloud Composer

  • puoi anche limitare la ricerca a un pod specifico aggiungendo POD_NAME

Scoprire i container riavviati

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Query alternativa per limitare i risultati a un pod specifico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Scopri l'arresto dei container a causa di un evento di esaurimento della memoria

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Query alternativa per limitare i risultati a un pod specifico:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Scoprire i container che non vengono più eseguiti

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Query alternativa per limitare i risultati a un pod specifico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Impatto delle operazioni di aggiornamento o upgrade sulle esecuzioni delle attività Airflow

Le operazioni di aggiornamento o upgrade interrompono le attività Airflow attualmente in esecuzione, a meno che un'attività non venga eseguita in modalità differibile.

Ti consigliamo di eseguire queste operazioni quando prevedi un impatto minimo sulle esecuzioni delle attività di Airflow e configurare meccanismi appropriati per i nuovi tentativi nei DAG e nelle attività.

Problemi comuni

Le seguenti sezioni descrivono i sintomi e le potenziali correzioni di alcuni problemi comuni con DAG.

L'attività Airflow è stata interrotta da Negsignal.SIGKILL

A volte è possibile che l'attività utilizzi più memoria di quanta sia allocata il worker Airflow. In questa situazione, potrebbe essere interrotto da Negsignal.SIGKILL. Il sistema invia questo segnale per evitare un ulteriore consumo di memoria, che potrebbe influire sull'esecuzione di altre attività Airflow. Nel log del worker di Airflow potrebbe essere visualizzata la seguente voce di log:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Possibili soluzioni:

  • Minore worker_concurrency del worker Airflow

  • Nel caso di Cloud Composer 2, aumenta la memoria dei worker Airflow

  • Nel caso di Cloud Composer 1, esegui l'upgrade al tipo di macchina più grande utilizzato nel cluster Composer

  • Ottimizza le attività per utilizzare meno memoria

L'attività non riesce senza emettere log a causa di errori di analisi DAG

A volte potrebbero verificarsi lievi errori DAG che portano a una situazione in cui uno scheduler e un processore DAG Airflow sono in grado di pianificare le attività per l'esecuzione e di analizzare un file DAG (rispettivamente), ma il worker Airflow non riesce a eseguire le attività da un DAG di questo tipo perché sono presenti errori di programmazione nel file DAG Python. Ciò potrebbe portare a una situazione in cui un'attività Airflow è contrassegnata come Failed e non esiste alcun log dalla sua esecuzione.

Soluzioni:

  • Verifica nei log dei worker di Airflow che non siano presenti errori generati dal worker Airflow relativi a errori di analisi dei DAG o dei DAG mancanti.

  • Aumenta i parametri relativi all'analisi DAG:

  • Vedi anche Ispezione dei log del processore DAG.

L'attività non riesce senza emettere log a causa della pressione delle risorse

Sintomo: durante l'esecuzione di un'attività, il sottoprocesso del worker Airflow responsabile dell'esecuzione dell'attività di Airflow viene interrotto bruscamente. L'errore visibile nel log del worker di Airflow potrebbe essere simile a quello riportato di seguito:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Soluzione:

L'attività non riesce senza emettere log a causa dell'eliminazione dei pod

I pod di Google Kubernetes Engine sono soggetti al ciclo di vita dei pod Kubernetes e all'eliminazione dei pod. I picchi di attività e la co-pianificazione dei worker sono le due cause più comuni dell'eliminazione dei pod in Cloud Composer.

L'eliminazione dei pod può verificarsi quando un determinato pod usa in modo eccessivo le risorse di un nodo, in relazione alle aspettative di consumo delle risorse configurate per il nodo. Ad esempio, l'eliminazione può verificarsi quando in un pod vengono eseguite diverse attività che utilizzano molta memoria e il loro carico combinato fa sì che il nodo in cui viene eseguito il pod superi il limite di consumo di memoria.

Se un pod worker Airflow viene rimosso, tutte le istanze di attività in esecuzione su quel pod vengono interrotte e successivamente contrassegnate come non riuscite da Airflow.

I log vengono memorizzati nel buffer. Se il pod di un worker viene rimosso prima dello svuotamento del buffer, i log non vengono emessi. L'errore dell'attività senza log indica che i worker Airflow sono stati riavviati a causa dell'esaurimento della memoria (OOM). Alcuni log potrebbero essere presenti in Cloud Logging anche se i log di Airflow non sono stati emessi.

Per visualizzare i log:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  3. Vai alla scheda Log.

  4. Visualizza i log dei singoli worker in Tutti i log -> Log di Airflow -> Worker -> (singolo worker).

L'esecuzione di DAG è limitata alla memoria. Ogni esecuzione di attività viene avviata con due processi Airflow: esecuzione e monitoraggio dell'attività. Ogni nodo può richiedere fino a 6 attività simultanee (circa 12 processi caricati con moduli Airflow). È possibile consumare più memoria, a seconda della natura del DAG.

Sintomo:

  1. Nella console Google Cloud, vai alla pagina Carichi di lavoro.

    Vai a Carichi di lavoro

  2. Se sono presenti airflow-worker pod che mostrano Evicted, fai clic su ogni pod rimosso e cerca il messaggio The node was low on resource: memory nella parte superiore della finestra.

Risoluzione:

  • In Cloud Composer 1, crea un nuovo ambiente Cloud Composer con un tipo di macchina più grande rispetto a quello attuale.
  • In Cloud Composer 2, aumenta i limiti di memoria per i worker Airflow.
  • Controlla i log dei pod airflow-worker per individuare possibili cause di eliminazione. Per ulteriori informazioni sul recupero dei log dai singoli pod, consulta Risoluzione dei problemi relativi ai carichi di lavoro di cui è stato eseguito il deployment.
  • Assicurati che le attività nel DAG siano idempotenti e ripristinabili.
  • Evita di scaricare file non necessari nel file system locale dei worker di Airflow.

    I worker Airflow hanno una capacità limitata del file system locale. Ad esempio, in Cloud Composer 2, un worker può avere da 1 GB a 10 GB di spazio di archiviazione. Quando lo spazio di archiviazione si esaurisce, il pod worker Airflow viene rimosso dal piano di controllo GKE. Questa operazione non riesce a eseguire tutte le attività eseguite dal lavoratore rimosso.

    Esempi di operazioni problematiche:

    • Download di file o oggetti e archiviazione locale in un worker Airflow. Archivia questi oggetti direttamente in un servizio idoneo come un bucket Cloud Storage.
    • Accesso a oggetti di grandi dimensioni nella cartella /data da un worker Airflow. Il worker Airflow scarica l'oggetto nel proprio file system locale. Implementa invece i DAG in modo che i file di grandi dimensioni vengano elaborati al di fuori del pod worker Airflow.

Timeout importazione caricamento DAG

Sintomo:

  • Nell'interfaccia web di Airflow, nella parte superiore della pagina dell'elenco dei DAG, una casella di avviso rossa mostra Broken DAG: [/path/to/dagfile] Timeout.
  • In Cloud Monitoring: i log airflow-scheduler contengono voci simili a:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Risoluzione:

Esegui l'override dell'opzione di configurazione Airflow dag_file_processor_timeout e lascia più tempo per l'analisi dei DAG:

Sezione Chiave Valore
core dag_file_processor_timeout Nuovo valore di timeout

L'esecuzione di DAG non termina entro il tempo previsto

Sintomo:

A volte l'esecuzione di un DAG non termina perché le attività di Airflow si bloccano e l'esecuzione dei DAG dura più a lungo del previsto. In condizioni normali, le attività di Airflow non rimangono a tempo indeterminato in stato di coda o in esecuzione, perché Airflow dispone di procedure di timeout e di pulizia che consentono di evitare questa situazione.

Risoluzione:

  • Utilizza il parametro dagrun_timeout per i DAG. Ad esempio: dagrun_timeout=timedelta(minutes=120). Di conseguenza, ogni esecuzione di DAG deve essere completata entro il timeout dell'esecuzione dei DAG e le attività non completate devono essere contrassegnate come Failed o Upstream Failed. Per ulteriori informazioni sugli stati delle attività di Airflow, consulta la documentazione di Apache Airflow.

  • Utilizza il parametro timeout esecuzione attività per definire un timeout predefinito per le attività eseguite in base agli operatori di Apache Airflow.

Esecuzioni DAG non eseguite

Sintomo:

L'impostazione dinamica di una data di pianificazione per un DAG può causare vari effetti collaterali imprevisti. Ad esempio:

  • Un'esecuzione DAG avviene sempre nel futuro e il DAG non viene mai eseguito.

  • Le esecuzioni di DAG precedenti sono contrassegnate come eseguite e hanno esito positivo nonostante non siano state eseguite.

Ulteriori informazioni sono disponibili nella documentazione di Apache Airflow.

Risoluzione:

  • Segui i consigli nella documentazione di Apache Airflow.

  • Imposta un start_date statico per i DAG. Se vuoi, puoi utilizzare catchup=False per disattivare l'esecuzione del DAG per le date passate.

  • Evita di utilizzare datetime.now() o days_ago(<number of days>), a meno che tu non sia a conoscenza degli effetti collaterali di questo approccio.

Aumento del traffico di rete da e verso il database Airflow

La quantità di traffico della rete tra il cluster GKE del tuo ambiente e il database Airflow dipende dal numero di DAG, dal numero di attività nei DAG e dal modo in cui i DAG accedono ai dati nel database Airflow. I seguenti fattori potrebbero influire sull'utilizzo della rete:

  • Query al database Airflow. Se i DAG eseguono molte query, generano grandi quantità di traffico. Esempi: controllo dello stato delle attività prima di procedere con altre, query sulla tabella XCom ed esecuzione di dump dei contenuti del database Airflow.

  • Numero elevato di attività. Più attività sono da pianificare, più traffico di rete verrà generato. Questa considerazione si applica sia al numero totale di attività nei DAG sia alla frequenza di pianificazione. Quando lo scheduler Airflow pianifica le esecuzioni dei DAG, esegue query al database Airflow e genera traffico.

  • L'interfaccia web di Airflow genera traffico di rete perché esegue query al database Airflow. L'uso intensivo di pagine con grafici, attività e schemi può generare grandi volumi di traffico di rete.

Il DAG arresta in modo anomalo il server web Airflow o restituisce un errore 502 gateway timeout

Gli errori del server web possono verificarsi per diversi motivi. Controlla i log di airflow-webserver in Cloud Logging per determinare la causa dell'errore 502 gateway timeout.

Calcolo intensivo

Questa sezione riguarda solo Cloud Composer 1.

Evita di eseguire calcoli complessi al momento dell'analisi dei DAG.

A differenza dei nodi worker e scheduler, i cui tipi di macchina possono essere personalizzati per avere una maggiore capacità di CPU e memoria, il server web utilizza un tipo di macchina fisso, che può portare a errori di analisi DAG se il calcolo del tempo di analisi è troppo pesante.

Tieni presente che il server web ha 2 vCPU e 2 GB di memoria. Il valore predefinito per core-dagbag_import_timeout è 30 secondi. Questo valore di timeout definisce il limite superiore per il tempo di tempo di Airflow dedicato al caricamento di un modulo Python nella cartella dags/.

Autorizzazioni errate

Questa sezione riguarda solo Cloud Composer 1.

Il server web non viene eseguito con lo stesso account di servizio dei worker e dello scheduler. Di conseguenza, i worker e lo scheduler potrebbero essere in grado di accedere alle risorse gestite dall'utente a cui il server web non può accedere.

Ti consigliamo di evitare di accedere a risorse non pubbliche durante l'analisi DAG. A volte ciò è inevitabile e dovrai concedere le autorizzazioni all'account di servizio del server web. Il nome dell'account di servizio viene derivato dal dominio del tuo server web. Ad esempio, se il dominio è example-tp.appspot.com, l'account di servizio è example-tp@appspot.gserviceaccount.com.

Errori DAG

Questa sezione riguarda solo Cloud Composer 1.

Il server web viene eseguito su App Engine ed è separato dal cluster GKE dell'ambiente. Il server web analizza i file di definizione dei DAG e può verificarsi un 502 gateway timeout in caso di errori nel DAG. Airflow funziona normalmente senza un server web funzionante se il DAG problematico non interrompe i processi in esecuzione in GKE. In questo caso, puoi utilizzare gcloud composer environments run per recuperare i dettagli dal tuo ambiente e come soluzione alternativa se il server web diventa non disponibile.

In altri casi, puoi eseguire l'analisi dei DAG in GKE e cercare i DAG che generano eccezioni Python irreversibili o che scadono (valore predefinito 30 secondi). Per risolvere i problemi, connettiti a una shell remota in un container worker Airflow e verifica l'eventuale presenza di errori di sintassi. Per maggiori informazioni, consulta la pagina Test dei DAG.

Gestione di un numero elevato di DAG e plug-in nelle cartelle DAG e plug-in

I contenuti delle cartelle /dags e /plugins vengono sincronizzati dal bucket del tuo ambiente ai file system locali dei worker e degli scheduler di Airflow.

Maggiore è il numero di dati archiviati in queste cartelle, più tempo sarà necessario per eseguire la sincronizzazione. Per far fronte a queste situazioni:

  • Limita il numero di file in /dags e /plugins cartelle. Archivia solo il minimo di file richiesti.

  • Se possibile, aumenta lo spazio su disco disponibile per gli scheduler e i worker Airflow.

  • Se possibile, aumenta la CPU e la memoria degli scheduler e dei worker di Airflow in modo che l'operazione di sincronizzazione venga eseguita più velocemente.

  • Nel caso di un numero molto elevato di DAG, suddividi i DAG in batch, comprimili in archivi ZIP e distribuiscili nella cartella /dags. Questo approccio accelera il processo di sincronizzazione dei DAG. I componenti di Airflow decomprimono gli archivi ZIP prima di elaborare i DAG.

  • La generazione di DAG in una pubblicità programmatica potrebbe anche essere un metodo per limitare il numero di file DAG archiviati nella cartella /dags. Consulta la sezione sui DAG programmatici per evitare problemi di pianificazione ed esecuzione dei DAG generati in modo programmatico.

Non pianificare contemporaneamente i DAG generati in modo programmatico

La generazione di oggetti DAG in modo programmatico da un file DAG è un metodo efficiente per creare molti DAG simili che presentano solo piccole differenze.

È importante non pianificare l'esecuzione immediata di tutti questi DAG. Esiste un'elevata probabilità che i worker di Airflow non abbiano risorse di CPU e memoria sufficienti per eseguire tutte le attività pianificate contemporaneamente.

Per evitare problemi con la pianificazione dei DAG programmatici:

  • Aumenta la contemporaneità dei worker e fai lo scale up dell'ambiente, in modo che possa eseguire più attività contemporaneamente.
  • Genera DAG in modo da distribuire le pianificazioni in modo uniforme nel tempo, per evitare di pianificare centinaia di attività contemporaneamente, in modo che i worker di Airflow abbiano il tempo di eseguire tutte le attività pianificate.

Errore 504 durante l'accesso al server web Airflow

Vedi Errore 504 durante l'accesso alla UI di Airflow.

L'eccezione Lost connection to Postgres / MySQL server during query viene generata durante l'esecuzione dell'attività o subito dopo

Lost connection to Postgres / MySQL server during query eccezioni si verificano spesso quando vengono soddisfatte le seguenti condizioni:

  • Il DAG utilizza PythonOperator o un operatore personalizzato.
  • Il tuo DAG esegue query al database Airflow.

Se vengono eseguite diverse query da una funzione richiamabile, le tracce potrebbero puntare erroneamente alla riga self.refresh_from_db(lock_for_update=True) nel codice Airflow; è la prima query di database dopo l'esecuzione dell'attività. La causa effettiva dell'eccezione si verifica prima, quando una sessione SQLAlchemy non viene chiusa correttamente.

Le sessioni SQLAlchemy hanno come ambito un thread e vengono create in una sessione di funzione richiamabile possono essere continuate all'interno del codice Airflow. Se si verificano ritardi significativi tra le query all'interno di una sessione, la connessione potrebbe essere già stata chiusa dal server Postgres o MySQL. Il timeout della connessione negli ambienti Cloud Composer è impostato su circa 10 minuti.

Risoluzione:

  • Utilizza il decorator di airflow.utils.db.provide_session. Questo decorator fornisce una sessione valida al database Airflow nel parametro session e chiude correttamente la sessione alla fine della funzione.
  • Non utilizzare una singola funzione a lunga esecuzione. Puoi invece spostare tutte le query di database in funzioni separate, in modo che esistano più funzioni con il decorator airflow.utils.db.provide_session. In questo caso, le sessioni vengono chiuse automaticamente dopo aver recuperato i risultati della query.

Controllo del tempo di esecuzione di DAG, attività ed esecuzioni parallele dello stesso DAG

Se vuoi controllare la durata di una singola esecuzione DAG per un determinato DAG, puoi utilizzare il parametro DAG dagrun_timeout. Ad esempio, se prevedi che una singola esecuzione DAG (indipendentemente dal fatto che l'esecuzione abbia esito positivo o negativo) non debba durare più di un'ora, imposta questo parametro su 3600 secondi.

Puoi anche controllare la durata di una singola attività Airflow. Per farlo, puoi utilizzare execution_timeout.

Se vuoi controllare il numero di esecuzioni DAG attive che vuoi avere per un determinato DAG, puoi utilizzare l'[core]max-active-runs-per-dag opzione di configurazione Airflow per farlo.

Se vuoi eseguire una sola istanza di un DAG in un determinato momento, imposta il parametro max-active-runs-per-dag su 1.

Problemi che influiscono su DAG e plug-in durante la sincronizzazione con scheduler, worker e server web

Cloud Composer sincronizza il contenuto delle cartelle /dags e /plugins con gli scheduler e i worker. Alcuni oggetti nelle cartelle /dags e /plugins potrebbero impedire il corretto funzionamento della sincronizzazione o almeno rallentarla.

  • La cartella /dags è sincronizzata con scheduler e worker. Questa cartella non viene sincronizzata con i server web in Cloud Composer 2 o se attivi DAG Serialization in Cloud Composer 1.

  • La cartella /plugins è sincronizzata con scheduler, worker e server web.

Potresti riscontrare i seguenti problemi:

  • Hai caricato file compressi in gzip che utilizzano la transcodifica di compressione nelle cartelle /dags e /plugins. Questo accade di solito se utilizzi il comando gsutil cp -Z per caricare i dati nel bucket.

    Soluzione: elimina l'oggetto che ha utilizzato la transcodifica di compressione e ricaricalo nel bucket.

  • Uno degli oggetti è denominato ".". Un oggetto di questo tipo non viene sincronizzato con scheduler e worker e potrebbe interromperne la sincronizzazione.

    Soluzione: rinomina l'oggetto problematico.

  • Una cartella e un file Python DAG hanno gli stessi nomi, ad esempio a.py. In questo caso, il file DAG non è sincronizzato correttamente con i componenti di Airflow.

    Soluzione: rimuovi la cartella che ha lo stesso nome di un file Python DAG.

  • Uno degli oggetti nelle cartelle /dags o /plugins contiene un simbolo / alla fine del nome dell'oggetto. Questi oggetti possono fuorviare il processo di sincronizzazione, perché il simbolo / indica che un oggetto è una cartella, non un file.

    Soluzione: rimuovi il simbolo / dal nome dell'oggetto problematico.

  • Non archiviare i file superflui nelle cartelle /dags e /plugins.

    A volte i DAG e i plug-in implementati sono accompagnati da file aggiuntivi, come i file in cui sono archiviati i test per questi componenti. Questi file vengono sincronizzati con i worker e gli scheduler e influiscono sul tempo necessario per copiarli per scheduler, worker e server web.

    Soluzione: non archiviare file aggiuntivi e non necessari nelle cartelle /dags e /plugins.

L'errore Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' è stato generato da scheduler e worker

Questo problema si verifica perché gli oggetti possono avere uno spazio dei nomi sovrapposto in Cloud Storage, mentre allo stesso tempo i scheduler e i worker utilizzano i file system tradizionali. Ad esempio, è possibile aggiungere sia una cartella che un oggetto con lo stesso nome al bucket di un ambiente. Quando il bucket viene sincronizzato con gli scheduler e i worker dell'ambiente, viene generato questo errore, che può portare a errori delle attività.

Per risolvere questo problema, assicurati che nel bucket dell'ambiente non siano presenti spazi dei nomi sovrapposti. Ad esempio, se /dags/misc (un file) e /dags/misc/example_file.txt (un altro file) si trovano in un bucket, lo scheduler genera un errore.

Interruzioni temporanee durante la connessione a Airflow Metadata DB

Cloud Composer viene eseguito su un'infrastruttura cloud distribuita. Significa che di tanto in tanto potrebbero verificarsi alcuni problemi temporanei che potrebbero interrompere l'esecuzione delle attività di Airflow.

In queste situazioni potresti visualizzare i seguenti messaggi di errore nei log dei worker di Airflow:

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

o

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Questi problemi intermittenti potrebbero essere causati anche da operazioni di manutenzione eseguite per i tuoi ambienti Cloud Composer.

Solitamente questi errori sono intermittenti e, se le tue attività Airflow sono idempotenti e hai configurato nuovi tentativi, non dovresti riceverli. Puoi anche valutare la possibilità di definire i periodi di manutenzione.

Un ulteriore motivo di questi errori potrebbe essere la mancanza di risorse nel cluster del tuo ambiente. In questi casi, puoi fare lo scale up o l'ottimizzazione dell'ambiente come descritto nelle istruzioni sulla scalabilità degli ambienti o sull'ottimizzazione dell'ambiente.

Un'esecuzione DAG è contrassegnata come riuscita, ma non ha attività eseguite

Se un'esecuzione di DAG execution_date è precedente a quella del DAG start_date, potresti vedere esecuzioni di DAG per cui non sono state eseguite attività, ma che sono comunque contrassegnate come riuscite.

Un'esecuzione DAG riuscita senza attività eseguite
Figura 3. Un'esecuzione DAG riuscita senza attività eseguite (fai clic per ingrandire)

Causa

Questa situazione può verificarsi in uno dei seguenti casi:

  • Una mancata corrispondenza è causata dalla differenza di fuso orario tra execution_date e start_date del DAG. Può accadere, ad esempio, quando utilizzi pendulum.parse(...) per impostare start_date.

  • Il start_date del DAG è impostato su un valore dinamico, ad esempio airflow.utils.dates.days_ago(1)

Soluzione:

  • Assicurati che execution_date e start_date utilizzino lo stesso fuso orario.

  • Specifica un start_date statico e combinalo con catchup=False per evitare di eseguire DAG con date di inizio passate.

Un DAG non è visibile nella UI o nell'UI DAG di Airflow e lo scheduler non lo pianifica

Il processore DAG analizza ogni DAG prima che possa essere pianificato dallo scheduler e prima che un DAG diventi visibile nell'UI di Airflow o nell'UI DAG.

Le seguenti opzioni di configurazione di Airflow definiscono i timeout per l'analisi dei DAG:

Se un DAG non è visibile nella UI o nell'UI DAG di Airflow:

  • Controlla i log del processore DAG se il processore DAG è in grado di elaborare correttamente il DAG. In caso di problemi, nei log del processore DAG o dello scheduler potresti visualizzare le seguenti voci di log:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • Controlla i log dello scheduler per verificare se lo scheduler funziona correttamente. In caso di problemi, nei log dello scheduler potrebbero essere visualizzate le seguenti voci:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

Soluzioni:

  • Correggi tutti gli errori di analisi dei DAG. Il processore DAG analizza più DAG e, in rari casi, gli errori di analisi di un DAG possono influire negativamente sull'analisi di altri DAG.

  • Se l'analisi del DAG richiede più della quantità di secondi definita in [core]dagrun_import_timeout, aumenta questo timeout.

  • Se l'analisi di tutti i DAG richiede più dei secondi definiti in [core]dag_file_processor_timeout, aumenta questo timeout.

  • Se l'analisi del DAG richiede molto tempo, può anche significare che non è implementato in modo ottimale. ad esempio se legge molte variabili di ambiente o esegue chiamate a servizi esterni o al database Airflow. Per quanto possibile, evita di eseguire queste operazioni nelle sezioni globali dei DAG.

  • Aumenta le risorse di CPU e memoria per lo scheduler in modo che possa funzionare più velocemente.

  • Modifica il numero di scheduler.

  • Aumenta il numero di processi del processore DAG in modo che l'analisi possa essere eseguita più rapidamente. Per farlo, aumenta il valore di [scheduler]parsing_process.

  • Riduci la frequenza dell'analisi dei DAG.

  • Riduci il carico sul database Airflow.

Sintomi di carico elevato del database Airflow

Per ulteriori informazioni, consulta la sezione Sintomi di un database Airflow sotto pressione di carico.

Passaggi successivi