Risoluzione dei problemi dello scheduler Airflow

Cloud Composer 1 | Cloud Composer 2

Questa pagina fornisce passaggi e informazioni per la risoluzione dei problemi comuni degli scheduler Airflow.

Identificare l'origine del problema

Per iniziare la risoluzione dei problemi, identifica se il problema si verifica al momento dell'analisi dei DAG o durante l'elaborazione delle attività al momento dell'esecuzione. Per saperne di più su tempo di analisi e tempo di esecuzione, consulta Differenza tra tempo di analisi dei DAG e tempo di esecuzione dei DAG.

Ispezione dei log del processore DAG

Se disponi di DAG complessi, il processore DAG, eseguito dallo scheduler, potrebbe non analizzare tutti i DAG. Questo potrebbe causare molti problemi con i seguenti sintomi.

Sintomi:

  • Se il processore DAG riscontra problemi durante l'analisi dei DAG, potrebbe verificarsi una combinazione dei problemi elencati di seguito. Se i DAG vengono generati in modo dinamico, questi problemi potrebbero avere un impatto maggiore rispetto ai DAG statici.

  • I DAG non sono visibili nell'UI di Airflow e nell'UI dei DAG.

  • I DAG non sono pianificati per l'esecuzione.

  • Sono presenti errori nei log del processore DAG, ad esempio:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    o

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Gli scheduler Airflow riscontrano problemi che ne causano il riavvio.

  • Le attività di Airflow pianificate per l'esecuzione vengono annullate e le esecuzioni di DAG per i DAG che non è stato possibile analizzare potrebbero essere contrassegnate come failed. Ad esempio:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Soluzione:

  • Aumenta i parametri relativi all'analisi dei DAG:

  • Correggi o rimuovi i DAG che causano problemi al processore DAG.

Ispezione dei tempi di analisi dei DAG

Per verificare se il problema si verifica al momento dell'analisi dei DAG, segui questi passaggi.

Console

Nella console Google Cloud puoi utilizzare la pagina Monitoring e la scheda Log per ispezionare i tempi di analisi dei DAG.

Ispeziona i tempi di analisi dei DAG con la pagina Monitoraggio di Cloud Composer:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai a Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Monitoring.

  3. Nella scheda Monitoring, esamina il grafico Tempo di analisi totale per tutti i file DAG nella sezione Esecuzioni DAG e identifica i possibili problemi.

    La sezione delle esecuzioni di DAG nella scheda Monitoraggio di Composer mostra le metriche di integrità per i DAG nel tuo ambiente

Ispeziona i tempi di analisi dei DAG con la scheda Log di Cloud Composer:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai a Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Monitoring.

  3. Vai alla scheda Log e, nell'albero di navigazione Tutti i log, seleziona la sezione Gestore del processore DAG.

  4. Esamina i log di dag-processor-manager e identifica i possibili problemi.

    I log del processore DAG mostreranno i tempi di analisi dei DAG

gcloud - Airflow 1

Utilizza il comando list_dags con il flag -r per vedere il tempo di analisi di tutti i tuoi DAG.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    list_dags -- -r

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.

L'output del comando è simile al seguente:

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

Cerca il valore Data analisi DagBag. Un valore grande potrebbe indicare che uno dei DAG non è implementato in modo ottimale. Nella tabella di output puoi identificare i DAG con tempi di analisi lunghi.

gcloud - Airflow 2

Utilizza il comando dags report per vedere il tempo di analisi di tutti i DAG.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.

L'output del comando è simile al seguente:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Cerca il valore duration per ciascuno dei DAG elencati nella tabella. Un valore grande potrebbe indicare che uno dei DAG non è implementato in modo ottimale. Nella tabella di output, puoi identificare quali DAG hanno un tempo di analisi lungo.

Monitoraggio delle attività in esecuzione e in coda

Per controllare se ci sono attività bloccate in una coda, segui questi passaggi.

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai a Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli ambiente.

  3. Vai alla scheda Monitoring.

  4. Nella scheda Monitoring, esamina il grafico Attività di Airflow nella sezione Esecuzioni di DAG e identifica i possibili problemi. Le attività Airflow sono in stato di coda in Airflow, possono essere inviate alla coda di broker Celery o Kubernetes Executor. Le attività in coda di Celery sono istanze di attività inserite nella coda di broker Celery.

