Risoluzione dei problemi dei DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina fornisce i passaggi per la risoluzione dei problemi e le informazioni per un flusso di lavoro comune che le applicazioni presentino problemi di prestazioni.

Molti problemi di esecuzione dei DAG sono causati da prestazioni dell'ambiente non ottimali. Puoi ottimizzare il tuo ambiente Cloud Composer 2 seguendo lo strumento di ottimizzazione le prestazioni e i costi dell'ambiente.

Alcuni problemi di esecuzione dei DAG potrebbero essere causati dallo scheduler di Airflow non funziona correttamente o in modo ottimale. Segui Istruzioni per la risoluzione dei problemi dello scheduler per risolvere questi problemi.

Flusso di lavoro per la risoluzione dei problemi

Per iniziare la risoluzione dei problemi:

  1. Controlla i log di Airflow.

    Puoi aumentare il livello di logging di Airflow eseguendo l'override del parametro seguendo l'opzione di configurazione Airflow.

    Flusso d'aria 2

    Sezione Chiave Valore
    logging logging_level Il valore predefinito è INFO. Imposta il valore DEBUG per aumentare il livello di dettaglio nei messaggi di log.

    Flusso d'aria 1

    Sezione Chiave Valore
    core logging_level Il valore predefinito è INFO. Imposta il valore DEBUG per aumentare il livello di dettaglio nei messaggi di log.
  2. Controlla la dashboard di Monitoring.

  3. Esamina Cloud Monitoring.

  4. Nella console Google Cloud, verifica la presenza di errori nelle pagine per i componenti del tuo ambiente.

  5. Nell'interfaccia web di Airflow, controlla nella vista grafico del DAG istanze di attività non riuscite.

    Sezione Chiave Valore
    webserver dag_orientation LR, TB, RL o BT

Debug degli errori dell'operatore

Per eseguire il debug di un errore di un operatore:

  1. Verifica la presenza di errori specifici delle attività.
  2. Controlla i log di Airflow.
  3. Esamina Cloud Monitoring.
  4. Controlla i log specifici dell'operatore.
  5. Correggi gli errori.
  6. Carica il DAG nella cartella dags/.
  7. Nell'interfaccia web di Airflow, cancellare gli stati passati per il DAG.
  8. Riprendi o esegui il DAG.

Risoluzione dei problemi di esecuzione dell'attività

Airflow è un sistema distribuito con molte entità come scheduler, esecutore, worker che comunicano tra loro tramite una coda di attività e Airflow un database e inviare indicatori (come SIGTERM). Il seguente diagramma mostra un panoramica delle interconnessioni tra i componenti Airflow.

Interazione tra i componenti Airflow
Figura 1. Interazione tra i componenti Airflow (fai clic per ingrandire)
.

In un sistema distribuito come Airflow, potrebbe esserci una certa connettività di rete o l'infrastruttura sottostante potrebbe riscontrare problemi intermittenti. Ciò può portare a situazioni in cui le attività possono non riuscire ed essere riprogrammate esecuzione o le attività potrebbero non essere state completate correttamente (ad esempio, o attività bloccate in esecuzione). Airflow ha meccanismi per gestire situazioni di questo tipo e riprende automaticamente il normale funzionamento. Persone che seguo spiegano i problemi comuni che si verificano durante l'esecuzione delle attività da parte di Airflow: Compiti con gli zombi, pillole velenosi e segnali SIGTERM.

Risoluzione dei problemi relativi alle attività Zombie

Airflow rileva due tipi di mancata corrispondenza tra un'attività e un processo che viene eseguito l'attività:

  • Le attività zombie sono attività che dovrebbero essere in esecuzione, ma che in esecuzione. Questo può accadere se il processo dell'attività è stato interrotto o non viene risponde, se il worker Airflow non ha segnalato uno stato dell'attività in tempo perché è sovraccarico o se la VM in cui viene eseguita l'attività è stata arrestata. Airflow rileva periodicamente queste attività e non riesce o esegue un nuovo tentativo. a seconda delle impostazioni dell'attività.

    Scopri attività zombie

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Le attività non morte sono attività che non dovrebbero essere in esecuzione. Airflow rileva tali attività periodicamente e le termina.

I motivi e le soluzioni più comuni per le attività Zombie sono elencati di seguito.

Il worker Airflow ha esaurito la memoria

Ogni worker Airflow può eseguire fino a [celery]worker_concurrency istanze di attività contemporaneamente. Se un consumo cumulativo di memoria di queste istanze di attività supera il limite di memoria per un worker Airflow, su di esso verrà eseguito un processo casuale è stato interrotto per liberare risorse.

Scopri gli eventi di esaurimento della memoria dei worker Airflow

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

Soluzioni:

Il worker Airflow è stato rimosso

Le eliminazioni dei pod sono una parte normale dell'esecuzione di carichi di lavoro su Kubernetes. GKE rimuove i pod se hanno esaurito lo spazio di archiviazione o li liberano per i carichi di lavoro con una priorità più elevata.

