Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questa pagina fornisce informazioni e passaggi per la risoluzione dei problemi comuni relativi al flusso di lavoro.
Molti problemi di esecuzione dei DAG sono causati da prestazioni dell'ambiente non ottimali. Puoi ottimizzare il tuo ambiente Cloud Composer 2 seguendo lo strumento di ottimizzazione le prestazioni e i costi dell'ambiente.
Alcuni problemi di esecuzione dei DAG potrebbero essere causati dallo scheduler di Airflow non funziona correttamente o in modo ottimale. Segui le istruzioni per la risoluzione dei problemi relativi all'agente di pianificazione per risolvere questi problemi.
Flusso di lavoro per la risoluzione dei problemi
Per iniziare la risoluzione dei problemi:
Controlla i log di Airflow.
Puoi aumentare il livello di logging di Airflow eseguendo l'override del parametro seguendo l'opzione di configurazione Airflow.
Airflow 2
Sezione Chiave Valore logging
logging_level
Il valore predefinito è INFO
. Imposta suDEBUG
per ottenere un livello di dettaglio maggiore nei messaggi di log.Airflow 1
Sezione Chiave Valore core
logging_level
Il valore predefinito è INFO
. Imposta il valoreDEBUG
per aumentare il livello di dettaglio nei messaggi di log.Controlla la dashboard di monitoraggio.
Esamina Cloud Monitoring.
Nella console Google Cloud, verifica la presenza di errori nelle pagine per i componenti del tuo ambiente.
Nell'interfaccia web di Airflow, controlla nella vista grafico del DAG istanze di attività non riuscite.
Sezione Chiave Valore webserver
dag_orientation
LR
,TB
,RL
oBT
Debug degli errori dell'operatore
Per eseguire il debug di un errore di un operatore:
- Verifica la presenza di errori specifici delle attività.
- Controlla i log di Airflow.
- Esamina Cloud Monitoring.
- Controlla i log specifici dell'operatore.
- Correggi gli errori.
- Carica il DAG nella cartella
dags/
. - Nell'interfaccia web di Airflow, cancellare gli stati passati per il DAG.
- Riprendi o esegui il DAG.
Risoluzione dei problemi di esecuzione dell'attività
Airflow è un sistema distribuito con molte entità come lo scheduler, l'executor e i worker che comunicano tra loro tramite una coda di attività e il database Airflow e inviano indicatori (come SIGTERM). Il seguente diagramma mostra una panoramica delle interconnessioni tra i componenti di Airflow.
In un sistema distribuito come Airflow, potrebbe esserci una certa connettività di rete o l'infrastruttura sottostante potrebbe riscontrare problemi intermittenti. Ciò può portare a situazioni in cui le attività possono non riuscire ed essere riprogrammate esecuzione o le attività potrebbero non essere state completate correttamente (ad esempio, o attività bloccate in esecuzione). Airflow ha meccanismi per gestire situazioni di questo tipo e riprende automaticamente il normale funzionamento. Le sezioni seguenti spiegano i problemi comuni che si verificano durante l'esecuzione delle attività da parte di Airflow: attività zombie, pillole velenose e indicatori SIGTERM.
Risolvere i problemi relativi alle attività zombie
Airflow rileva due tipi di mancata corrispondenza tra un'attività e un processo che la esegue:
Le attività zombie sono attività che dovrebbero essere in esecuzione, ma non lo sono. Questo può accadere se il processo dell'attività è stato interrotto o non viene risponde, se il worker Airflow non ha segnalato uno stato dell'attività in tempo perché è sovraccarico o se la VM in cui viene eseguita l'attività è stata arrestata. Airflow le trova periodicamente e le esegue o le ritenta, a seconda delle impostazioni.
Scoprire le attività zombie
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
Le attività non terminate sono attività che non dovrebbero essere in esecuzione. Airflow rileva tali attività periodicamente e le termina.
I motivi e le soluzioni più comuni per le attività Zombie sono elencati di seguito.
Il worker Airflow ha esaurito la memoria
Ogni worker Airflow può eseguire fino a [celery]worker_concurrency
istanze di attività
contemporaneamente. Se un consumo cumulativo di memoria di queste istanze di attività
supera il limite di memoria per un worker Airflow, su di esso verrà eseguito un processo casuale
è stato interrotto per liberare risorse.
Scoprire gli eventi di esaurimento memoria dei worker di Airflow
resource.type="k8s_node" resource.labels.cluster_name="GKE_CLUSTER_NAME" log_id("events") jsonPayload.message:"Killed process" jsonPayload.message:("airflow task" OR "celeryd")
Soluzioni:
Ottimizzare le attività per utilizzare meno memoria, ad esempio mediante evitare codice di primo livello;
Nelle versioni di Cloud Composer 2 precedenti alla 2.6.0, aggiorna
[celery]worker_concurrency
utilizzando la formula attuale se questo valore è inferiore.In Cloud Composer 2, utilizza gli override della configurazione di Airflow di mantenere
[celery]worker_concurrency
e aumentare la memoria per i worker di Airflow;In Cloud Composer 1, esegui l'upgrade a un tipo di macchina più grande.
Riduci
[celery]worker_concurrency
.
Il worker Airflow è stato rimosso
Le eliminazioni dei pod sono una parte normale dell'esecuzione di carichi di lavoro su Kubernetes. GKE esegue l'espulsione dei pod se non è più disponibile spazio di archiviazione o per liberare risorse per i carichi di lavoro con una priorità più elevata.
Scopri le eliminazioni dei worker di Airflow
resource.type="k8s_pod" resource.labels.cluster_name="GKE_CLUSTER_NAME" resource.labels.pod_name:"airflow-worker" log_id("events") jsonPayload.reason="Evicted"
Soluzioni:
- Se un'espulsione è causata dalla mancanza di spazio di archiviazione, puoi ridurre l'utilizzo dello spazio o rimuovere i file temporanei non appena non sono più necessari.
In alternativa, puoi
aumenta lo spazio di archiviazione disponibile oppure esegui
carichi di lavoro in un pod dedicato utilizzando
KubernetesPodOperator
.
Il worker Airflow è stato terminato
I worker di Airflow potrebbero essere rimossi esternamente. Se le attività in esecuzione non vengono completate durante un periodo di interruzione, verranno interrotte e potrebbero essere rilevate come zombie.
Scopri le terminazioni dei pod dei worker di Airflow
resource.type="k8s_cluster" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.methodName:"pods.delete" protoPayload.response.metadata.name:"airflow-worker"
Scenari e soluzioni possibili:
I worker di Airflow vengono riavviati durante le modifiche dell'ambiente, ad esempio upgrade o installazione di pacchetti:
Scoprire le modifiche all'ambiente Composer
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
Puoi eseguire queste operazioni quando non sono in esecuzione attività critiche oppure devi abilitare e nuovi tentativi in attività.
Vari componenti potrebbero essere temporaneamente non disponibili durante la manutenzione operazioni:
Scopri le operazioni di manutenzione di GKE
resource.type="gke_nodepool" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.metadata.operationType="UPGRADE_NODES"
Puoi specificare i periodi di manutenzione per ridurre al minimo le sovrapposizioni con l'esecuzione delle attività critiche.
Nelle versioni di Cloud Composer 2 precedenti alla 2.4.5, viene eseguita una terminazione di Airflow il worker potrebbe ignorare il segnale SIGTERM e continuare a eseguire attività:
Scopri il scale down tramite la scalabilità automatica di Composer
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker-set") textPayload:"Workers deleted"
Puoi eseguire l'upgrade a una versione successiva di Cloud Composer in cui questo problema è stato risolto.
Il worker di Airflow era sottoposto a un carico elevato
La quantità di risorse di CPU e memoria disponibile per un worker Airflow è limitata in base alla configurazione dell'ambiente. Se un utilizzo si avvicina ai limiti, causerebbe un conflitto di risorse e ritardi inutili durante l'attività dell'esecuzione. In situazioni estreme, quando mancano le risorse per periodi più lunghi, del tempo, questo potrebbe causare attività zombie.
Soluzioni:
- Monitorare l'utilizzo di CPU e memoria dei worker e regolarlo per evitare superiore all'80%.
Il database Airflow era sotto un carico elevato
Un database viene utilizzato da vari componenti di Airflow per comunicare tra loro e, in particolare, per memorizzare gli heartbeat delle istanze di attività. La scarsità di risorse nel database comporterà tempi di query più lunghi e potrebbe influire sull'esecuzione di un'attività.
Soluzioni:
Il database Airflow non è stato temporaneamente disponibile
Un worker Airflow potrebbe impiegare del tempo per rilevare e gestire in modo corretto gli errori intermittenti, ad esempio i problemi di connettività temporanei. Potrebbe superare la soglia di rilevamento degli zombie predefinita.
Scopri i timeout dell'heartbeat di Airflow
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
Soluzioni:
Aumenta il timeout per le attività zombie e sostituisci il valore dell'opzione di configurazione di Airflow
[scheduler]scheduler_zombie_task_threshold
:Sezione Chiave Valore Note scheduler
scheduler_zombie_task_threshold
Nuovi timeout (tra secondi) Il valore predefinito il valore è 300
Risoluzione dei problemi della pillola velenosa
La pillola velenosa è un meccanismo utilizzato da Airflow per arrestare le attività Airflow.
Airflow utilizza la pillola velenosa nelle seguenti situazioni:
- Quando un programmatore termina un'attività che non è stata completata in tempo.
- Quando un'attività scade o viene eseguita per troppo tempo.
Quando Airflow utilizza la pillola avvelenata, puoi vedere le seguenti voci di log nei log di un worker Airflow che ha eseguito l'attività:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Taking the poison pill.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Possibili soluzioni:
- Verifica che nel codice dell'attività non siano presenti errori che potrebbero causare un'esecuzione troppo lunga.
- (Cloud Composer 2) Aumenta la CPU e la memoria per i worker Airflow, in modo che le attività vengano eseguite più velocemente.
Aumenta il valore del parametro Configurazione Airflow
[celery_broker_transport_options]visibility-timeout
.Di conseguenza, lo scheduler attende più tempo per il completamento di un'attività, prima di considerare l'attività come un compito zombie. Questa opzione è particolarmente e sono utili per attività dispendiose in termini di tempo che durano molte ore. Se il valore è troppo basso (ad esempio 3 ore), lo scheduler considera "bloccate" le attività in esecuzione per 5 o 6 ore (attività zombie).
Aumenta il valore dell'opzione di configurazione
[core]killed_task_cleanup_time
Airflow.Un valore più lungo offre più tempo ai worker Airflow per completare le attività in modo corretto. Se il valore è troppo basso, le attività Airflow potrebbero essere interrotte all'improvviso, senza abbastanza tempo per finire il lavoro con grazia.
Risoluzione dei problemi relativi ai segnali SIGTERM
Gli indicatori SIGTERM vengono utilizzati da Linux, Kubernetes, Airflow Scheduler e Celery per terminare i processi responsabili dell'esecuzione di worker o attività Airflow.
I motivi per cui gli indicatori SIGTERM vengono inviati in un ambiente potrebbero essere diversi:
Un'attività è diventata un'attività zombie e deve essere interrotta.
Lo scheduler ha scoperto un duplicato di un'attività e invia la pillola velenosa e SIGTERM segnala all'attività di interromperla.
In Scalabilità automatica orizzontale dei pod, GKE Il piano di controllo invia indicatori SIGTERM per rimuovere i pod che non sono più necessaria.
Lo scheduler può inviare segnali SIGTERM al processo DagFileProcessorManager. Questi indicatori SIGTERM vengono utilizzati dal programmatore per gestire il ciclo di vita del processo DagFileProcessorManager e possono essere ignorati in tutta sicurezza.
Esempio:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Condizione di gara tra il callback heartbeat e i callback di uscita in local_task_job, che monitora l'esecuzione dell'attività. Se il battito cardiaco rileva che un'attività è stata contrassegnata come completata, non può distinguere se l'attività stessa è riuscita o che Airflow è stato detto di prendere in considerazione l'attività riuscito. In ogni caso, un esecutore dell'attività verrà terminato senza attendere per uscire.
Questi segnali SIGTERM possono essere ignorati in tutta sicurezza. L'attività è già in e l'esecuzione del DAG nel suo insieme non sarà interessati.
La voce di log
Received SIGTERM.
è l'unica differenza tra le normali l'uscita e l'arresto dell'attività con stato riuscito.Un componente Airflow utilizza più risorse (CPU, memoria) di quanto consentito dal nodo del cluster.
Il servizio GKE esegue operazioni di manutenzione invia indicatori SIGTERM ai pod in esecuzione su un nodo che sta per essere sottoposto ad upgrade. Quando un'istanza di attività viene terminata con SIGTERM, puoi visualizzare il seguente log voci nei log di un worker Airflow che ha eseguito l'attività:
{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception
Possibili soluzioni:
Questo problema si verifica quando una VM che esegue l'attività esaurisce la memoria. Questo non è correlato alle configurazioni di Airflow, ma alla quantità di memoria disponibile per la VM.
L'aumento della memoria dipende dalla versione di Cloud Composer che utilizzi. Ad esempio:
In Cloud Composer 2, puoi assegnare più risorse di CPU e memoria ad Airflow worker.
Nel caso di Cloud Composer 1, puoi ricreare l'ambiente utilizzando un tipo di macchina con prestazioni superiori.
In entrambe le versioni di Cloud Composer, puoi abbassare il valore dell'opzione di configurazione di Airflow
[celery]worker_concurrency
concurrency. Questa opzione determina quante attività vengono eseguite contemporaneamente da un determinato worker Airflow.
Per saperne di più sull'ottimizzazione del tuo ambiente Cloud Composer 2, vedi Ottimizza le prestazioni e i costi dell'ambiente
Query di Cloud Logging per scoprire i motivi dei riavvii o delle eliminazioni dei pod
Gli ambienti di Cloud Composer utilizzano i cluster GKE come livello di infrastruttura di calcolo. In questa sezione potrai trovare query utili che ti aiuteranno a trova motivi per i riavvii o le eliminazioni del worker o dello scheduler Airflow.
Le query presentate di seguito potrebbero essere ottimizzate nel seguente modo:
puoi specificare la sequenza temporale che ti interessa in Cloud Logging, ad esempio le ultime 6 ore, 3 giorni o puoi definire un intervallo di tempo personalizzato.
devi specificare il valore CLUSTER_NAME di Cloud Composer
puoi anche limitare la ricerca a un pod specifico aggiungendo POD_NAME
Scopri i container riavviati
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
Query alternativa per limitare i risultati a un pod specifico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Scoprire l'arresto dei container a seguito di un evento di esaurimento della memoria
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
Query alternativa per limitare i risultati a un pod specifico:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Scoprire i container che hanno interrotto l'esecuzione
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
Query alternativa per limitare i risultati a un pod specifico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Impatto delle operazioni di aggiornamento o upgrade sulle esecuzioni delle attività Airflow
Le operazioni di aggiornamento o upgrade interrompono attualmente l'esecuzione delle attività Airflow in corso a meno che un'attività non venga eseguita in modalità decriptabile.
Ti consigliamo di eseguire queste operazioni quando prevedi un impatto minimo sulle esecuzioni delle attività Airflow e configura i meccanismi adeguati per i nuovi tentativi e i DAG e le attività.
Risoluzione dei problemi delle attività di KubernetesExecutor
CeleryKubernetesExecutor è un tipo di esecutore in Cloud Composer 3 che possono usare CeleryExecutor e KubernetesExecutor contemporaneamente nel tempo.
Per ulteriori informazioni sulla risoluzione dei problemi relativi alle attività eseguite con KubernetesExecutor, consulta la pagina Utilizzare CeleryKubernetesExecutor.
Problemi comuni
Le sezioni seguenti descrivono i sintomi e le potenziali correzioni di alcuni problemi comuni relativi ai DAG.
L'attività Airflow è stata interrotta da Negsignal.SIGKILL
A volte la tua attività potrebbe utilizzare più memoria di quella allocata al worker Airflow.
In questo caso, potrebbe essere interrotto da Negsignal.SIGKILL
. Il sistema invia questo segnale per evitare un ulteriore consumo di memoria che potrebbe influire sull'esecuzione di altre attività Airflow. Nel log del worker di Airflow potresti vedere
la seguente voce di log:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
potrebbe essere visualizzato anche come codice -9
.
Possibili soluzioni:
worker_concurrency
di worker Airflow inferiori.Nel caso di Cloud Composer 2, aumenta la memoria dei worker di Airflow.
Nel caso di Cloud Composer 1, esegui l'upgrade a un tipo di macchina più grande utilizzato in di un cluster Cloud Composer.
Ottimizza le attività per utilizzare meno memoria.
Gestisci le attività ad alta intensità di risorse in Cloud Composer utilizzando KubernetesPodOperator o GKEStartPodOperator per l'isolamento delle attività e l'allocazione personalizzata delle risorse.
L'attività non riesce senza emettere log a causa di errori di analisi DAG
A volte potrebbero verificarsi errori DAG sottili che portano a una situazione in cui un programmatore Airflow e un processore DAG sono in grado di pianificare le attività per l'esecuzione e di analizzare un file DAG (rispettivamente), ma il worker Airflow non riesce a eseguire le attività da un DAG di questo tipo perché sono presenti errori di programmazione nel file DAG Python. Ciò potrebbe portare a una situazione in cui un'attività Airflow è contrassegnata come Failed
e non è presente alcun log della relativa esecuzione.
Soluzioni:
Verifica nei log del worker Airflow che non siano presenti errori generati dal worker Airflow relativi a DAG mancanti o errori di analisi del DAG.
Aumenta i parametri relativi all'analisi dei DAG:
Aumenta dagbag-import-timeout ad almeno 120 secondi (o più, se necessario).
Aumenta dag-file-processor-timeout almeno 180 secondi (o più, se necessario). Questo valore deve essere superiore a
dagbag-import-timeout
.
Vedi anche Ispezione dei log del processore DAG.
L'attività non riesce senza inviare log a causa della pressione delle risorse
Sintomo: durante l'esecuzione di un'attività, il processo secondario del worker Airflow è responsabile per l'esecuzione delle attività Airflow viene interrotta bruscamente. L'errore visibile nel log del worker di Airflow potrebbe essere simile al seguente:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Soluzione:
- In Cloud Composer 1, crea un nuovo ambiente con
un tipo di macchina più grande di quello attuale
di testo. Valuta la possibilità di aggiungere più nodi al tuo ambiente e un livello inferiore di
[celery]worker_concurrency
per i worker. - In Cloud Composer 2, aumenta i limiti di memoria per i worker Airflow.
- Se il tuo ambiente genera anche attività zombie, consulta la sezione Risoluzione dei problemi relativi alle attività zombie.
- Per un tutorial sul debug dei problemi di memoria insufficiente, vedi Esegui il debug dei problemi DAG legati all'esaurimento o all'esaurimento dello spazio di archiviazione.
L'attività non riesce senza inviare log a causa dell'eliminazione dei pod
I pod di Google Kubernetes Engine sono soggetti alle Ciclo di vita dei pod di Kubernetes ed eliminazione dei pod. Attività i picchi e la co-pianificazione dei worker sono le due cause più comuni dell'eliminazione dei pod in Cloud Composer.
L'espulsione dei pod può verificarsi quando un determinato pod utilizza eccessivamente le risorse di un nodo rispetto alle aspettative di consumo delle risorse configurate per il nodo. Ad esempio, l'espulsione potrebbe verificarsi quando in un pod vengono eseguite diverse attività che richiedono molta memoria e il loro carico combinato fa sì che il nodo in cui viene eseguito il pod superi il limite di consumo di memoria.
Se un pod worker Airflow viene rimosso, tutte le istanze delle attività in esecuzione su quel pod pod vengono interrotti e successivamente contrassegnati come non riusciti da Airflow.
I log vengono inseriti nel buffer. Se un pod worker viene rimosso prima che il buffer venga svuotato, i log non vengono emesse. L'errore di attività senza log indica che i worker di Airflow vengono riavviati a causa di un esaurimento della memoria (OOM). Alcuni log potrebbero essere presenti in Cloud Logging anche se i log di Airflow non sono stati emessi.
Per visualizzare i log:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Log.
Visualizza i log dei singoli worker in Tutti i log -> Log Airflow -> Worker -> (singolo worker).
L'esecuzione del DAG è limitata alla memoria. L'esecuzione di ogni attività inizia con due Airflow di elaborazione delle attività: esecuzione e monitoraggio delle attività. Ogni nodo può gestire fino a 6 attività contemporaneamente (circa 12 processi caricati con i moduli Airflow). A seconda della natura del DAG, è possibile consumare più memoria.
Sintomo:
Nella console Google Cloud, vai alla pagina Carichi di lavoro.
Se ci sono pod
airflow-worker
che mostranoEvicted
, fai clic su ogni pod espulso e cerca il messaggioThe node was low on resource: memory
nella parte superiore della finestra.
Correzione:
- In Cloud Composer 1, crea un nuovo ambiente Cloud Composer con un tipo di macchina più grande di quello attuale di testo.
- In Cloud Composer 2, aumenta i limiti di memoria per i worker di Airflow.
- Controlla i log dei pod
airflow-worker
per individuare possibili cause dell'eliminazione. Per maggiori informazioni sul recupero dei log dai singoli pod, consulta Risoluzione dei problemi relativi ai carichi di lavoro di cui è stato eseguito il deployment. - Assicurati che le attività nel DAG siano idempotenti e che sia possibile recuperare.
Evita di scaricare file non necessari nel file system locale dei worker di Airflow.
I worker di Airflow hanno una capacità del file system locale limitata. Ad esempio, nel Cloud Composer 2, un worker può avere da 1 GB a 10 GB di spazio di archiviazione. Quando lo spazio di archiviazione finisce, il pod worker di Airflow viene espulso dal piano di controllo GKE. Questa operazione non riesce a tutte le attività rimosse worker era in esecuzione.
Esempi di operazioni problematiche:
- Download di file o oggetti e archiviazione locale in un flusso Airflow worker. Archivia invece questi oggetti direttamente in un servizio adatto, ad esempio un bucket Cloud Storage.
- Accesso a oggetti di grandi dimensioni nella cartella
/data
da un worker Airflow. Il worker Airflow scarica l'oggetto nel file system locale. Implementa invece i DAG in modo che i file di grandi dimensioni vengano elaborati all'esterno del pod di lavoro Airflow.
Timeout importazione caricamento DAG
Sintomo:
- Nell'interfaccia web di Airflow, nella parte superiore della pagina dell'elenco dei DAG, un riquadro di avviso rosso mostra
Broken DAG: [/path/to/dagfile] Timeout
. In Cloud Monitoring: i log
airflow-scheduler
contengono voci simili a:ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
Correzione:
Esegui l'override di dag_file_processor_timeout
Airflow
di configurazione e concedere più tempo per l'analisi dei DAG:
Sezione | Chiave | Valore |
---|---|---|
core |
dag_file_processor_timeout |
Nuovo valore di timeout |
L'esecuzione del DAG non termina entro il tempo previsto
Sintomo:
A volte un'esecuzione del DAG non termina perché le attività di Airflow si bloccano e l'esecuzione del DAG dura più a lungo del previsto. In condizioni normali, le attività Airflow non rimangono indefinitamente nello stato in coda o in esecuzione, perché Airflow dispone di procedure di timeout e pulizia che aiutano a evitare questa situazione.
Risoluzione:
Utilizza il parametro
dagrun_timeout
per i DAG. Ad esempio:dagrun_timeout=timedelta(minutes=120)
. Di conseguenza, ogni esecuzione del DAG deve essere completata entro il timeout dell'esecuzione del DAG e le attività non completate devono essere contrassegnate comeFailed
oUpstream Failed
. Per ulteriori informazioni sugli stati delle attività di Airflow, consulta la documentazione di Apache Airflow.Utilizza il parametro timeout esecuzione attività per definire un timeout predefinito per le attività eseguite in base agli operatori Apache Airflow.
Esecuzioni di DAG non eseguite
Sintomo:
Quando una data di pianificazione per un DAG viene impostata in modo dinamico, possono verificarsi vari effetti collaterali imprevisti. Ad esempio:
L'esecuzione di un DAG è sempre nel futuro e il DAG non viene mai eseguito.
Le esecuzioni DAG passate sono contrassegnate come eseguite e riuscite nonostante non lo siano.
Ulteriori informazioni sono disponibili nella documentazione di Apache Airflow.
Correzione:
Segui i consigli riportati nella documentazione di Apache Airflow.
Imposta
start_date
statico per i DAG. Come opzione, puoi utilizzarecatchup=False
per disattivare l'esecuzione del DAG per le date passate.Evita di utilizzare
datetime.now()
odays_ago(<number of days>)
, a meno che non sappia gli effetti collaterali di questo approccio.
Aumento del traffico di rete da e verso il database Airflow
La quantità di traffico della rete tra i cluster GKE del tuo ambiente cluster e il database Airflow dipende dal numero di DAG, dal numero le attività nei DAG e il modo in cui i DAG accedono ai dati nel database Airflow. La che possono influire sull'utilizzo della rete:
Query al database Airflow. Se i tuoi DAG eseguono molte query, generano grandi quantità di traffico. Esempi: controllo dello stato delle attività prima di procedere con altre attività, query sulla tabella XCom, dumping dei contenuti del database Airflow.
Numero elevato di attività. Più attività ci sono da pianificare, più generato il traffico di rete. Questo vale sia per il numero totale di attività nei DAG sia per la frequenza di pianificazione. Quando lo scheduler Airflow pianifica le esecuzioni dei DAG, esegue query sul database Airflow e genera traffico.
L'interfaccia web di Airflow genera traffico di rete perché esegue query il database Airflow. Usare in modo intensivo pagine con grafici, attività e possono generare grandi volumi di traffico di rete.
Il DAG fa arrestare in modo anomalo il server web Airflow o ne causa il ritorno di un errore 502 gateway timeout
Gli errori del server web possono verificarsi per diversi motivi. Controlla i log di airflow-webserver in Cloud Logging per determinare la causa dell'errore 502 gateway timeout
.
Calcolo pesante
Questa sezione si applica solo a Cloud Composer 1.
Evita di eseguire calcoli pesanti al momento dell'analisi del DAG.
A differenza dei nodi worker e di pianificazione, i cui tipi di macchine possono essere personalizzati per avere una maggiore capacità di CPU e memoria, il server web utilizza un tipo di macchina fisso, che può causare errori di analisi del DAG se il calcolo in fase di analisi è troppo pesante.
Tieni presente che il server web ha 2 vCPU e 2 GB di memoria.
Il valore predefinito per core-dagbag_import_timeout
è 30 secondi. Questo valore di timeout definisce il limite massimo di tempo impiegato da Airflow per caricare un modulo Python nella cartella dags/
.
Autorizzazioni errate
Questa sezione riguarda solo Cloud Composer 1.
Il server web non viene eseguito con lo stesso account di servizio dei worker e dello schedulatore. Di conseguenza, i worker e lo scheduler potrebbero essere in grado di accedere alle risorse gestite dall'utente a cui il server web non può accedere.
Ti consigliamo di evitare di accedere a risorse non pubbliche durante
Analisi dei DAG. A volte è inevitabile e dovrete concedere
le autorizzazioni all'account di servizio del server web. Il servizio
deriva dal dominio del server web. Ad esempio, se il dominio è example-tp.appspot.com
, l'account di servizio è example-tp@appspot.gserviceaccount.com
.
Errori DAG
Questa sezione riguarda solo Cloud Composer 1.
Il server web viene eseguito su App Engine ed è separato da
al cluster GKE dell'ambiente. Il server web analizza i file di definizione del DAG e può verificarsi un 502 gateway timeout
se sono presenti errori nel DAG. Airflow funziona normalmente senza un server web funzionale se il DAG problematico non interrompe i processi in esecuzione in GKE.
In questo caso, puoi utilizzare gcloud composer environments run
per recuperare dettagli dal tuo ambiente e come soluzione alternativa se il server web diventa non disponibile.
In altri casi, puoi eseguire l'analisi dei DAG in GKE e cercare DAG che generano eccezioni irreversibili di Python o timeout (il valore predefinito è 30 secondi). Per risolvere il problema, connettiti a una shell remota in un contenitore di worker Airflow e controlla la presenza di errori di sintassi. Per ulteriori informazioni, consulta la sezione Testare i DAG.
Gestione di un numero elevato di DAG e plug-in nelle cartelle dag e plug-in
I contenuti delle cartelle /dags
e /plugins
vengono sincronizzati dal bucket del tuo ambiente ai file system locali dei worker e degli scheduler di Airflow.
Maggiore è la quantità di dati archiviati in queste cartelle, più tempo occorre per eseguire la sincronizzazione. Per risolvere queste situazioni:
Limita il numero di file nelle cartelle
/dags
e/plugins
. Archivia solo il minimo di file richiesti.Se possibile, aumenta lo spazio su disco disponibile per gli scheduler e i worker di Airflow.
Se possibile, aumenta la CPU e la memoria degli scheduler e dei worker di Airflow in modo che l'operazione di sincronizzazione venga eseguita più velocemente.
Nel caso di un numero molto elevato di DAG, dividi i DAG in batch, comprimi in archivi zip e implementali nella cartella
/dags
. Questo approccio velocizza il processo di sincronizzazione dei DAG. Componenti Airflow decomprimi gli archivi ZIP prima di elaborare i DAG.Anche la generazione di DAG in una pubblicità programmatica potrebbe essere un metodo per limitare il numero di file DAG archiviati nella cartella
/dags
. Consulta la sezione sui DAG di pubblicità programmatica da evitare problemi con la pianificazione e l'esecuzione dei DAG generati in modo programmatico.
Non pianificare contemporaneamente DAG generati tramite programmazione
Generare oggetti DAG in modo programmatico da un file DAG è un metodo efficiente per creare molti DAG simili che hanno solo piccole differenze.
È importante non pianificare l'esecuzione di tutti questi DAG contemporaneamente. È molto probabile che i worker di Airflow non dispongano di risorse CPU e memoria sufficienti per eseguire tutte le attività pianificate contemporaneamente.
Per evitare problemi con la pianificazione dei DAG programmatici:
- Aumenta la concorrenza dei worker e esegui l'upgrade dell'ambiente in modo che possa eseguire più attività contemporaneamente.
- Genera i DAG in modo da distribuire le relative pianificazioni in modo uniforme nel tempo, per evitare di pianificare centinaia di attività contemporaneamente, in modo che i worker Airflow abbiano il tempo di eseguire tutte le attività pianificate.
Errore 504 durante l'accesso al server web Airflow
Consulta la sezione Errore 504 durante l'accesso all'interfaccia utente di Airflow.
Lost connection to Postgres / MySQL server during query
eccezione viene generata durante l'esecuzione dell'attività o subito dopo
Lost connection to Postgres / MySQL server during query
eccezioni
si verificano spesso quando si verificano le seguenti condizioni:
- Il DAG utilizza
PythonOperator
o un operatore personalizzato. - Il DAG esegue query sul database Airflow.
Se vengono eseguite più query da una funzione richiamabile, i traceback potrebbero indicare erroneamente la riga self.refresh_from_db(lock_for_update=True)
nel codice Airflow; si tratta della prima query del database dopo l'esecuzione dell'attività. La
la causa effettiva dell'eccezione si verifica prima di questo, quando una sessione SQLAlchemy
non è chiuso correttamente.
Le sessioni SQLAlchemy hanno come ambito un thread e create in una funzione richiamabile una sessione può essere continuata in un secondo momento all'interno del codice Airflow. Se si verificano ritardi significativi tra le query all'interno di una sessione, la connessione potrebbe essere già stata chiusa dal server Postgres o MySQL. Il timeout della connessione negli ambienti Cloud Composer è impostato su circa 10 minuti.
Correzione:
- Utilizza il decoratore
airflow.utils.db.provide_session
. Questo decoratore fornisce una sessione valida al database Airflow nel parametrosession
e chiude correttamente la sessione alla fine della funzione. - Non utilizzare una singola funzione a lunga esecuzione. Sposta invece tutto il database
per separare funzioni, in modo che ci siano più funzioni con
il decorator di
airflow.utils.db.provide_session
. In questo caso, le sessioni vengono chiuse automaticamente dopo il recupero dei risultati della query.
Controllo del tempo di esecuzione dei DAG, delle attività e delle esecuzioni parallele dello stesso DAG
Se vuoi controllare la durata di un'esecuzione singola di un determinato DAG, puoi utilizzare il parametro DAG dagrun_timeout
. Ad esempio, se prevedi che un'esecuzione singola di un DAG (indipendentemente dal fatto che l'esecuzione termini con esito positivo o negativo) non debba durare più di un'ora, imposta questo parametro su 3600 secondi.
Puoi anche controllare la durata di una singola attività Airflow. Per farlo, puoi utilizzare execution_timeout
.
Se vuoi controllare quante esecuzioni di DAG attive vuoi avere per un
un determinato DAG, puoi utilizzare [core]max-active-runs-per-dag
l'opzione di configurazione Airflow per farlo.
Se vuoi che un DAG venga eseguito in un determinato momento, imposta
max-active-runs-per-dag
su 1
.
Problemi che incidono sulla sincronizzazione di DAG e plug-in con scheduler, worker e server web
Cloud Composer sincronizza i contenuti di /dags
e /plugins
cartelle
a scheduler e worker. Alcuni oggetti nelle cartelle /dags
e /plugins
potrebbero impedire il corretto funzionamento di questa sincronizzazione o almeno rallentarla.
La cartella
/dags
viene sincronizzata con gli scheduler e i worker. Questa cartella non viene sincronizzata con i server web in Cloud Composer 2 o se attiviDAG Serialization
in Cloud Composer 1.La cartella
/plugins
è sincronizzata con scheduler, worker e server web.
Potresti riscontrare i seguenti problemi:
Hai caricato file compressi con gzip che utilizzano la transcodifica di compressione nelle cartelle
/dags
e/plugins
. Questo accade in genere se utilizzi il flag--gzip-local-all
in ungcloud storage cp
comando per caricare i dati nel bucket.Soluzione: elimina l'oggetto che ha utilizzato la transcodifica con compressione e poi caricalo di nuovo. al bucket.
Uno degli oggetti è denominato ".", quindi un oggetto di questo tipo non viene sincronizzato con scheduler e worker e questa operazione potrebbe interrompere del tutto la sincronizzazione.
Soluzione: rinomina l'oggetto problematico.
Una cartella e un file Python DAG hanno gli stessi nomi, ad esempio
a.py
. In questo caso, il file DAG non è sincronizzato correttamente con i componenti di Airflow.Soluzione: rimuovi la cartella che ha lo stesso nome di un file Python DAG.
Uno degli oggetti nelle cartelle
/dags
o/plugins
contiene un simbolo/
alla fine del nome dell'oggetto. Questi oggetti possono ingannare il processo di sincronizzazione perché il simbolo/
indica che un oggetto è una cartella, non un file.Soluzione: rimuovi il simbolo
/
dal nome dell'oggetto problematico.Non archiviare i file non necessari nelle cartelle
/dags
e/plugins
.A volte i DAG e i plug-in che implementi sono accompagnati file aggiuntivi, ad esempio file di archiviazione per i test di questi componenti. Questi file vengono sincronizzati con i worker e gli scheduler e influiscono sul tempo necessario per copiarli in questi elementi.
Soluzione: non archiviare file aggiuntivi e non necessari in
/dags
e/plugins
cartelle.
Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'
errore generato da scheduler e worker
Questo problema si verifica perché gli oggetti possono avere con spazio dei nomi sovrapposto in Cloud Storage, gli scheduler e i worker usano file system tradizionali. Ad esempio, è possibile per aggiungere una cartella e un oggetto con lo stesso nome al di sincronizzare la directory di una VM con un bucket. Quando il bucket viene sincronizzato con gli scheduler e i worker dell'ambiente, viene generato questo errore, che può causare errori nelle attività.
Per risolvere il problema, assicurati che non ci siano spazi dei nomi in sovrapposizione nel bucket dell'ambiente. Ad esempio, se sia /dags/misc
(un file) che
/dags/misc/example_file.txt
(un altro file) si trovano in un bucket, si è verificato un errore
generate dallo scheduler.
interruzioni transitorie durante la connessione al database dei metadati Airflow
Cloud Composer viene eseguito su un'infrastruttura cloud distribuita. Ciò significa che di tanto in tanto potrebbero verificarsi alcuni problemi temporanei che potrebbero interrompere l'esecuzione delle attività Airflow.
In queste situazioni potresti vedere i seguenti messaggi di errore nei worker di Airflow log:
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
o
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Tali problemi intermittenti potrebbero essere causati anche da operazioni di manutenzione per i tuoi ambienti Cloud Composer.
Di solito questi errori sono intermittenti e se le tue attività Airflow sono idempotenti e ci sono nuovi tentativi configurati, non dovresti essere immune. Puoi anche valuta la possibilità di definire periodi di manutenzione.
Un altro motivo di questi errori potrebbe essere la mancanza di risorse nel cluster del tuo ambiente. In questi casi, puoi eseguire lo scaling o ottimizzare il tuo ambiente come descritto nelle istruzioni per lo scaling degli ambienti o l'ottimizzazione dell'ambiente.
Un'esecuzione di DAG è contrassegnata come riuscita, ma non ha attività eseguite
Se un'esecuzione di DAG execution_date
è precedente a start_date
del DAG, potresti visualizzare esecuzioni di DAG che non hanno esecuzioni di attività, ma sono comunque contrassegnate come riuscite.
Causa
Questa situazione potrebbe verificarsi in uno dei seguenti casi:
Una mancata corrispondenza è causata dalla differenza di fuso orario tra i
execution_date
estart_date
. Ad esempio, può accadere quando utilizzandopendulum.parse(...)
per impostarestart_date
.Il
start_date
del DAG è impostato su un valore dinamico, ad esempioairflow.utils.dates.days_ago(1)
Soluzione:
Assicurati che
execution_date
estart_date
utilizzino lo stesso fuso orario.Specifica un valore
start_date
statico e combinalo concatchup=False
per evitare di DAG con date di inizio passate.
Un DAG non è visibile nell'interfaccia utente di Airflow o nell'interfaccia utente del DAG e lo scheduler non lo pianifica
Il processore DAG analizza ogni DAG prima che possa essere programmato dallo scheduler e prima che un DAG diventi visibile in la UI di Airflow o la UI DAG.
Le seguenti opzioni di configurazione di Airflow definiscono i timeout per l'analisi dei DAG:
[core]dagrun_import_timeout
definisce il tempo a disposizione del processore DAG per analizzare un singolo DAG.[core]dag_file_processor_timeout
definisce la quantità totale di tempo che il processore DAG può dedicare all'analisi di tutti e i DAG.
Se un DAG non è visibile nella UI di Airflow o DAG:
- Controlla i log del processore DAG per verificare se è in grado di elaborare correttamente il DAG. In caso di problemi, potresti visualizzare le seguenti voci di log nei log del programmatore o del processore DAG:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
- Controlla i log del programmatore per verificare che funzioni correttamente. Nel caso di potresti vedere le seguenti voci di log nei log dello scheduler:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496
Soluzioni:
Correggi tutti gli errori di analisi dei DAG. Il processore DAG analizza più DAG e, in alcuni casi rari, gli errori di analisi di un DAG possono influire negativamente sull'analisi di altri DAG.
Se l'analisi del DAG richiede più secondi rispetto a quelli definiti in
[core]dagrun_import_timeout
, aumenta questo timeout.Se l'analisi di tutti i DAG richiede più secondi di quelli definiti in
[core]dag_file_processor_timeout
, aumenta questo timeout.Se l'analisi del DAG richiede molto tempo, può anche significare che non è implementato in modo ottimale. Ad esempio, se legge molto variabili di ambiente o eseguire chiamate a servizi esterni o ad Airflow per configurare un database. Nella misura del possibile, evita di eseguire queste operazioni nelle sezioni globali dei DAG.
Aumenta le risorse di CPU e memoria per lo scheduler in modo che possa funzionare più velocemente.
Aumentare il numero di processi del processore DAG per consentire l'analisi più velocemente. Puoi farlo aumentando il valore di
[scheduler]parsing_process
.
Sintomi di un carico elevato del database Airflow
Per ulteriori informazioni, consulta Sintomi di un carico elevato del database Airflow.
Passaggi successivi
- Risoluzione dei problemi di installazione del pacchetto PyPI
- Risoluzione dei problemi relativi agli upgrade e agli aggiornamenti degli ambienti