Risoluzione dei problemi durante l'analisi dei DAG

Le seguenti sezioni descrivono sintomi e potenziali correzioni per alcuni problemi comuni al momento dell'analisi dei DAG.

Analisi e pianificazione dei DAG in Cloud Composer 1 e Airflow 1

L'efficienza dell'analisi dei DAG è stata notevolmente migliorata in Airflow 2. Se riscontri problemi di prestazioni relativi all'analisi e alla pianificazione dei DAG, valuta la possibilità di eseguire la migrazione a Airflow 2.

In Cloud Composer 1, lo scheduler viene eseguito sui nodi del cluster insieme ad altri componenti di Cloud Composer. Per questo motivo, il carico dei singoli nodi del cluster potrebbe essere superiore o inferiore rispetto ad altri nodi. Le prestazioni dello scheduler (analisi e pianificazione dei DAG) potrebbero variare a seconda del nodo in cui viene eseguito. Inoltre, un singolo nodo in cui viene eseguito lo scheduler può cambiare in seguito a operazioni di upgrade o manutenzione. Questo limite è stato risolto in Cloud Composer 2, dove puoi allocare risorse di CPU e memoria allo scheduler e le prestazioni dello scheduler non dipendono dal carico dei nodi del cluster.

Numero limitato di thread

Consentire al gestore del processore DAG (la parte dello scheduler che elabora i file DAG) di utilizzare solo un numero limitato di thread potrebbe influire sul tempo di analisi dei DAG.

Per risolvere il problema, esegui l'override delle seguenti opzioni di configurazione di Airflow:

  • Per Airflow 1.10.12 e versioni precedenti, sostituisci il parametro max_threads:

    Sezione Chiave Valore Note
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 Sostituisci NUMBER_OF_CORES_IN_MACHINE con il numero di core
    nelle macchine dei nodi worker.
  • Per Airflow 1.10.14 e versioni successive, sostituisci il parametro parsing_processes:

    Sezione Chiave Valore Note
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 Sostituisci NUMBER_OF_CORES_IN_MACHINE con il numero di core
    nelle macchine dei nodi worker.

Distribuzione del numero e del tempo delle attività

Airflow è noto per i problemi di pianificazione di un numero elevato di attività di piccole dimensioni. In questi casi, ti consigliamo di scegliere un numero inferiore di attività più consolidate.

Anche la pianificazione di un numero elevato di DAG o di attività contemporaneamente potrebbe essere una possibile fonte di problemi. Per evitare questo problema, distribuisci le attività in modo più uniforme nel tempo.

Risoluzione dei problemi relativi alle attività in esecuzione e in coda

Le seguenti sezioni descrivono sintomi e potenziali correzioni di alcuni problemi comuni relativi alle attività in esecuzione e in coda.

Le code di attività sono troppo lunghe

In alcuni casi, una coda di attività potrebbe essere troppo lunga per lo scheduler. Per informazioni su come ottimizzare i parametri worker e Celery, leggi l'articolo su come scalare il tuo ambiente Cloud Composer insieme alla tua azienda.

Utilizzo della funzionalità TimeTable dello scheduler Airflow

A partire da Airflow 2.2, puoi definire una tabella temporale per un DAG utilizzando una nuova funzionalità chiamata TimeTable.

Puoi definire una tabella oraria utilizzando uno dei seguenti metodi:

Risorse cluster limitate

Questa sezione riguarda solo Cloud Composer 1.

Potresti riscontrare problemi di prestazioni se il cluster GKE del tuo ambiente è troppo piccolo per gestire tutti i DAG e le attività. In questo caso, prova una delle seguenti soluzioni:

  • Crea un nuovo ambiente con un tipo di macchina che offra maggiori prestazioni ed esegui la migrazione dei DAG.
  • Crea altri ambienti Cloud Composer e dividi i DAG tra loro.
  • Modifica il tipo di macchina per i nodi GKE, come descritto in Upgrade del tipo di macchina per i nodi GKE. Poiché questa procedura è soggetta a errori, è l'opzione meno consigliata.
  • Esegui l'upgrade del tipo di macchina dell'istanza Cloud SQL che esegue il database Airflow nel tuo ambiente, ad esempio utilizzando i comandi gcloud composer environments update. Le basse prestazioni del database Airflow potrebbero essere la causa della lentezza dello scheduler.