Scopri le eliminazioni dei worker di Airflow

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Soluzioni:

Il worker Airflow è stato terminato

I worker di Airflow potrebbero essere rimossi esternamente. Se le attività attualmente in esecuzione non terminare durante un periodo di interruzione del servizio previsto, saranno interrotti e potrebbero finiscono per essere rilevati come zombi.

Scopri le terminazioni dei pod dei worker di Airflow

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Scenari e soluzioni possibili:

  • I worker di Airflow vengono riavviati durante le modifiche dell'ambiente, ad esempio upgrade o installazione di pacchetti:

    Scopri le modifiche dell'ambiente Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Puoi eseguire queste operazioni quando non sono in esecuzione attività critiche oppure devi abilitare e nuovi tentativi in attività.

  • Vari componenti potrebbero essere temporaneamente non disponibili durante la manutenzione operations:

    Scopri le operazioni di manutenzione di GKE

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    Puoi specificare periodi di manutenzione per ridurre al minimo si sovrappone all'esecuzione delle attività critiche.

  • Nelle versioni di Cloud Composer 2 precedenti alla 2.4.5, viene eseguita una terminazione di Airflow il worker potrebbe ignorare il segnale SIGTERM e continuare a eseguire attività:

    Scopri il scale down tramite la scalabilità automatica di Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-worker-set")
    textPayload:"Workers deleted"

    Puoi eseguire l'upgrade a una versione successiva di Cloud Composer in cui è stato risolto.

Il worker di Airflow era sottoposto a un carico elevato

La quantità di risorse di CPU e memoria disponibile per un worker Airflow è limitata in base alla configurazione dell'ambiente. Se un utilizzo si avvicina ai limiti, causerebbe un conflitto di risorse e ritardi inutili durante l'attività dell'esecuzione. In situazioni estreme, quando mancano le risorse per periodi più lunghi, del tempo, questo potrebbe causare attività zombie.

Soluzioni:

Il database Airflow era sottoposto a un carico elevato

Un database viene utilizzato da vari componenti Airflow per comunicare tra loro per archiviare gli heartbeat delle istanze di attività. Carenza di risorse sul prolunga i tempi di query e potrebbe influire sull'esecuzione di un'attività.

Soluzioni:

Il database Airflow era temporaneamente non disponibile

Un worker Airflow potrebbe impiegare tempo per rilevare e gestire delicatamente ad esempio problemi di connettività temporanei. Potrebbe superare il valore predefinito soglia di rilevamento degli zombie.

Scoprire i timeout heartbeat di Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Soluzioni:

  • Aumenta il timeout per le attività zombie e sostituisci il di [scheduler]scheduler_zombie_task_threshold Opzione di configurazione Airflow:

    Sezione Chiave Valore Note
    scheduler scheduler_zombie_task_threshold Nuovi timeout (tra secondi) Il valore predefinito il valore è 300

Risoluzione dei problemi della pillola velenosa

La pillola velenosa è un meccanismo utilizzato da Airflow per arrestare le attività Airflow.

Airflow utilizza la pillola velenosa nelle seguenti situazioni:

  • Quando uno scheduler termina un'attività che non è stata completata in tempo.
  • Quando un'attività scade o viene eseguita per troppo tempo.

Quando Airflow utilizza la pillola velenosa, puoi vedere le seguenti voci di log nei log di un worker Airflow che ha eseguito l'attività:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Possibili soluzioni:

  • Controlla se nel codice dell'attività sono presenti errori che potrebbero causare un'esecuzione troppo lunga.
  • (Cloud Composer 2) Aumentare la CPU e la memoria di Airflow in modo che le attività vengano eseguite più velocemente.
  • Aumenta il valore del parametro Configurazione Airflow [celery_broker_transport_options]visibility-timeout .

    Di conseguenza, lo scheduler attende più tempo per il completamento di un'attività, prima di considerare l'attività come un compito zombie. Questa opzione è particolarmente e sono utili per attività dispendiose in termini di tempo che durano molte ore. Se se il valore è troppo basso (ad esempio, 3 ore), lo scheduler considera che vengono eseguite per 5 o 6 ore come "impiccate" (Attività zombie).

  • Aumenta il valore del flusso d'aria [core]killed_task_cleanup_time l'opzione di configurazione.

    Un valore più lungo offre più tempo ai worker di Airflow per completare le loro attività con eleganza. Se il valore è troppo basso, le attività Airflow potrebbero essere interrotte all'improvviso, senza abbastanza tempo per finire il lavoro con grazia.

Risoluzione dei problemi relativi ai segnali SIGTERM

I segnali SIGTERM sono utilizzati da Linux, Kubernetes, Airflow scheduler e Celery per terminare i processi responsabili che eseguono worker o attività Airflow.

