Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa pagina fornisce informazioni e passaggi per la risoluzione dei problemi comuni relativi al flusso di lavoro.
Molti problemi di esecuzione dei DAG sono causati da prestazioni dell'ambiente non ottimali. Puoi ottimizzare il tuo ambiente seguendo la guida Ottimizzare le prestazioni e i costi dell'ambiente.
Alcuni problemi di esecuzione dei DAG potrebbero essere causati dal mancato funzionamento corretto o ottimale dello scheduler di Airflow. Segui le istruzioni per la risoluzione dei problemi relativi all'agente di pianificazione per risolvere questi problemi.
Risoluzione dei problemi relativi al flusso di lavoro
Per iniziare la risoluzione dei problemi:
Controlla i log di Airflow.
Puoi aumentare il livello di logging di Airflow eseguendo l'override della seguente opzione di configurazione di Airflow.
Sezione Chiave Valore logging
logging_level
Il valore predefinito è INFO
. Imposta suDEBUG
per ottenere un livello di dettaglio maggiore nei messaggi di log.Controlla la dashboard di monitoraggio.
Esamina Cloud Monitoring.
Nella console Google Cloud, controlla se sono presenti errori nelle pagine dei componenti del tuo ambiente.
Nell'interfaccia web di Airflow, controlla le istanze di attività non riuscite nella Visualizzazione grafico del DAG.
Sezione Chiave Valore webserver
dag_orientation
LR
,TB
,RL
oBT
Debug degli errori dell'operatore
Per eseguire il debug di un errore dell'operatore:
- Controlla la presenza di errori specifici delle attività.
- Controlla i log di Airflow.
- Esamina Cloud Monitoring.
- Controlla i log specifici dell'operatore.
- Correggi gli errori.
- Carica il DAG nella cartella
/dags
. - Nell'interfaccia web di Airflow, cancella gli stati passati per il DAG.
- Riprendi o esegui il DAG.
Risoluzione dei problemi di esecuzione delle attività
Airflow è un sistema distribuito con molte entità come lo scheduler, l'executor e i worker che comunicano tra loro tramite una coda di attività e il database Airflow e inviano segnali (come SIGTERM). Il seguente diagramma mostra una panoramica delle interconnessioni tra i componenti di Airflow.
In un sistema distribuito come Airflow potrebbero verificarsi problemi di connettività di rete o l'infrastruttura di base potrebbe presentare problemi intermittenti. Ciò può portare a situazioni in cui le attività possono non riuscire ed essere riprogrammate per l'esecuzione oppure non essere completate correttamente (ad esempio, attività zombie o attività bloccate durante l'esecuzione). Airflow dispone di meccanismi per gestire queste situazioni e riprendere automaticamente il normale funzionamento. Le sezioni seguenti spiegano i problemi comuni che si verificano durante l'esecuzione delle attività da parte di Airflow: attività zombie, istanza in fase di terminazione e segnali SIGTERM.
Risolvere i problemi relativi alle attività zombie
Airflow rileva due tipi di mancata corrispondenza tra un'attività e un processo che la esegue:
Le attività zombie sono attività che dovrebbero essere in esecuzione, ma non lo sono. Ciò può accadere se il processo dell'attività è stato interrotto o non risponde, se il worker Airflow non ha segnalato in tempo lo stato dell'attività perché è sovraccaricato o se la VM in cui viene eseguita l'attività è stata arrestata. Airflow le trova periodicamente e le esegue o le ritenta, a seconda delle impostazioni.
Scoprire le attività zombie
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
Le attività non terminate sono attività che non dovrebbero essere in esecuzione. Airflow individua periodicamente queste attività e le termina.
Le sezioni seguenti descrivono i motivi e le soluzioni più comuni per le attività zombie.
Il worker Airflow ha esaurito la memoria
Ogni worker Airflow può eseguire fino a [celery]worker_concurrency
istanze di attività contemporaneamente. Se il consumo cumulativo di memoria di queste istanze di attività supera il limite di memoria per un worker Airflow, un processo casuale viene terminato per liberare risorse.
A volte, la scarsità di memoria in un worker Airflow può portare all'invio di pacchetti con formato non corretto durante una sessione SQL Alchemy al database, a un server DNS o a qualsiasi altro servizio chiamato da un DAG. In questo caso, l'altra estremità della connessione potrebbe rifiutare o interrompere le connessioni dal worker Airflow. Ad esempio:
"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"
Soluzioni:
Ottimizza le attività per utilizzare meno memoria, ad esempio evitando il codice di primo livello.
Riduci
[celery]worker_concurrency
.Aumenta la memoria per i worker Airflow per gestire le modifiche di
[celery]worker_concurrency
.
Il worker Airflow è stato espulso
L'espulsione dei pod è una parte normale dell'esecuzione dei carichi di lavoro su Kubernetes. GKE esegue l'espulsione dei pod se non è più disponibile spazio di archiviazione o per liberare risorse per i carichi di lavoro con una priorità più elevata.
Soluzioni:
- Se un'espulsione è causata dalla mancanza di spazio di archiviazione, puoi ridurre l'utilizzo dello spazio o rimuovere i file temporanei non appena non sono più necessari.
In alternativa, puoi
aumentare lo spazio di archiviazione disponibile o eseguire
workload in un pod dedicato con
KubernetesPodOperator
.
Il worker Airflow è stato terminato
I worker di Airflow potrebbero essere rimossi esternamente. Se le attività in esecuzione non vengono completate durante un periodo di interruzione, vengono interrotte e potrebbero essere rilevate come zombie.
Possibili scenari e soluzioni:
I worker Airflow vengono riavviati durante le modifiche dell'ambiente, ad esempio upgrade o installazione di pacchetti:
Scoprire le modifiche all'ambiente Composer
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
Puoi eseguire queste operazioni quando non sono in esecuzione attività critiche o attivare i tentativi di esecuzione di nuovo delle attività.
Vari componenti potrebbero essere temporaneamente non disponibili durante le operazioni di manutenzione.
Puoi specificare i periodi di manutenzione per ridurre al minimosi sovrappone all'esecuzione delle attività critiche.
Il worker Airflow era sotto carico elevato
La quantità di risorse di CPU e memoria disponibili per un worker Airflow è limitata dalla configurazione dell'ambiente. Se l'utilizzo delle risorse si avvicina ai limiti, potrebbe verificarsi una contesa delle risorse e ritardi non necessari durante l'esecuzione delle attività. In situazioni estreme, quando le risorse scarseggiano per periodi di tempo più lunghi, potrebbero verificarsi attività zombie.
Soluzioni:
- Monitora l'utilizzo della CPU e della memoria dei worker e regolalo per evitare di superare l'80%.
Il database Airflow era sotto un carico elevato
Un database viene utilizzato da vari componenti di Airflow per comunicare tra loro e, in particolare, per memorizzare gli heartbeat delle istanze di attività. La carenza di risorse nel database comporta tempi di query più lunghi e potrebbe influire sull'esecuzione delle attività.
A volte, nei log di un worker Airflow sono presenti i seguenti errori:
(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly
This probably means the server terminated abnormally before or while
processing the request.
Soluzioni:
- Evita di utilizzare molte istruzioni
Variables.get
nel codice DAG di primo livello. Utilizza invece i modelli Jinja per recuperare i valori delle variabili Airflow. - Ottimizza (riduci) l'utilizzo delle istruzioni xcom_push e xcom_pull nei modelli Jinja nel codice DAG di primo livello.
- Valuta la possibilità di eseguire l'upgrade a un ambiente di dimensioni maggiori (medio o grande).
- Riduci il numero di pianificatori
- Riduci la frequenza dell'analisi del DAG.
- Monitora l'utilizzo di CPU e memoria del database.
Il database Airflow non è stato temporaneamente disponibile
Un worker Airflow potrebbe impiegare del tempo per rilevare e gestire in modo corretto gli errori intermittenti, ad esempio i problemi di connettività temporanei. Potrebbe superare la soglia di rilevamento degli zombie predefinita.
Scopri i timeout dell'heartbeat di Airflow
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
Soluzioni:
Aumenta il timeout per le attività zombie e sostituisci il valore dell'opzione di configurazione di Airflow
[scheduler]scheduler_zombie_task_threshold
:Sezione Chiave Valore Note scheduler
scheduler_zombie_task_threshold
Nuovo timeout (in secondi) Il valore predefinito è 300
Risoluzione dei problemi relativi all'interruzione dell'istanza
Airflow utilizza il meccanismo di terminazione dell'istanza per arrestare le attività di Airflow. Questo meccanismo viene utilizzato nelle seguenti situazioni:
- Quando un programmatore termina un'attività che non è stata completata in tempo.
- Quando un'attività scade o viene eseguita per troppo tempo.
Quando Airflow termina le istanze di attività, puoi vedere le seguenti voci di log nei log di un worker di Airflow che ha eseguito l'attività:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Terminating instance.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Possibili soluzioni:
Controlla il codice dell'attività per verificare la presenza di errori che potrebbero causare un'esecuzione troppo lunga.
Aumenta la CPU e la memoria per i worker di Airflow, in modo che le attività vengano eseguite più velocemente.
Aumenta il valore dell'opzione
[celery_broker_transport_options]visibility-timeout
Configurazione Airflow.Di conseguenza, lo scheduler attende più a lungo il completamento di un'attività prima di considerarla un'attività zombie. Questa opzione è particolarmente utile per le attività che richiedono molto tempo e durano molte ore. Se il valore è troppo basso (ad esempio 3 ore), lo scheduler considera "bloccate" le attività in esecuzione per 5 o 6 ore (attività zombie).
Aumenta il valore dell'opzione di configurazione
[core]killed_task_cleanup_time
Airflow.Un valore più lungo offre più tempo ai worker Airflow per completare le attività in modo corretto. Se il valore è troppo basso, le attività Airflow potrebbero essere interrotte bruscamente, senza tempo sufficiente per completare il lavoro in modo corretto.
Risoluzione dei problemi relativi agli indicatori SIGTERM
Gli indicatori SIGTERM vengono utilizzati da Linux, Kubernetes, Airflow Scheduler e Celery per terminare i processi responsabili dell'esecuzione di worker o attività di Airflow.
Esistono diversi motivi per cui gli indicatori SIGTERM vengono inviati in un ambiente:
Un'attività è diventata un'attività zombie e deve essere interrotta.
Lo scheduler ha rilevato un duplicato di un'attività e invia all'attività gli indicatori di istanza in fase di terminazione e SIGTERM per interromperla.
In Scalabilità automatica pod orizzontale, il Control Plane di GKE invia segnali SIGTERM per rimuovere i pod che non sono più necessari.
Lo scheduler può inviare segnali SIGTERM al processo DagFileProcessorManager. Questi indicatori SIGTERM vengono utilizzati dal programmatore per gestire il ciclo di vita del processo DagFileProcessorManager e possono essere ignorati in tutta sicurezza.
Esempio:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Condizione di gara tra il callback heartbeat e i callback di uscita in local_task_job, che monitora l'esecuzione dell'attività. Se il heartbeat rileva che un'attività è stata contrassegnata come riuscita, non può distinguere se l'attività stessa è riuscita o se è stato chiesto ad Airflow di considerarla riuscita. Tuttavia, terminerà un task runner senza attendere che esca.
Questi segnali SIGTERM possono essere ignorati in tutta sicurezza. L'attività è già nello stato di esito positivo e l'esecuzione dell'esecuzione del DAG nel suo complesso non sarà interessata.
La voce di log
Received SIGTERM.
è l'unica differenza tra l'uscita normale e l'interruzione dell'attività nello stato di esito positivo.Figura 2. Condizione di gara tra i callback heartbeat ed exit (fai clic per ingrandire) Un componente Airflow utilizza più risorse (CPU, memoria) di quanto consentito dal nodo del cluster.
Il servizio GKE esegue operazioni di manutenzione e invia segnali SIGTERM ai pod in esecuzione su un nodo di cui sta per essere eseguito l'upgrade.
Quando un'istanza di attività viene terminata con SIGTERM, puoi vedere le seguenti voci di log nei log di un worker Airflow che ha eseguito l'attività:
{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed with exception
Possibili soluzioni:
Questo problema si verifica quando una VM che esegue l'attività non dispone di memoria. Non è correlato alle configurazioni di Airflow, ma alla quantità di memoria disponibile per la VM.
In Cloud Composer 3, puoi assegnare più risorse di CPU e memoria ai worker Airflow.
Puoi abbassare il valore dell'opzione di configurazione di Airflow
[celery]worker_concurrency
concurrency. Questa opzione determina quante attività vengono eseguite contemporaneamente da un determinato worker Airflow.
Per ulteriori informazioni sull'ottimizzazione dell'ambiente, consulta Ottimizzazione di prestazioni e costi dell'ambiente.
Impatto delle operazioni di aggiornamento o upgrade sulle esecuzioni delle attività Airflow
Le operazioni di aggiornamento o upgrade interrompono le attività Airflow attualmente in esecuzione, a meno che un'attività non venga eseguita in modalità differibile.
Ti consigliamo di eseguire queste operazioni quando prevedi un impatto minimo sulle esecuzioni delle attività di Airflow e di configurare meccanismi di ripetizione appropriati nelle attività e nei DAG.
Risoluzione dei problemi relativi alle attività KubernetesExecutor
CeleryKubernetesExecutor è un tipo di executor in Cloud Composer 3 che può utilizzare contemporaneamente CeleryExecutor e KubernetesExecutor.
Per ulteriori informazioni sulla risoluzione dei problemi relativi alle attività eseguite con KubernetesExecutor, consulta la pagina Utilizzare CeleryKubernetesExecutor.
Problemi comuni
Le sezioni seguenti descrivono i sintomi e le potenziali correzioni di alcuni problemi comuni relativi ai DAG.
L'attività Airflow è stata interrotta da Negsignal.SIGKILL
A volte la tua attività potrebbe utilizzare più memoria di quella allocata al worker Airflow.
In questo caso, potrebbe essere interrotta da Negsignal.SIGKILL
. Il sistema invia questo segnale per evitare un ulteriore consumo di memoria che potrebbe influire sull'esecuzione di altre attività Airflow. Nel log del worker di Airflow potresti vedere
la seguente voce di log:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
potrebbe essere visualizzato anche come codice -9
.
Possibili soluzioni:
Riduci il
worker_concurrency
dei worker Airflow.Aumenta la quantità di memoria disponibile per i worker Airflow.
Gestisci le attività che richiedono molte risorse in Cloud Composer utilizzando KubernetesPodOperator o GKEStartPodOperator per isolamento delle attività e allocazione delle risorse personalizzata.
Ottimizza le attività per utilizzare meno memoria.
L'attività non riesce senza emettere log a causa di errori di analisi del DAG
A volte potrebbero verificarsi errori DAG sottili che portano a una situazione in cui lo scheduler di Airflow può pianificare le attività per l'esecuzione, il processore DAG può analizzare il file DAG, ma il worker di Airflow non riesce a eseguire le attività dal DAG perché ci sono errori di programmazione nel file DAG. Ciò potrebbe portare a una situazione in cui un'attività Airflow è contrassegnata come Failed
e non è presente alcun log della relativa esecuzione.
Soluzioni:
Verifica nei log del worker Airflow che non siano presenti errori generati dal worker Airflow relativi a un DAG mancante o a errori di analisi del DAG.
Aumenta i parametri relativi all'analisi del DAG:
Aumenta dagbag-import-timeout almeno a 120 secondi (o più, se necessario).
Aumenta dag-file-processor-timeout fino ad almeno 180 secondi (o più, se necessario). Questo valore deve essere superiore a
dagbag-import-timeout
.
Consulta anche Ispezione dei log del processore DAG.
L'attività non riesce senza emettere log a causa della pressione sulle risorse
Sintomo: durante l'esecuzione di un'attività, il sottoprocesso del worker di Airflow responsabile dell'esecuzione dell'attività di Airflow viene interrotto bruscamente. L'errore visibile nel log del worker di Airflow potrebbe essere simile al seguente:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Soluzione:
In Cloud Composer 3, aumenta i limiti di memoria per i worker Airflow.
Se il tuo ambiente genera anche attività zombie, consulta la sezione Risolvere i problemi relativi alle attività zombie.
Per un tutorial sul debug dei problemi di esaurimento della memoria, consulta Eseguire il debug dei problemi di esaurimento della memoria e dello spazio di archiviazione dei DAG.
L'attività non riesce a generare log a causa dell'eliminazione del pod
I pod di Google Kubernetes Engine sono soggetti al ciclo di vita dei pod Kubernetes e all'espulsione dei pod. I picchi di attività sono la causa più comune dell'espulsione dei pod in Cloud Composer.
L'espulsione dei pod può verificarsi quando un determinato pod utilizza eccessivamente le risorse di un nodo rispetto alle aspettative di consumo delle risorse configurate per il nodo. Ad esempio, l'espulsione potrebbe verificarsi quando in un pod vengono eseguite diverse attività che richiedono molta memoria e il loro carico combinato fa sì che il nodo in cui viene eseguito il pod superi il limite di consumo di memoria.
Se un pod di worker Airflow viene espulso, tutte le istanze di attività in esecuzione su quel pod vengono interrotte e in seguito contrassegnate come non riuscite da Airflow.
I log vengono memorizzati nella memoria intermedia. Se un pod di lavoro viene espulso prima dello svuotamento del buffer, i log non vengono emessi. L'errore di attività senza log indica che i worker di Airflow vengono riavviati a causa di un esaurimento della memoria (OOM). Alcuni log potrebbero essere presenti in Cloud Logging anche se i log di Airflow non sono stati emessi.
Per visualizzare i log:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Log.
Visualizza i log dei singoli worker di Airflow in Tutti i log > Log di Airflow > Worker.
Soluzione:
Aumenta i limiti di memoria per i worker Airflow.
Assicurati che le attività nel DAG siano idempotenti e recuperabili.
Evita di scaricare file non necessari nel file system locale dei worker Airflow.
I worker di Airflow hanno una capacità limitata del file system locale. Un worker Airflow può avere da 1 GB a 10 GB di spazio di archiviazione. Quando lo spazio di archiviazione finisce, il pod worker Airflow viene espulso dal piano di controllo GKE. In questo modo, tutte le attività in esecuzione dal worker espulso non andranno a buon fine.
Esempi di operazioni problematiche:
- Scaricare file o oggetti e archiviarli localmente in un worker Airflow. Archivia invece questi oggetti direttamente in un servizio adatto, come un bucket Cloud Storage.
- Accesso a oggetti di grandi dimensioni nella cartella
/data
da un worker Airflow. Il worker Airflow scarica l'oggetto nel file system locale. Implementa invece i DAG in modo che i file di grandi dimensioni vengano elaborati al di fuori del pod di lavoro Airflow.
Timeout dell'importazione del caricamento del DAG
Sintomo:
- Nell'interfaccia web di Airflow, nella parte superiore della pagina dell'elenco dei DAG, un riquadro di avviso rosso mostra
Broken DAG: [/path/to/dagfile] Timeout
. In Cloud Monitoring: i log
airflow-scheduler
contengono voci simili a:ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
Correzione:
Esegui l'override dell'opzione di configurazione dag_file_processor_timeout
Airflow e concedi più tempo per l'analisi del DAG:
Sezione | Chiave | Valore |
---|---|---|
core |
dag_file_processor_timeout |
Nuovo valore di timeout |
L'esecuzione del DAG non termina entro il tempo previsto
Sintomo:
A volte un'esecuzione del DAG non termina perché le attività di Airflow si bloccano e l'esecuzione del DAG dura più a lungo del previsto. In condizioni normali, le attività Airflow non rimangono indefinitamente nello stato in coda o in esecuzione, perché Airflow dispone di procedure di timeout e pulizia che aiutano a evitare questa situazione.
Correzione:
Utilizza il parametro
dagrun_timeout
per i DAG. Ad esempio:dagrun_timeout=timedelta(minutes=120)
. Di conseguenza, ogni esecuzione del DAG deve essere completata entro il timeout dell'esecuzione del DAG. Le attività non completate sono contrassegnate comeFailed
oUpstream Failed
. Per ulteriori informazioni sugli stati delle attività di Airflow, consulta la documentazione di Apache Airflow.Utilizza il parametro timeout esecuzione attività per definire un timeout predefinito per le attività eseguite in base agli operatori Apache Airflow.
Le esecuzioni di DAG non vengono eseguite
Sintomo:
Quando una data di pianificazione per un DAG viene impostata in modo dinamico, possono verificarsi vari effetti collaterali inaspettati. Ad esempio:
L'esecuzione di un DAG è sempre nel futuro e il DAG non viene mai eseguito.
Le esecuzioni DAG passate sono contrassegnate come eseguite e riuscite nonostante non lo siano.
Per saperne di più, consulta la documentazione di Apache Airflow.
Possibili soluzioni:
Segui i consigli riportati nella documentazione di Apache Airflow.
Imposta
start_date
statico per i DAG. Come opzione, puoi utilizzarecatchup=False
per disattivare l'esecuzione del DAG per le date passate.Evita di utilizzare
datetime.now()
odays_ago(<number of days>)
, a meno che non sappia gli effetti collaterali di questo approccio.
Aumento del traffico di rete da e verso il database Airflow
La quantità di traffico di rete tra il cluster GKE del tuo ambiente e il database Airflow dipende dal numero di DAG, dal numero di attività nei DAG e dal modo in cui i DAG accedono ai dati nel database Airflow. I seguenti fattori potrebbero influire sull'utilizzo della rete:
Query al database Airflow. Se i tuoi DAG eseguono molte query, generano grandi quantità di traffico. Esempi: controllo dello stato delle attività prima di procedere con altre attività, query sulla tabella XCom, dumping dei contenuti del database Airflow.
Numero elevato di attività. Più attività sono da pianificare, maggiore è il traffico di rete generato. Questo vale sia per il numero totale di attività nei DAG sia per la frequenza di pianificazione. Quando lo scheduler Airflow pianifica le esecuzioni dei DAG, esegue query sul database Airflow e genera traffico.
L'interfaccia web di Airflow genera traffico di rete perché esegue query sul database Airflow. L'utilizzo intensivo di pagine con grafici, attività e diagrammi può generare grandi volumi di traffico di rete.
Il DAG arresta in modo anomalo il server web Airflow o ne causa il ritorno di un errore "502 Gateway Timeout"
Gli errori del server web possono verificarsi per diversi motivi. Controlla i log di airflow-webserver in Cloud Logging per determinare la causa dell'errore 502 gateway timeout
.
Gestione di un numero elevato di DAG e plug-in nelle cartelle DAG e plug-in
I contenuti delle cartelle /dags
e /plugins
vengono sincronizzati dal bucket del tuo ambiente ai file system locali dei worker e degli scheduler di Airflow.
Maggiore è la quantità di dati archiviati in queste cartelle, più tempo occorre per eseguire la sincronizzazione. Per risolvere queste situazioni:
Limita il numero di file nelle cartelle
/dags
e/plugins
. Archivia solo il minimo di file richiesti.Aumenta lo spazio su disco disponibile per gli scheduler e i worker di Airflow.
Aumenta la CPU e la memoria degli scheduler e dei worker di Airflow in modo che l'operazione di sincronizzazione venga eseguita più velocemente.
In caso di un numero molto elevato di DAG, suddividili in batch, comprimeli in archivi ZIP ed esegui il deployment di questi archivi nella cartella
/dags
. Questo approccio velocizza la procedura di sincronizzazione dei DAG. I componenti Airflow scompattano gli archivi ZIP prima di elaborare i DAG.La generazione di DAG in modo programmatico potrebbe anche essere un metodo per limitare il numero di file DAG archiviati nella cartella
/dags
. Consulta la sezione sui DAG programmatici per evitare problemi di pianificazione ed esecuzione dei DAG generati tramite programmazione.
Non pianificare contemporaneamente DAG generati tramite programmazione
La generazione di oggetti DAG tramite programmazione da un file DAG è un metodo efficiente per creare molti DAG simili con piccole differenze.
È importante non pianificare l'esecuzione di tutti questi DAG contemporaneamente. È molto probabile che i worker di Airflow non dispongano di risorse CPU e memoria sufficienti per eseguire tutte le attività pianificate contemporaneamente.
Per evitare problemi con la pianificazione dei DAG programmatici:
- Aumenta la concorrenza dei worker e esegui l'upgrade dell'ambiente in modo che possa eseguire più attività contemporaneamente.
- Genera i DAG in modo da distribuire le relative pianificazioni in modo uniforme nel tempo, per evitare di pianificare centinaia di attività contemporaneamente, in modo che i worker Airflow abbiano il tempo di eseguire tutte le attività pianificate.
Errore 504 durante l'accesso al server web Airflow
Consulta la sezione Errore 504 durante l'accesso all'interfaccia utente di Airflow.
Viene lanciata un'eccezione di perdita di connessione al server Postgres durante l'esecuzione dell'attività o subito dopo
Le eccezioni Lost connection to Postgres server during query
si verificano spesso quando si verificano le seguenti condizioni:
- Il DAG utilizza
PythonOperator
o un operatore personalizzato. - Il DAG esegue query sul database Airflow.
Se vengono eseguite più query da una funzione richiamabile, i traceback potrebbero indicare erroneamente la riga self.refresh_from_db(lock_for_update=True)
nel codice Airflow; si tratta della prima query del database dopo l'esecuzione dell'attività. La causa effettiva dell'eccezione si verifica prima, quando una sessione SQLAlchemy non viene chiusa correttamente.
Le sessioni SQLAlchemy sono limitate a un thread e create in una funzione callable. La sessione può essere successivamente continuata all'interno del codice Airflow. Se si verificano ritardi significativi tra le query all'interno di una sessione, la connessione potrebbe essere già stata chiusa dal server Postgres. Il timeout della connessione negli ambienti Cloud Composer è impostato su circa 10 minuti.
Soluzione:
- Utilizza il decoratore
airflow.utils.db.provide_session
. Questo decoratore fornisce una sessione valida al database Airflow nel parametrosession
e chiude correttamente la sessione alla fine della funzione. - Non utilizzare una singola funzione a lunga esecuzione. Sposta invece tutte le query
del database in funzioni separate, in modo che esistano più funzioni con
il decoratore
airflow.utils.db.provide_session
. In questo caso, le sessioni vengono chiuse automaticamente dopo il recupero dei risultati della query.
Controllo del tempo di esecuzione dei DAG, delle attività e delle esecuzioni parallele dello stesso DAG
Se vuoi controllare la durata di un'esecuzione singola di un DAG per un determinato DAG, puoi utilizzare il parametro DAG dagrun_timeout
. Ad esempio, se prevedi che un'esecuzione singola di un DAG (indipendentemente dal fatto che l'esecuzione termini con esito positivo o negativo) non debba durare più di un'ora, imposta questo parametro su 3600 secondi.
Puoi anche controllare la durata consentita per una singola attività Airflow. Per farlo, puoi utilizzare execution_timeout
.
Se vuoi controllare il numero di esecuzioni DAG attive per un determinato DAG, puoi utilizzare l'[core]max-active-runs-per-dag
opzione di configurazione di Airflow.
Se vuoi che in un determinato momento venga eseguita una sola istanza di un DAG, imposta il parametro max-active-runs-per-dag
su 1
.
Problemi che influiscono sulla sincronizzazione di DAG e plug-in con scheduler, worker e server web
Cloud Composer sincronizza i contenuti delle cartelle /dags
e /plugins
con gli scheduler e i worker. Alcuni oggetti nelle cartelle /dags
e /plugins
potrebbero impedire il corretto funzionamento di questa sincronizzazione o rallentarla.
La cartella
/dags
viene sincronizzata con gli scheduler e i worker.Questa cartella non è sincronizzata con il server web.
La cartella
/plugins
viene sincronizzata con gli scheduler, i worker e i server web.
Potresti riscontrare i seguenti problemi:
Hai caricato file compressi con gzip che utilizzano la transcodifica di compressione nelle cartelle
/dags
e/plugins
. Questo accade in genere se utilizzi il flag--gzip-local-all
in ungcloud storage cp
comando per caricare i dati nel bucket.Soluzione: elimina l'oggetto che ha utilizzato la transcodifica di compressione e ricaricalo nel bucket.
Uno degli oggetti è denominato ".": questo tipo di oggetto non viene sincronizzato con gli pianificatori e i worker e potrebbe interrompere completamente la sincronizzazione.
Soluzione: rinomina l'oggetto.
Una cartella e un file Python DAG hanno gli stessi nomi, ad esempio
a.py
. In questo caso, il file DAG non è sincronizzato correttamente con i componenti di Airflow.Soluzione: rimuovi la cartella con lo stesso nome del file Python del DAG.
Uno degli oggetti nelle cartelle
/dags
o/plugins
contiene un simbolo/
alla fine del nome dell'oggetto. Questi oggetti possono interferire con il processo di sincronizzazione perché il simbolo/
indica che un oggetto è una cartella, non un file.Soluzione: rimuovi il simbolo
/
dal nome dell'oggetto problematico.Non archiviare file non necessari nelle cartelle
/dags
e/plugins
.A volte i DAG e i plug-in che implementi sono dotati di file aggiuntivi, come i file che memorizzano i test per questi componenti. Questi file vengono sincronizzati con i worker e gli scheduler e influiscono sul tempo necessario per copiarli in questi componenti.
Soluzione: non archiviare file aggiuntivi e non necessari nelle cartelle
/dags
e/plugins
.
Fine [Errno 21] È una directory: l'errore "/home/airflow/gcs/dags/..." viene generato dagli scheduler e dai worker
Questo problema si verifica perché gli oggetti possono avere uno spazio dei nomi sovrapposto in Cloud Storage, mentre allo stesso tempo gli schedulatori e i worker utilizzano file system tradizionali. Ad esempio, è possibile aggiungere sia una cartella sia un oggetto con lo stesso nome al bucket di un ambiente. Quando il bucket viene sincronizzato con gli scheduler e i worker dell'ambiente, viene generato questo errore, che può causare errori nelle attività.
Per risolvere il problema, assicurati che non ci siano spazi dei nomi sovrapposti nel
bucket dell'ambiente. Ad esempio, se sia /dags/misc
(un file) sia
/dags/misc/example_file.txt
(un altro file) si trovano in un bucket, viene
generato un errore dal programmatore.
interruzioni transitorie durante la connessione al database dei metadati Airflow
Cloud Composer viene eseguito su un'infrastruttura distribuita. Ciò significa che di tanto in tanto potrebbero verificarsi alcuni problemi temporanei che potrebbero interrompere l'esecuzione delle attività Airflow.
In questi casi, nei log dei worker di Airflow potresti visualizzare i seguenti messaggi di errore:
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
o
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Questi problemi intermittenti potrebbero essere causati anche da operazioni di manutenzione eseguite per gli ambienti Cloud Composer.
In genere questi errori sono intermittenti e, se le attività Airflow sono idempotenti e hai configurato i tentativi di nuovo, non ti interessano. Puoi anche definire periodi di manutenzione.
Un altro motivo di questi errori potrebbe essere la mancanza di risorse nel cluster del tuo ambiente. In questi casi, puoi eseguire lo scaling o ottimizzare il tuo ambiente come descritto nelle istruzioni su come eseguire lo scaling degli ambienti o ottimizzare l'ambiente.
Un'esecuzione di DAG è contrassegnata come riuscita, ma non contiene attività eseguite
Se un'esecuzione di DAG execution_date
è precedente a start_date
del DAG, potresti visualizzare esecuzioni di DAG che non hanno esecuzioni di attività, ma sono comunque contrassegnate come riuscite.

Causa
Questa situazione potrebbe verificarsi in uno dei seguenti casi:
La mancata corrispondenza è causata dalla differenza di fuso orario tra
execution_date
estart_date
del DAG. Ciò può accadere, ad esempio, quando si utilizzapendulum.parse(...)
per impostarestart_date
.start_date
del DAG è impostato su un valore dinamico, ad esempioairflow.utils.dates.days_ago(1)
Soluzione:
Assicurati che
execution_date
estart_date
utilizzino lo stesso fuso orario.Specifica un
start_date
statico e combinalo concatchup=False
per evitare di eseguire DAG con date di inizio passate.
Un DAG non è visibile nell'interfaccia utente di Airflow o nell'interfaccia utente del DAG e lo scheduler non lo pianifica
Il processore DAG analizza ogni DAG prima che possa essere programmato dall'scheduler e prima che un DAG diventi visibile nell'interfaccia utente di Airflow o nell'interfaccia utente DAG.
Le seguenti opzioni di configurazione di Airflow definiscono i timeout per l'analisi dei DAG:
[core]dagrun_import_timeout
definisce il tempo a disposizione del processore DAG per analizzare un singolo DAG.[core]dag_file_processor_timeout
definisce il tempo totale che il processore DAG può impiegare per analizzare tutti i DAG.
Se un DAG non è visibile nell'interfaccia utente di Airflow o nell'interfaccia utente del DAG:
Controlla i log del processore DAG per verificare se è in grado di elaborare correttamente il DAG. In caso di problemi, potresti visualizzare le seguenti voci di log nei log del programmatore o del processore DAG:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for /usr/local/airflow/dags/example_dag.py with PID 21903 started at 2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
Controlla i log del programmatore per verificare che funzioni correttamente. In caso di problemi, nei log dell'organizzatore potresti visualizzare le seguenti voci di log:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it Process timed out, PID: 68496
Soluzioni:
Correggi tutti gli errori di analisi del DAG. Il processore DAG analizza più DAG e, in alcuni casi rari, gli errori di analisi di un DAG possono influire negativamente sull'analisi di altri DAG.
Se l'analisi del DAG richiede più secondi rispetto a quelli definiti in
[core]dagrun_import_timeout
, aumenta questo timeout.Se l'analisi di tutti i DAG richiede più secondi di quelli definiti in
[core]dag_file_processor_timeout
, aumenta questo timeout.Se l'analisi del DAG richiede molto tempo, può anche significare che non è implementato in modo ottimale. Ad esempio, se legge molte variabili di ambiente o esegue chiamate a servizi esterni o al database Airflow. Nella misura del possibile, evita di eseguire queste operazioni nelle sezioni globali dei DAG.
Aumenta le risorse di CPU e memoria per il processore DAG in modo che possa funzionare più velocemente.
Sintomi di un carico elevato del database Airflow
Per ulteriori informazioni, consulta Sintomi di un carico elevato del database Airflow.
Passaggi successivi
- Risoluzione dei problemi relativi all'installazione del pacchetto PyPI
- Risoluzione dei problemi relativi agli upgrade e agli aggiornamenti degli ambienti