Risoluzione dei problemi dello scheduler Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina fornisce informazioni e passaggi per la risoluzione dei problemi comuni con gli scheduler Airflow e i processori DAG.

Identificare l'origine del problema

Per iniziare la risoluzione dei problemi, identifica se il problema si verifica:

  • Al momento dell'analisi del DAG, mentre il DAG viene analizzato da un processore DAG di Airflow
  • Al momento dell'esecuzione, mentre il DAG viene elaborato da uno scheduler Airflow

Per saperne di più sul tempo di analisi e sul tempo di esecuzione, leggi Differenza tra il tempo di analisi DAG e il tempo di esecuzione DAG.

Esaminare i problemi di elaborazione del DAG

  1. Controlla i log del processore DAG.
  2. Controlla i tempi di analisi dei DAG.

Monitoraggio delle attività in esecuzione e in coda

Per controllare se hai attività bloccate in una coda, segui questi passaggi.

  1. Nella console Google Cloud , vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  3. Vai alla scheda Monitoraggio.

  4. Nella scheda Monitoraggio, esamina il grafico Attività Airflow nella sezione Esecuzioni DAG e identifica i possibili problemi. Le attività Airflow sono attività in stato di coda in Airflow e possono essere inserite nella coda dell'intermediario Celery o dell'esecutore Kubernetes. Le attività nella coda Celery sono istanze di attività che vengono inserite nella coda dell'intermediario Celery.

Risoluzione dei problemi durante l'analisi dei DAG

Le sezioni seguenti descrivono i sintomi e le potenziali correzioni per alcuni problemi comuni durante l'analisi del DAG.

Numero e distribuzione temporale delle attività

Airflow può avere problemi durante la pianificazione di un numero elevato di DAG o attività contemporaneamente. Per evitare problemi di pianificazione, puoi:

  • Modifica i DAG in modo che utilizzino un numero inferiore di attività più consolidate.
  • Modifica gli intervalli di pianificazione dei DAG per distribuire le esecuzioni dei DAG in modo più uniforme nel tempo.

Scalabilità della configurazione Airflow

Airflow fornisce opzioni di configurazione che controllano il numero di attività e DAG che Airflow può eseguire contemporaneamente. Per impostare queste opzioni di configurazione, esegui l'override dei relativi valori per il tuo ambiente. Puoi anche impostare alcuni di questi valori a livello di DAG o attività.

  • Contemporaneità dei worker

    Il parametro [celery]worker_concurrency controlla il numero massimo di attività che un worker Airflow può eseguire contemporaneamente. Se moltiplichi il valore di questo parametro per il numero di worker Airflow nel tuo ambiente Cloud Composer, ottieni il numero massimo di attività che possono essere eseguite in un determinato momento nel tuo ambiente. Questo numero è limitato dall'opzione di configurazione di Airflow [core]parallelism, descritta più avanti.

    Negli ambienti Cloud Composer 3, il valore predefinito di [celery]worker_concurrency viene calcolato automaticamente in base al numero di istanze di attività simultanee leggere che un worker può ospitare. Ciò significa che il suo valore dipende dai limiti delle risorse worker. Il valore di concorrenza dei worker non dipende dal numero di worker nel tuo ambiente.

  • Numero massimo di esecuzioni di DAG attive

    L'opzione di configurazione di Airflow [core]max_active_runs_per_dag controlla il numero massimo di esecuzioni di DAG attive per DAG. Lo scheduler non crea altre esecuzioni DAG se raggiunge questo limite.

    Se questo parametro è impostato in modo errato, potresti riscontrare un problema per cui lo scheduler limita l'esecuzione del DAG perché non può creare più istanze di esecuzione del DAG in un determinato momento.

    Puoi anche impostare questo valore a livello di DAG con il parametro max_active_runs.

  • Numero massimo di attività attive per DAG

    L'opzione di configurazione di Airflow [core]max_active_tasks_per_dag controlla il numero massimo di istanze di attività che possono essere eseguite contemporaneamente in ogni DAG.

    Se questo parametro è impostato in modo errato, potresti riscontrare un problema in cui l'esecuzione di una singola istanza DAG è lenta perché è possibile eseguire solo un numero limitato di attività DAG in un determinato momento. In questo caso, puoi aumentare il valore di questa opzione di configurazione.

    Puoi anche impostare questo valore a livello di DAG con il parametro max_active_tasks.

    Puoi utilizzare i parametri max_active_tis_per_dag e max_active_tis_per_dagrun a livello di attività per controllare quante istanze con un ID attività specifico possono essere eseguite per DAG e per esecuzione DAG.

  • Parallelismo e dimensioni del pool

    L'opzione di configurazione di Airflow [core]parallelism controlla il numero di attività che lo scheduler di Airflow può mettere in coda nella coda dell'executor dopo che tutte le dipendenze di queste attività sono soddisfatte.

    Si tratta di un parametro globale per l'intera configurazione di Airflow.

    Le attività vengono messe in coda ed eseguite all'interno di un pool. Gli ambienti Cloud Composer utilizzano un solo pool. La dimensione di questo pool controlla quante attività possono essere messe in coda dal pianificatore per l'esecuzione in un determinato momento. Se le dimensioni del pool sono troppo piccole, lo scheduler non può mettere in coda le attività per l'esecuzione anche se le soglie, definite dall'opzione di configurazione [core]parallelism e dall'opzione di configurazione [celery]worker_concurrency moltiplicata per il numero di worker Airflow, non sono ancora state raggiunte.

    Puoi configurare le dimensioni del pool nell'interfaccia utente di Airflow (Menu > Amministrazione > Pool). Regola le dimensioni del pool in base al livello di parallelismo previsto nel tuo ambiente.

    Di solito, [core]parallelism è impostato come prodotto del numero massimo di worker e [celery]worker_concurrency.