I motivi per cui gli indicatori SIGTERM vengono inviati in un ambiente potrebbero essere diversi:

  • Un'attività è diventata un'attività Zombie e deve essere arrestata.

  • Lo scheduler ha scoperto un duplicato di un'attività e invia la pillola velenosa e SIGTERM segnala all'attività di interromperla.

  • In Scalabilità automatica orizzontale dei pod, GKE Il piano di controllo invia indicatori SIGTERM per rimuovere i pod che non sono più necessaria.

  • Lo scheduler può inviare segnali SIGTERM al processo DagFileProcessorManager. Questi indicatori SIGTERM vengono utilizzati dallo scheduler per gestire il ciclo di vita del processo DagFileProcessorManager e può essere ignorato.

    Esempio:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Condizione di gara tra il callback di heartbeat e i callback di uscita nel local_task_job, che monitora l'esecuzione dell'attività. Se il battito cardiaco rileva che un'attività è stata contrassegnata come completata, non può distinguere se l'attività stessa è riuscita o che Airflow è stato detto di prendere in considerazione l'attività riuscito. In ogni caso, un esecutore dell'attività verrà terminato senza attendere per uscire.

    Questi indicatori SIGTERM possono essere ignorati tranquillamente. L'attività è già in e l'esecuzione del DAG nel suo insieme non sarà interessati.

    La voce di log Received SIGTERM. è l'unica differenza tra le normali l'uscita e l'arresto dell'attività con stato riuscito.

    Condizione di gara tra l&#39;heartbeat e i callback di uscita
    Figura 2. Condizione di gara tra il battito cardiaco e i callback di uscita (fai clic per ingrandire)
    .
  • Un componente Airflow utilizza più risorse (CPU, memoria) di quanto consentito dalla di un cluster Kubernetes.

  • Il servizio GKE esegue operazioni di manutenzione invia indicatori SIGTERM ai pod in esecuzione su un nodo che sta per essere sottoposto ad upgrade. Quando un'istanza di attività viene terminata con SIGTERM, puoi visualizzare il seguente log voci nei log di un worker Airflow che ha eseguito l'attività:

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

Possibili soluzioni:

Questo problema si verifica quando una VM che esegue l'attività esaurisce la memoria. Questo non è alle configurazioni Airflow ma alla quantità di memoria disponibile VM.

L'aumento della memoria dipende dalla versione di Cloud Composer che utilizzi. Ad esempio:

  • In Cloud Composer 2, puoi assegnare più risorse di CPU e memoria ad Airflow worker.

  • Nel caso di Cloud Composer 1, puoi ricreare l'ambiente utilizzando una e un tipo di macchina con più prestazioni.

  • In entrambe le versioni di Cloud Composer, è possibile ridurre il valore l'opzione di configurazione Airflow contemporaneità [celery]worker_concurrency. Questa opzione determina quante attività vengono eseguite contemporaneamente da un determinato Worker Airflow.

Per saperne di più sull'ottimizzazione del tuo ambiente Cloud Composer 2, vedi Ottimizza le prestazioni e i costi dell'ambiente

Query di Cloud Logging per scoprire i motivi dei riavvii o delle eliminazioni dei pod

Gli ambienti di Cloud Composer utilizzano i cluster GKE come infrastruttura di calcolo livello di sicurezza. In questa sezione potrai trovare query utili che ti aiuteranno a trova motivi per i riavvii o le eliminazioni del worker o dello scheduler Airflow.

Le query presentate di seguito potrebbero essere ottimizzate nel seguente modo:

  • puoi specificare la sequenza temporale che ti interessa in Cloud Logging; ad esempio le ultime 6 ore, i 3 giorni oppure puoi definire un intervallo di tempo personalizzato

  • devi specificare Cloud Composer CLUSTER_NAME

  • puoi anche limitare la ricerca a un pod specifico aggiungendo POD_NAME

Scopri i container riavviati

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Query alternativa per limitare i risultati a un pod specifico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Scopri l'arresto dei container a causa di un evento di esaurimento della memoria

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Query alternativa per limitare i risultati a un pod specifico:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Scopri i container che hanno interrotto l'esecuzione

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Query alternativa per limitare i risultati a un pod specifico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Impatto delle operazioni di aggiornamento o upgrade sulle esecuzioni delle attività Airflow

Le operazioni di aggiornamento o upgrade interrompono attualmente l'esecuzione delle attività Airflow in corso a meno che un'attività non venga eseguita in modalità decriptabile.

Ti consigliamo di eseguire queste operazioni quando prevedi un impatto minimo sulle esecuzioni delle attività Airflow e configura i meccanismi adeguati per i nuovi tentativi e i DAG e le attività.

Risoluzione dei problemi delle attività di KubernetesExecutor

CeleryKubernetesExecutor è un tipo di esecutore in Cloud Composer 3 che possono utilizzare CeleryExecutor e KubernetesExecutor contemporaneamente nel tempo.

Consulta la pagina Use CeleryKubernetesExecutor (Utilizzare CeleryKubernetesExecutor) per ulteriori informazioni. e informazioni sulla risoluzione dei problemi per le attività eseguite con KubernetesExecutor.