Evita la pianificazione delle attività durante i periodi di manutenzione

Puoi definire periodi di manutenzione specifici per il tuo ambiente. Durante questi periodi di tempo si verificano eventi di manutenzione per Cloud SQL e GKE.

Imposta lo scheduler Airflow per ignorare i file non necessari

Puoi migliorare le prestazioni dello scheduler Airflow saltando i file non necessari nella cartella DAG. Lo scheduler Airflow ignora i file e le cartelle specificati nel file .airflowignore.

Per fare in modo che lo scheduler Airflow ignori i file non necessari:

  1. Crea un file .airflowignore.
  2. In questo file, elenca i file e le cartelle che devono essere ignorati.
  3. Carica questo file nella cartella /dags del bucket del tuo ambiente.

Per ulteriori informazioni sul formato file .airflowignore, consulta la documentazione di Airflow.

Lo scheduler Airflow elabora i DAG in pausa

Gli utenti di Airflow mettono in pausa i DAG per evitarne l'esecuzione. Ciò consente di risparmiare i cicli di elaborazione dei worker di Airflow.

Lo scheduler Airflow continuerà ad analizzare i DAG in pausa. Se vuoi davvero migliorare le prestazioni dello scheduler Airflow, utilizza .airflowignore o elimina i DAG in pausa dalla cartella dei DAG.

Utilizzo di "wait_for_downstream" nei tuoi DAG

Se imposti il parametro wait_for_downstream su True nei DAG, affinché un'attività abbia esito positivo, anche tutte le attività immediatamente downstream di questa attività devono avere esito positivo. Significa che l'esecuzione delle attività appartenenti a una determinata esecuzione di DAG potrebbe essere rallentata dall'esecuzione delle attività dell'esecuzione precedente di DAG. Scopri di più nella documentazione di Airflow.

Le attività in coda da troppo tempo verranno annullate e riprogrammate