Risolvere i problemi relativi alle attività in esecuzione e in coda

Le sezioni seguenti descrivono i sintomi e le potenziali correzioni per alcuni problemi comuni relativi alle attività in esecuzione e in coda.

Le esecuzioni di DAG non vengono eseguite

Sintomo:

Quando una data di pianificazione per un DAG viene impostata in modo dinamico, ciò può comportare vari effetti collaterali imprevisti. Ad esempio:

  • L'esecuzione di un DAG è sempre futura e il DAG non viene mai eseguito.

  • Le esecuzioni di DAG precedenti sono contrassegnate come eseguite e riuscite nonostante non siano state eseguite.

Per saperne di più, consulta la documentazione di Apache Airflow.

Possibili soluzioni:

  • Segui i consigli riportati nella documentazione di Apache Airflow.

  • Imposta start_date statico per i DAG. In alternativa, puoi utilizzare catchup=False per disattivare l'esecuzione del DAG per le date passate.

  • Evita di utilizzare datetime.now() o days_ago(<number of days>) a meno che tu non sia consapevole degli effetti collaterali di questo approccio.

Utilizzo della funzionalità TimeTable dello scheduler Airflow

Le tabelle orarie sono disponibili a partire da Airflow 2.2.

Puoi definire una tabella oraria per un DAG con uno dei seguenti metodi:

Puoi anche utilizzare gli orari integrati.

Evita la pianificazione delle attività durante i periodi di manutenzione

Puoi definire periodi di manutenzione per il tuo ambiente in modo che la manutenzione dell'ambiente venga eseguita al di fuori degli orari in cui esegui i DAG. Puoi comunque eseguire i tuoi DAG durante i periodi di manutenzione, a condizione che sia accettabile che alcune attività possano essere interrotte e riprovate. Per saperne di più su come i periodi di manutenzione influiscono sul tuo ambiente, consulta Specificare i periodi di manutenzione.

Utilizzo di "wait_for_downstream" nei tuoi DAG

Se imposti il parametro wait_for_downstream su True nei tuoi DAG, affinché un'attività venga completata correttamente, anche tutte le attività immediatamente downstream di questa attività devono essere completate correttamente. Ciò significa che l'esecuzione delle attività appartenenti a una determinata esecuzione di DAG potrebbe essere rallentata dall'esecuzione delle attività dell'esecuzione di DAG precedente. Scopri di più nella documentazione di Airflow.

Le attività in coda da troppo tempo verranno annullate e riprogrammate

Se un'attività Airflow viene mantenuta in coda troppo a lungo, lo scheduler la riprogramma per l'esecuzione dopo che è trascorso il periodo di tempo impostato nell'opzione di configurazione di Airflow [scheduler]task_queued_timeout. Il valore predefinito è 2400. Nelle versioni di Airflow precedenti alla 2.3.1, l'attività viene contrassegnata anche come non riuscita e viene riprovata se idonea per un nuovo tentativo.

Un modo per osservare i sintomi di questa situazione è esaminare il grafico con il numero di attività in coda (scheda "Monitoraggio" nella UI di Cloud Composer). Se i picchi in questo grafico non scendono entro circa due ore, le attività verranno molto probabilmente riprogrammate (senza log), seguite dalle voci di log "Adopted tasks were still pending ..." (Le attività adottate erano ancora in attesa…) nei log dello scheduler. In questi casi, nei log delle attività Airflow potresti visualizzare il messaggio "Impossibile trovare il file di log…" perché l'attività non è stata eseguita.

In generale, questo comportamento è previsto e la successiva istanza dell'attività pianificata deve essere eseguita in base alla pianificazione. Se osservi molti casi di questo tipo nei tuoi ambienti Cloud Composer, potrebbe significare che non ci sono abbastanza worker Airflow nel tuo ambiente per elaborare tutte le attività pianificate.

Risoluzione: per risolvere il problema, devi assicurarti che i worker Airflow abbiano sempre capacità per eseguire le attività in coda. Ad esempio, puoi aumentare il numero di worker o worker_concurrency. Puoi anche ottimizzare il parallelismo o i pool per evitare di mettere in coda più attività della capacità disponibile.