Problemi comuni

Le seguenti sezioni descrivono i sintomi e le potenziali correzioni per alcuni Problemi dei DAG.

L'attività Airflow è stata interrotta da Negsignal.SIGKILL

A volte l'attività potrebbe utilizzare più memoria di quella allocata al worker Airflow. In questo caso, potrebbe essere interrotto da Negsignal.SIGKILL. Il sistema invia questo segnale per evitare un ulteriore consumo di memoria, che potrebbero l'esecuzione di altre attività Airflow. Nel log del worker di Airflow potresti vedere la seguente voce di log:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL potrebbe essere visualizzato anche come codice -9.

Possibili soluzioni:

  • worker_concurrency di worker Airflow inferiori.

  • Nel caso di Cloud Composer 2, aumenta la memoria dei worker di Airflow.

  • Nel caso di Cloud Composer 1, esegui l'upgrade a un tipo di macchina più grande utilizzato in di un cluster Cloud Composer.

  • Ottimizza le tue attività per utilizzare meno memoria.

  • Gestisci le attività ad alta intensità di risorse in Cloud Composer utilizzando KubernetesPodOperator o GKEStartPodOperator per l'isolamento delle attività e l'allocazione personalizzata delle risorse.

L'attività non riesce senza l'emissione di log a causa di errori di analisi dei DAG

A volte potrebbero verificarsi lievi errori DAG che portano a una situazione in cui uno scheduler Airflow e un processore DAG sono in grado di pianificare l'esecuzione delle attività e analizzare un file DAG (rispettivamente), ma il worker Airflow non riesce a eseguire le attività da un DAG di questo tipo, in quanto ci sono errori di programmazione nel file DAG Python. Questo potrebbe generare una situazione in cui un'attività Airflow è contrassegnata come Failed e non sono presenti log dalla sua esecuzione.

Soluzioni:

  • Verifica nei log dei worker di Airflow che non siano presenti errori generati da Worker Airflow correlato a errori di analisi dei DAG o dei DAG mancanti.

  • Aumenta i parametri relativi all'analisi dei DAG:

  • Vedi anche Ispezione dei log del processore DAG.

L'attività non riesce senza emettere log a causa della pressione delle risorse

Sintomo: durante l'esecuzione di un'attività, il processo secondario del worker Airflow è responsabile per l'esecuzione delle attività Airflow viene interrotta bruscamente. L'errore visibile nel log del worker di Airflow potrebbe essere simile a quello seguente:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Soluzione:

L'attività non riesce senza inviare log a causa dell'eliminazione dei pod

I pod di Google Kubernetes Engine sono soggetti alle Ciclo di vita dei pod di Kubernetes ed eliminazione dei pod. Attività i picchi e la co-pianificazione dei worker sono le due cause più comuni dell'eliminazione dei pod in Cloud Composer.

L'eliminazione dei pod può verificarsi quando un determinato pod fa un uso eccessivo delle risorse di un nodo, rispetto alle aspettative configurate per il consumo di risorse per il nodo. Per Ad esempio, l'eliminazione potrebbe avvenire quando in un pod vengono eseguite diverse attività che usano molta memoria, e il relativo carico combinato fa sì che il nodo in cui viene eseguito questo pod superi limite per il consumo di memoria.

Se un pod worker Airflow viene rimosso, tutte le istanze delle attività in esecuzione su quel pod pod vengono interrotti e successivamente contrassegnati come non riusciti da Airflow.

I log vengono inseriti nel buffer. Se un pod worker viene rimosso prima che il buffer venga svuotato, i log non vengono emesse. L'errore dell'attività senza log indica che il flusso di lavoro i worker vengono riavviati per esaurimento della memoria (OOM). Alcuni log potrebbero essere presenti in Cloud Logging anche se i log di Airflow non sono stati emessi.

Per visualizzare i log:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.

  3. Vai alla scheda Log.

  4. Visualizza i log dei singoli worker in Tutti i log -> Log di Airflow -> Lavoratori -> (singolo lavoratore).

L'esecuzione del DAG è limitata alla memoria. L'esecuzione di ogni attività inizia con due Airflow di elaborazione delle attività: esecuzione e monitoraggio delle attività. Ogni nodo può richiedere fino a 6 attività simultanee (circa 12 processi caricati con moduli Airflow). A seconda della natura del DAG, è possibile consumare più memoria.

Sintomo:

  1. Nella console Google Cloud, vai alla pagina Carichi di lavoro.

    Vai a Carichi di lavoro

  2. Se sono presenti airflow-worker pod che mostrano Evicted, fai clic su ogni pod rimosso pod e cerca il parametro The node was low on resource: memory nella parte superiore della finestra.