Se un'attività Airflow rimane in coda per troppo tempo, lo scheduler la riprogramma di nuovo per l'esecuzione (nelle versioni di Airflow precedenti alla 2.3.1, l'attività viene anche contrassegnata come non riuscita e riprovata se idonea per un nuovo tentativo).

Un modo per osservare i sintomi di questa situazione è osservare il grafico con il numero di attività in coda (scheda "Monitoring" nell'interfaccia utente di Cloud Composer). Se i picchi in questo grafico non diminuiscono entro circa due ore, è molto probabile che le attività vengano ripianificate (senza log) seguite dalle voci di log "Le attività adottate erano ancora in attesa..." nei log dello scheduler. In questi casi, potresti vedere il messaggio "File di log non trovato..." nei log delle attività di Airflow perché l'attività non è stata eseguita.

In generale, questo comportamento è previsto e l'istanza successiva dell'attività pianificata deve essere eseguita in base alla pianificazione. Se noti molti casi di questo tipo nei tuoi ambienti Cloud Composer, è possibile che il numero di worker Airflow nel tuo ambiente non sia sufficiente per elaborare tutte le attività pianificate.

Risoluzione: per risolvere il problema, devi assicurarti che i worker di Airflow abbiano sempre capacità per eseguire le attività in coda. Ad esempio, puoi aumentare il numero di worker o worker_concurrency. Puoi anche ottimizzare il parallelismo o i pool per impedire di aggiungere attività alla coda rispetto alla capacità a tua disposizione.

Le attività inattive potrebbero bloccare l'esecuzione di un DAG specifico

In casi normali, lo scheduler Airflow dovrebbe essere in grado di gestire le situazioni in cui sono presenti attività inattive in coda e per qualche motivo non è possibile eseguirle correttamente (ad esempio, è stato eliminato un DAG a cui appartengono le attività inattive).

Se queste attività inattive non vengono eliminate definitivamente dallo scheduler, potrebbe essere necessario eliminarle manualmente. Puoi farlo, ad esempio, nella UI di Airflow: puoi andare a (Menu > Browser > Istanze attività), trovare le attività in coda appartenenti a un DAG inattivo ed eliminarle.

Per risolvere il problema, esegui l'upgrade del tuo ambiente a Cloud Composer 2.1.12 o versione successiva.

Approccio di Cloud Composer al parametro [scheduler]min_file_process_interval

Cloud Composer cambia il modo in cui [scheduler]min_file_process_interval viene utilizzato dallo scheduler Airflow.

Airflow 1

Se Cloud Composer utilizza Airflow 1, gli utenti possono impostare il valore di [scheduler]min_file_process_interval tra 0 e 600 secondi. I valori superiori a 600 secondi offrono gli stessi risultati che avrebbero se [scheduler]min_file_process_interval fosse impostato su 600 secondi.

Airflow 2

In Airflow 2, [scheduler]min_file_process_interval può essere utilizzato solo con le versioni 1.19.9 e 2.0.26 o più recenti

  • Versioni di Cloud Composer precedenti alle 1.19.9 e alla 2.0.26

    In queste versioni, [scheduler]min_file_process_interval viene ignorato.

  • Cloud Composer versioni 1.19.9 o 2.0.26 o versioni più recenti

    Lo scheduler Airflow viene riavviato dopo un determinato numero di volte in cui tutti i DAG sono stati pianificati e il parametro [scheduler]num_runs controlla quante volte viene eseguito dallo scheduler. Quando lo scheduler raggiunge [scheduler]num_runs loop di pianificazione, viene riavviato. Lo scheduler è un componente stateless e tale riavvio è un meccanismo di riparazione automatica per qualsiasi problema che potrebbe riscontrare. Se non specificato, viene applicato il valore predefinito di [scheduler]num_runs, che è 5000.

    Puoi utilizzare [scheduler]min_file_process_interval per configurare la frequenza dell'analisi dei DAG, ma questo parametro non può superare il tempo necessario per eseguire i loop di [scheduler]num_runs durante la pianificazione dei DAG.

Scalabilità della configurazione di Airflow

Airflow fornisce opzioni di configurazione di Airflow che controllano il numero di attività e DAG che Airflow può eseguire contemporaneamente. Per impostare queste opzioni di configurazione, esegui l'override dei relativi valori per il tuo ambiente.

  • Contemporaneità dei worker

    Il parametro [celery]worker_concurrency controlla il numero massimo di attività che un worker Airflow può eseguire contemporaneamente. Se moltiplichi il valore di questo parametro per il numero di worker Airflow nel tuo ambiente Cloud Composer, ottieni il numero massimo di attività che possono essere eseguite in un determinato momento nel tuo ambiente. Questo numero è limitato dall'opzione di configurazione di Airflow [core]parallelism, descritta più dettagliatamente.

    Negli ambienti Cloud Composer 2, il valore predefinito di [celery]worker_concurrency viene calcolato automaticamente

    • Per le versioni di Airflow: 2.3.3 e successive, [celery]worker_concurrency è impostato su un valore minimo di 32, 12 * worker_CPU e 8 * worker_memory.

    • Per le versioni di Airflow: 2.2.5 o precedenti, [celery]worker_concurrency è impostato su 12 * il numero di CPU dei worker.

  • Numero massimo di esecuzioni di DAG attive

    L'opzione di configurazione Airflow [core]max_active_runs_per_dag controlla il numero massimo di esecuzioni di DAG attive per DAG. Lo scheduler non crea altre esecuzioni di DAG se raggiunge questo limite.

    Se questo parametro è impostato in modo errato, potrebbe verificarsi un problema per cui lo scheduler limita l'esecuzione dei DAG perché non può creare più istanze di esecuzione di DAG in un determinato momento.

  • Numero massimo di attività attive per DAG

    L'opzione di configurazione di Airflow [core]max_active_tasks_per_dag controlla il numero massimo di istanze di attività che possono essere eseguite contemporaneamente in ciascun DAG. È un parametro a livello di DAG.

    Se questo parametro è impostato in modo errato, potrebbe verificarsi un problema per cui l'esecuzione di una singola istanza DAG è lenta perché è possibile eseguire solo un numero limitato di attività DAG in un determinato momento.

    Soluzione: aumentare [core]max_active_tasks_per_dag.

  • Parallelismo e dimensione del pool

    L'opzione di configurazione Airflow [core]parallelism controlla quante attività lo scheduler Airflow può mettere in coda nella coda dell'esecutore dopo aver soddisfatto tutte le dipendenze per queste attività.

    È un parametro globale per l'intera configurazione di Airflow.

    Le attività vengono inserite in coda ed eseguite all'interno di un pool. Gli ambienti Cloud Composer utilizzano un solo pool. Le dimensioni di questo pool controllano il numero di attività che possono essere messe in coda dallo scheduler per l'esecuzione in un determinato momento. Se la dimensione del pool è troppo piccola, lo scheduler non può mettere in coda le attività per l'esecuzione anche se le soglie definite dall'opzione di configurazione [core]parallelism e dall'opzione di configurazione [celery]worker_concurrency moltiplicata per il numero di worker Airflow non sono ancora soddisfatte.

    Puoi configurare le dimensioni del pool nell'interfaccia utente di Airflow (Menu > Amministrazione > Pool). Modifica le dimensioni del pool al livello di parallelismo previsto nel tuo ambiente.

    Di solito, [core]parallelism è impostato come prodotto del numero massimo di worker e [celery]worker_concurrency.

I DAG non vengono pianificati dallo scheduler a causa di timeout del processore DAG

Per saperne di più su questo problema, consulta la sezione Risoluzione dei problemi dei DAG.

Contrassegnare le attività come non riuscite dopo aver raggiunto dagrun_timeout

Lo scheduler contrassegna le attività non completate (in esecuzione, pianificate e in coda) come non riuscite se un'esecuzione di DAG non viene completata entro dagrun_timeout (un parametro DAG).

Soluzione:

Sintomi di database Airflow sotto pressione di carico

A volte, nei log dello scheduler Airflow potresti vedere la seguente voce di log degli avvisi:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Si possono osservare sintomi simili anche nei log dei worker di Airflow:

Per MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Per PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Questi errori o avvisi potrebbero essere un sintomo di un database Airflow sovraccaricato dal numero di connessioni aperte o dal numero di query eseguite contemporaneamente, da scheduler o da altri componenti Airflow come worker, attivatori e server web.

Possibili soluzioni:

Il server web mostra l'avviso "Lo scheduler non sembra essere in esecuzione"

Lo scheduler segnala regolarmente il proprio battito cardiaco al database Airflow. In base a queste informazioni, il server web Airflow determina se lo scheduler è attivo.

A volte, se lo scheduler è sottoposto a un carico elevato, potrebbe non essere in grado di segnalare il battito cardiaco ogni [scheduler]scheduler-heartbeat-sec.

In una situazione del genere, il server web Airflow potrebbe mostrare il seguente avviso:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Possibili soluzioni:

Soluzioni alternative per i problemi riscontrati durante il backfill dei DAG

A volte potrebbe essere necessario eseguire nuovamente i DAG già eseguiti. Puoi farlo con lo strumento a riga di comando di Airflow nel seguente modo:

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

Per eseguire nuovamente solo le attività non riuscite per un DAG specifico, utilizza anche l'argomento --rerun_failed_tasks.

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Per eseguire nuovamente solo le attività non riuscite per un DAG specifico, utilizza anche l'argomento --rerun-failed-tasks.

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.
  • START_DATE con un valore per il parametro DAG start_date, nel formato YYYY-MM-DD.
  • END_DATE con un valore per il parametro DAG end_date, nel formato YYYY-MM-DD.
  • DAG_NAME con il nome del DAG.

L'operazione di backfill a volte potrebbe generare una situazione di deadlock in cui non è possibile eseguire un backfill a causa di un blocco su un'attività. Ad esempio:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

In alcuni casi, puoi utilizzare le seguenti soluzioni alternative per superare i deadlock:

Passaggi successivi