Le attività bloccate nella coda potrebbero impedire l'esecuzione di un DAG specifico

Per risolvere il problema, esegui l'upgrade dell'ambiente a Cloud Composer versione 2.1.12 o successive.

Nei casi normali, lo scheduler Airflow dovrebbe essere in grado di gestire le situazioni in cui ci sono attività nella coda e per qualche motivo non è possibile eseguirle correttamente (ad esempio quando è stato eliminato un DAG a cui appartengono queste attività).

Se queste attività non vengono eliminate dal pianificatore, potrebbe essere necessario eliminarle manualmente. Puoi farlo, ad esempio, nell'interfaccia utente di Airflow (Menu > Browser > Istanze attività), trova le attività in coda ed eliminale.

Approccio di Cloud Composer al parametro min_file_process_interval

Cloud Composer modifica il modo in cui [scheduler]min_file_process_interval viene utilizzato dallo scheduler Airflow.

Nelle versioni di Cloud Composer precedenti alla 2.0.26, [scheduler]min_file_process_interval viene ignorato.

Nelle versioni di Cloud Composer successive alla 2.0.26:

Lo scheduler Airflow viene riavviato dopo un certo numero di volte in cui tutti i DAG vengono pianificati e il parametro [scheduler]num_runs controlla quante volte viene eseguito dallo scheduler. Quando lo scheduler raggiunge [scheduler]num_runs cicli di pianificazione, viene riavviato. Lo scheduler è un componente stateless e il riavvio è un meccanismo di riparazione automatica per eventuali problemi che lo scheduler potrebbe riscontrare. Il valore predefinito di [scheduler]num_runs è 5000.

[scheduler]min_file_process_interval può essere utilizzato per configurare la frequenza di analisi DAG, ma questo parametro non può essere più lungo del tempo necessario a uno scheduler per eseguire i loop [scheduler]num_runs durante la pianificazione dei DAG.

Contrassegno delle attività come non riuscite dopo aver raggiunto dagrun_timeout

Lo scheduler contrassegna le attività non completate (in esecuzione, pianificate e in coda) come non riuscite se un'esecuzione DAG non viene completata entro dagrun_timeout (un parametro DAG).

Soluzione:

Sintomi di un carico elevato del database Airflow

A volte, nei log dello scheduler Airflow potresti visualizzare la seguente voce di log di avviso:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Nei log dei worker di Airflow potrebbero essere osservati anche sintomi simili:

Per MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Per PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Questi errori o avvisi potrebbero essere un sintomo del sovraccarico del database Airflow dovuto al numero di connessioni aperte o al numero di query eseguite nello stesso periodo di tempo, dagli scheduler o da altri componenti di Airflow come worker, trigger e web server.

Possibili soluzioni:

Il server web mostra l'avviso "The scheduler does not appear to be running"

Lo scheduler invia regolarmente il suo heartbeat al database Airflow. In base a queste informazioni, il server web di Airflow determina se lo scheduler è attivo.

A volte, se lo scheduler è sottoposto a un carico elevato, potrebbe non essere in grado di comunicare il suo heartbeat ogni [scheduler]scheduler_heartbeat_sec.

In una situazione del genere, il server web Airflow potrebbe mostrare il seguente avviso:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Possibili soluzioni:

  • Aumenta le risorse di CPU e memoria per lo scheduler.

  • Ottimizza i DAG in modo che l'analisi e la pianificazione siano più rapide e non consumino troppe risorse dello scheduler.

  • Evita di utilizzare variabili globali nei DAG Airflow. Utilizza invece le variabili di ambiente e le variabili Airflow.

  • Aumenta il valore dell'opzione di configurazione di Airflow [scheduler]scheduler_health_check_threshold in modo che il server web attenda più a lungo prima di segnalare l'indisponibilità dello scheduler.

Soluzioni alternative per i problemi riscontrati durante il backfill dei DAG

A volte potresti voler eseguire nuovamente i DAG già eseguiti. Puoi farlo con un comando Airflow CLI nel seguente modo:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Per eseguire nuovamente solo le attività non riuscite per un DAG specifico, utilizza anche l'argomento --rerun-failed-tasks.

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.
  • START_DATE con un valore per il parametro DAG start_date, nel formato YYYY-MM-DD.
  • END_DATE con un valore per il parametro DAG end_date, nel formato YYYY-MM-DD.
  • DAG_NAME con il nome del DAG.

L'operazione di backfill a volte potrebbe generare una situazione di deadlock in cui un backfill non è possibile perché è presente un blocco su un'attività. Ad esempio:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

In alcuni casi, puoi utilizzare le seguenti soluzioni alternative per superare i deadlock:

  • Disattiva il mini-scheduler ignorando [core]schedule_after_task_execution a False.

  • Esegui i backfill per intervalli di date più ristretti. Ad esempio, imposta START_DATE e END_DATE per specificare un periodo di un solo giorno.

Passaggi successivi