Risoluzione:

  • In Cloud Composer 1, crea un nuovo ambiente Cloud Composer con un tipo di macchina più grande di quello attuale di testo.
  • In Cloud Composer 2, aumenta i limiti di memoria per i worker di Airflow.
  • Controlla i log dei pod airflow-worker per individuare possibili cause dell'eliminazione. Per ulteriori informazioni sul recupero dei log dai singoli pod, consulta Risoluzione dei problemi relativi ai carichi di lavoro di cui è stato eseguito il deployment.
  • Assicurati che le attività nel DAG siano idempotenti e che possano essere recuperate.
  • Evita di scaricare file non necessari nel file system locale dei worker di Airflow.

    I worker di Airflow hanno una capacità del file system locale limitata. Ad esempio, nel Cloud Composer 2, un worker può avere da 1 GB a 10 GB di spazio di archiviazione. Quando di archiviazione lo spazio di archiviazione si esaurisce, il pod worker Airflow viene rimosso dal piano di controllo GKE. Questa operazione non riesce a tutte le attività rimosse worker era in esecuzione.

    Esempi di operazioni problematiche:

    • Download di file o oggetti e archiviazione locale in un flusso Airflow worker. Archivia invece questi oggetti direttamente in un servizio adatto, ad esempio un bucket Cloud Storage.
    • Accesso a oggetti di grandi dimensioni nella cartella /data da un worker Airflow. Il worker di Airflow scarica l'oggetto nel proprio file system locale. Implementa invece i DAG in modo che i file di grandi dimensioni vengano elaborati al di fuori del pod di worker Airflow.

Timeout importazione caricamento DAG

Sintomo:

  • Nell'interfaccia web di Airflow, nella parte superiore della pagina dell'elenco dei DAG, viene visualizzato un allarme rosso mostra Broken DAG: [/path/to/dagfile] Timeout.
  • In Cloud Monitoring: i log airflow-scheduler contengono voci simile a:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Risoluzione:

Esegui l'override di dag_file_processor_timeout Airflow di configurazione e concedere più tempo per l'analisi dei DAG:

Sezione Chiave Valore
core dag_file_processor_timeout Nuovo valore di timeout

L'esecuzione del DAG non termina entro il tempo previsto

Sintomo:

A volte un'esecuzione di DAG non termina perché le attività Airflow si bloccano e l'esecuzione del DAG dura più a lungo del previsto. In condizioni normali, le attività Airflow non rimangono a tempo indeterminato in stato in coda o in esecuzione, perché Airflow è in timeout di pulizia che aiutano a evitare questa situazione.

Risoluzione:

  • Utilizza la dagrun_timeout per i DAG. Ad esempio: dagrun_timeout=timedelta(minutes=120). Di conseguenza, ogni esecuzione di DAG deve come completate entro il timeout dell'esecuzione del DAG e le attività non terminate vengono contrassegnate. come Failed o Upstream Failed. Per ulteriori informazioni sugli stati delle attività di Airflow, consulta Documentazione di Apache Airflow.

  • Utilizza la timeout esecuzione attività per definire un timeout predefinito per le attività eseguite in base a Apache Operatori Airflow.

Esecuzioni di DAG non eseguite

Sintomo:

Quando una data di pianificazione per un DAG è impostata in modo dinamico, questo può portare a vari e inaspettati. Ad esempio:

  • L'esecuzione del DAG è sempre nel futuro e il DAG non viene mai eseguito.

  • Le esecuzioni di DAG precedenti sono contrassegnate come eseguite e riuscite nonostante non siano state eseguito.

Ulteriori informazioni sono disponibili nella documentazione di Apache Airflow.

Risoluzione:

  • Segui i consigli nella documentazione di Apache Airflow.

  • Imposta start_date statico per i DAG. In alternativa, puoi utilizzare catchup=False per disabilitare l'esecuzione del DAG per date passate.

  • Evita di usare datetime.now() o days_ago(<number of days>) a meno che non gli effetti collaterali di questo approccio.

Aumento del traffico di rete da e verso il database Airflow

La quantità di traffico della rete tra i cluster GKE del tuo ambiente cluster e il database Airflow dipende dal numero di DAG, dal numero le attività nei DAG e il modo in cui i DAG accedono ai dati nel database Airflow. La che possono influire sull'utilizzo della rete:

  • Query al database Airflow. Se i tuoi DAG eseguono molte query, generano grandi quantità di traffico. Esempi: controllare lo stato delle attività prima di procedere con altre attività, eseguire query sulla tabella XCom, eseguire il dump di Airflow contenuti del database.

  • Numero elevato di attività. Più attività ci sono da pianificare, più generato il traffico di rete. Questa considerazione vale sia per il totale di attività nei tuoi DAG e alla frequenza di pianificazione. Quando Lo scheduler Airflow programma le esecuzioni dei DAG, invia query ad Airflow e genera traffico.

  • L'interfaccia web di Airflow genera traffico di rete perché esegue query il database Airflow. Usare in modo intensivo pagine con grafici, attività e possono generare grandi volumi di traffico di rete.

Il DAG ha arrestato in modo anomalo il server web Airflow o causa la restituzione di un errore 502 gateway timeout

Gli errori del server web possono verificarsi per diversi motivi. Controllo airflow-webserver accede Cloud Logging per determinare la causa 502 gateway timeout errore.

Computing pesante

Questa sezione riguarda solo Cloud Composer 1.

Evita di eseguire calcoli intensivi al momento di analisi del DAG.

A differenza dei nodi worker e scheduler, i cui tipi di macchina possono essere personalizzati hanno capacità di CPU e memoria maggiori, il server web utilizza un tipo di macchina fisso, il che può causare errori di analisi dei DAG se il calcolo del tempo di analisi è troppo pesante.

Tieni presente che il server web ha 2 vCPU e 2 GB di memoria. Il valore predefinito per core-dagbag_import_timeout è 30 secondi. Questo timeout definisce il limite superiore del tempo impiegato da Airflow per caricare un Modulo Python nella cartella dags/.

Autorizzazioni errate

Questa sezione riguarda solo Cloud Composer 1.

Il server web non viene eseguito con lo stesso account di servizio dei worker e scheduler. Pertanto, i worker e lo scheduler potrebbero essere in grado di accedere e le risorse gestite dall'utente a cui il server web non può accedere.

Ti consigliamo di evitare di accedere a risorse non pubbliche durante Analisi dei DAG. A volte è inevitabile e dovrete concedere le autorizzazioni all'account di servizio del server web. Il servizio deriva dal dominio del server web. Ad esempio, se il dominio è example-tp.appspot.com, l'account di servizio è example-tp@appspot.gserviceaccount.com.

Errori DAG

Questa sezione riguarda solo Cloud Composer 1.

Il server web viene eseguito su App Engine ed è separato da al cluster GKE dell'ambiente. Il server web analizza il DAG file di definizione e un 502 gateway timeout può verificarsi in caso di errori nel DAG. Airflow funziona normalmente senza un server web funzionante se il DAG problematico non interrompe i processi in esecuzione in GKE. In questo caso, puoi utilizzare gcloud composer environments run per recuperare dettagli dal tuo ambiente e come soluzione alternativa se il server web non disponibile.

In altri casi, puoi eseguire l'analisi dei DAG in GKE e cercare DAG che generano eccezioni irreversibili di Python o timeout (il valore predefinito è 30 secondi). Per risolvere i problemi, connettiti a una shell remota in un container worker Airflow e per testare gli errori di sintassi. Per ulteriori informazioni, consulta Test dei DAG.

Gestione di un numero elevato di DAG e plug-in nelle cartelle DAG e plugin

I contenuti di /dags e /plugins cartelle sono sincronizzati da del bucket del tuo ambiente ai file system locali dei worker di Airflow e scheduler.

Maggiore è il numero di dati archiviati in queste cartelle, maggiore sarà il tempo necessario per eseguire la sincronizzazione dei dati. Per risolvere queste situazioni:

  • Limita il numero di file nelle cartelle /dags e /plugins. Memorizza solo il numero minimo di file richiesti.

  • Se possibile, aumenta lo spazio su disco disponibile per gli scheduler di Airflow e worker.

  • Se possibile, aumenta la CPU e la memoria degli scheduler e dei worker di Airflow, affinché l'operazione di sincronizzazione venga eseguita più velocemente.

  • Nel caso di un numero molto elevato di DAG, dividi i DAG in batch, comprimi in archivi zip e implementali nella cartella /dags. Questo approccio velocizza il processo di sincronizzazione dei DAG. Componenti Airflow decomprimi gli archivi ZIP prima di elaborare i DAG.

  • Anche la generazione di DAG in una pubblicità programmatica potrebbe essere un metodo per limitare il numero di file DAG archiviati nella cartella /dags. Consulta la sezione sui DAG di pubblicità programmatica da evitare problemi con la pianificazione e l'esecuzione dei DAG generati in modo programmatico.

Non pianificare contemporaneamente i DAG generati in modo programmatico

Generare oggetti DAG in modo programmatico da un file DAG è un metodo efficiente per creare molti DAG simili che hanno solo piccole differenze.

È importante non pianificare l'esecuzione immediata di tutti questi DAG. Là è un'alta probabilità che i worker Airflow non abbiano CPU e memoria sufficienti per eseguire tutte le attività pianificate contemporaneamente.

Per evitare problemi con la pianificazione dei DAG programmatici:

  • Aumenta la contemporaneità dei worker e fai lo scale up del tuo ambiente, in modo che possa di eseguire più attività contemporaneamente.
  • Genera i DAG in modo da distribuire le pianificazioni in modo uniforme nel tempo, in modo da evitare di pianificare centinaia di attività contemporaneamente, in modo che i worker di Airflow il tempo necessario per eseguire tutte le attività pianificate.

Errore 504 durante l'accesso al server web Airflow

Vedi Errore 504 durante l'accesso alla UI di Airflow.

Lost connection to Postgres / MySQL server during query eccezione viene generata durante l'esecuzione dell'attività o subito dopo

Lost connection to Postgres / MySQL server during query eccezioni si verificano spesso quando si verificano le seguenti condizioni:

  • Il DAG utilizza PythonOperator o un operatore personalizzato.
  • Il tuo DAG esegue query sul database Airflow.

Se da una funzione richiamabile vengono eseguite più query, le tracce potrebbero puntare erroneamente alla linea self.refresh_from_db(lock_for_update=True) nella codice Airflow; si tratta della prima query del database dopo l'esecuzione dell'attività. La la causa effettiva dell'eccezione si verifica prima di questo, quando una sessione SQLAlchemy non è chiuso correttamente.

Le sessioni SQLAlchemy hanno come ambito un thread e create in una funzione richiamabile una sessione può essere continuata in un secondo momento all'interno del codice Airflow. In caso di eventi significativi ritardi tra le query in una sessione, la connessione potrebbe essere già chiuso dal server Postgres o MySQL. Il timeout della connessione in Gli ambienti Cloud Composer sono impostati su circa 10 minuti.

Risoluzione:

  • Utilizza il decorator di airflow.utils.db.provide_session. Questo decorator fornisce una sessione valida per il database Airflow in session e chiude correttamente la sessione al termine della funzione.
  • Non utilizzare una singola funzione a lunga esecuzione. Sposta invece tutto il database per separare funzioni, in modo che ci siano più funzioni con il decorator di airflow.utils.db.provide_session. In questo caso, le sessioni vengono chiusi automaticamente dopo aver recuperato i risultati della query.

Controllo del tempo di esecuzione di DAG, attività ed esecuzioni parallele dello stesso DAG

Se vuoi controllare per quanto tempo una singola esecuzione di DAG per un determinato DAG dura, puoi usare il parametro DAG dagrun_timeout da eseguire Ecco. Ad esempio, se prevedi che venga eseguita una singola DAG (indipendentemente dal fatto che (l'esecuzione termina con successo o errore) non deve durare più di un'ora, imposta questo parametro su 3600 secondi.

Puoi anche controllare la durata di una singola attività Airflow. Da fare quindi puoi utilizzare execution_timeout.

Se vuoi controllare quante esecuzioni di DAG attive vuoi avere per un un determinato DAG, puoi utilizzare [core]max-active-runs-per-dag l'opzione di configurazione Airflow per farlo.

Se vuoi che un DAG venga eseguito in un determinato momento, imposta max-active-runs-per-dag su 1.

Problemi che incidono sulla sincronizzazione di DAG e plug-in con scheduler, worker e server web

Cloud Composer sincronizza i contenuti di /dags e /plugins cartelle a scheduler e worker. Alcuni oggetti nelle cartelle /dags e /plugins la sincronizzazione potrebbe non funzionare correttamente o almeno rallentarla.

  • La cartella /dags è sincronizzata con scheduler e worker. Questa cartella non è sincronizzata ai server web in Cloud Composer 2 o se attivi DAG Serialization in Cloud Composer 1.

  • La cartella /plugins è sincronizzata con scheduler, worker e server web.

Potresti riscontrare i seguenti problemi:

  • Hai caricato file compressi in formato gzip che utilizzano transcodifica di compressione in /dags e /plugins cartelle. In genere questo accade se usi il comando gsutil cp -Z per eseguire il caricamento al bucket.

    Soluzione: elimina l'oggetto che ha utilizzato la transcodifica con compressione e poi caricalo di nuovo. al bucket.

  • Uno degli oggetti è denominato ".", quindi un oggetto di questo tipo non viene sincronizzato con scheduler e worker e questa operazione potrebbe interrompere del tutto la sincronizzazione.

    Soluzione: rinomina l'oggetto problematico.

  • Una cartella e un file Python DAG hanno gli stessi nomi, ad esempio a.py. In questo caso, il file DAG non viene sincronizzato correttamente con i componenti di Airflow.

    Soluzione: rimuovi la cartella che ha lo stesso nome di un file Python DAG.

  • Uno degli oggetti nelle cartelle /dags o /plugins contiene un simbolo / alla fine del nome dell'oggetto. Questi oggetti possono fuorviare il processo di sincronizzazione perché il simbolo / significa che un oggetto è una cartella, non un file.

    Soluzione: rimuovi il simbolo / dal nome dell'oggetto problematico.

  • Non archiviare i file non necessari nelle cartelle /dags e /plugins.

    A volte i DAG e i plug-in che implementi sono accompagnati file aggiuntivi, ad esempio file di archiviazione per i test di questi componenti. Questi i file vengono sincronizzati con i worker e i pianificatori e incidono sul tempo necessario copiare questi file su scheduler, worker e server web.

    Soluzione: non archiviare file aggiuntivi e non necessari in /dags e /plugins cartelle.

Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' errore generato da scheduler e worker

Questo problema si verifica perché gli oggetti possono avere con spazio dei nomi sovrapposto in Cloud Storage, gli scheduler e i worker usano file system tradizionali. Ad esempio, è possibile per aggiungere una cartella e un oggetto con lo stesso nome al di sincronizzare la directory di una VM con un bucket. Quando il bucket viene sincronizzato con gli scheduler e i worker dell'ambiente, generato questo errore, il che può portare a errori nell'attività.

Per risolvere il problema, assicurati che non esistano spazi dei nomi sovrapposti nelle del bucket dell'ambiente. Ad esempio, se sia /dags/misc (un file) che /dags/misc/example_file.txt (un altro file) si trovano in un bucket, si è verificato un errore generate dallo scheduler.

Interruzioni temporanee durante la connessione a Airflow Metadata DB

Cloud Composer viene eseguito su un'infrastruttura cloud distribuita. Significa che di tanto in tanto potrebbero comparire alcuni problemi temporanei interrompere l'esecuzione delle tue attività Airflow.

In queste situazioni potresti vedere i seguenti messaggi di errore nei worker di Airflow log:

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

o

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Tali problemi intermittenti potrebbero essere causati anche da operazioni di manutenzione per i tuoi ambienti Cloud Composer.

Di solito questi errori sono intermittenti e se le tue attività Airflow sono idempotenti e ci sono nuovi tentativi configurati, non dovresti essere immune. Puoi anche valuta la possibilità di definire periodi di manutenzione.

Un altro motivo per questi errori potrebbe essere la mancanza di risorse nel tuo nel cluster dell'ambiente di lavoro. In questi casi, potresti fare lo scale up o ottimizzare come descritto in Scala gli ambienti o Istruzioni per l'ottimizzazione dell'ambiente.

Un'esecuzione di DAG è contrassegnata come riuscita, ma non ha attività eseguite

Se l'esecuzione di un DAG execution_date è precedente alla metrica start_date del DAG, potresti vedere esecuzioni di DAG che non includono attività, ma che sono comunque contrassegnate come riuscite.

Un&#39;esecuzione di DAG riuscita senza attività eseguite
Figura 3. Un'esecuzione di DAG riuscita senza attività eseguite (fai clic per ingrandire)
.

Causa

Questa situazione potrebbe verificarsi in uno dei seguenti casi:

  • Una mancata corrispondenza è causata dalla differenza di fuso orario tra i execution_date e start_date. Ad esempio, può accadere quando utilizzando pendulum.parse(...) per impostare start_date.

  • Il start_date del DAG è impostato su un valore dinamico, ad esempio airflow.utils.dates.days_ago(1)

Soluzione:

  • Assicurati che execution_date e start_date utilizzino lo stesso fuso orario.

  • Specifica un valore start_date statico e combinalo con catchup=False per evitare di DAG con date di inizio passate.

Un DAG non è visibile nella UI di Airflow o DAG e lo scheduler non lo pianifica

Il processore DAG analizza ogni DAG prima che possa essere programmato dallo scheduler e prima che un DAG diventi visibile in la UI di Airflow o la UI DAG.

Le seguenti opzioni di configurazione di Airflow definiscono i timeout per l'analisi dei DAG:

Se un DAG non è visibile nella UI di Airflow o DAG:

  • Controllare i log del processore DAG se quest'ultimo è in grado di elaborare correttamente per il tuo DAG. In caso di problemi, potresti vedere le seguenti voci di log nei log del processore DAG o dello scheduler:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • Controlla i log dello scheduler per verificare che funzioni correttamente. Nel caso di potresti vedere le seguenti voci di log nei log dello scheduler:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

Soluzioni:

  • Correggi tutti gli errori di analisi dei DAG. Il processore DAG analizza più DAG e in rari casi di analisi degli errori di un DAG possono influire negativamente sull'analisi dei e altri DAG.

  • Se l'analisi del DAG richiede più del numero di secondi definito in [core]dagrun_import_timeout, e poi aumenta il timeout.

  • Se l'analisi di tutti i DAG richiede più del numero di secondi definito nel [core]dag_file_processor_timeout, quindi aumenta il timeout.

  • Se l'analisi del DAG richiede molto tempo, può anche significare che implementate in modo ottimale. Ad esempio, se legge molto variabili di ambiente o eseguire chiamate a servizi esterni o ad Airflow per configurare un database. Per quanto possibile, evita di eseguire queste operazioni le sezioni globali dei DAG.

  • Aumenta le risorse di CPU e memoria per lo scheduler in modo che possa funzionare più velocemente.

  • Regola il numero di scheduler.

  • Aumentare il numero di processi del processore DAG per consentire l'analisi più velocemente. Puoi farlo aumentando il valore [scheduler]parsing_process.

  • Riduci la frequenza di analisi dei DAG.

  • Riduci il carico sul database Airflow.

Sintomi del carico elevato del database Airflow

Per ulteriori informazioni, vedi Sintomi del database Airflow sotto pressione di carico.

Passaggi successivi