Esegui il debug dei problemi di pianificazione delle attività

Cloud Composer 1 | Cloud Composer 2

Questo tutorial ti guida nella diagnosi e nella risoluzione dei problemi di pianificazione e analisi delle attività che causano malfunzionamenti dello scheduler, errori e latenza di analisi e guasti delle attività.

Introduzione

Lo scheduler Airflow è influenzato principalmente da due fattori: la pianificazione delle attività e l'analisi dei DAG. I problemi relativi a uno di questi fattori possono avere un impatto negativo su condizioni e prestazioni ambientali.

A volte vengono pianificate troppe attività contemporaneamente. In questo caso, la coda è piena e le attività rimangono nello stato "pianificato" o vengono riprogrammate dopo essere state messe in coda, il che potrebbe causare errori delle attività e latenza delle prestazioni.

Un altro problema comune è la latenza di analisi e gli errori causati dalla complessità di un codice DAG. Ad esempio, un codice DAG che contiene variabili Airflow al livello superiore del codice può causare ritardi nell'analisi, sovraccarico del database, errori di pianificazione e timeout dei DAG.

In questo tutorial imparerai a diagnosticare i DAG di esempio e imparerai a risolvere i problemi di pianificazione e analisi, a migliorare la pianificazione dei DAG e a ottimizzare le configurazioni dell'ambiente e del codice DAG per migliorare le prestazioni.

Obiettivi

In questa sezione sono elencati gli obiettivi degli esempi di questo tutorial.

Esempio: malfunzionamento e latenza dello scheduler causati da un'elevata contemporaneità delle attività

  • Carica il DAG di esempio eseguito più volte contemporaneamente e diagnostica il malfunzionamento dello scheduler e i problemi di latenza con Cloud Monitoring.

  • Ottimizza il codice DAG consolidando le attività e valutando l'impatto sulle prestazioni.

  • Distribuisci le attività in modo più uniforme nel tempo e valuta l'impatto sulle prestazioni.

  • Ottimizza le configurazioni e le configurazioni dell'ambiente di Airflow e valuta l'impatto.

Esempio: errori di analisi dei DAG e latenza causati da codice complesso

  • Carica il DAG di esempio con le variabili Airflow e diagnostica i problemi di analisi con Cloud Monitoring.

  • Ottimizza il codice DAG evitando le variabili Airflow al livello superiore del codice e valuta l'impatto sul momento dell'analisi.

  • Ottimizza le configurazioni e le configurazioni dell'ambiente di Airflow e valuta l'impatto sui tempi di analisi.

Costi

Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:

Al termine di questo tutorial, puoi evitare di continuare la fatturazione eliminando le risorse che hai creato. Per maggiori dettagli, vedi Pulizia.

Prima di iniziare

Questa sezione descrive le azioni necessarie prima di iniziare il tutorial.

Creare e configurare un progetto

Per questo tutorial è necessario un progetto Google Cloud. Configura il progetto nel seguente modo:

  1. Nella console Google Cloud, seleziona o crea un progetto:

    Vai al selettore di progetti

  2. Verifica che la fatturazione sia attivata per il tuo progetto. Scopri come verificare se la fatturazione è abilitata in un progetto.

  3. Assicurati che l'utente del progetto Google Cloud disponga dei seguenti ruoli per creare le risorse necessarie:

    • Amministratore ambienti e oggetti Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Amministratore Compute (roles/compute.admin)

Abilita le API per il tuo progetto

Attiva l'API Cloud Composer.

Abilita l'API

Crea il tuo ambiente Cloud Composer

Crea un ambiente Cloud Composer 2.

Durante la creazione dell'ambiente, concedi il ruolo Estensione agente di servizio API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) all'account agente di servizio Composer. Cloud Composer utilizza questo account per eseguire operazioni nel tuo progetto Google Cloud.

Esempio: malfunzionamento dello scheduler e errore dell'attività a causa di problemi di pianificazione delle attività

Questo esempio mostra il malfunzionamento dello scheduler di debug e la latenza causata da un'elevata contemporaneità delle attività.

Carica il DAG di esempio nel tuo ambiente

Carica il DAG di esempio riportato di seguito nell'ambiente creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato dag_10_tasks_200_seconds_1.

Questo DAG ha 200 attività. Ogni attività attende 1 secondo e stampa il messaggio "Completata!". Al termine del caricamento, il DAG viene attivato automaticamente. Cloud Composer esegue questo DAG 10 volte e tutte le esecuzioni di DAG vengono eseguite in parallelo.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnostica il malfunzionamento dello scheduler e i problemi di errore delle attività

Una volta completata l'esecuzione del DAG, apri la UI di Airflow e fai clic sul DAG dag_10_tasks_200_seconds_1. Vedrai che 10 esecuzioni di DAG sono riuscite in totale, ognuna delle quali contiene 200 attività riuscite.

Esamina i log delle attività di Airflow:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  3. Vai alla scheda Log, quindi vai a Tutti i log > Log di Airflow > Worker > Visualizza in Esplora log.

Nell'istogramma dei log, puoi vedere gli errori e gli avvisi indicati con i colori rosso e arancione:

L'istogramma dei log worker Airflow con errori e avvisi indicati con colori rosso e arancione
Figura 1. Istogramma dei log del worker Airflow (fai clic per ingrandire)

Il DAG di esempio ha generato circa 130 avvisi e 60 errori. Fai clic su una colonna che contiene barre gialle e rosse. Nei log verranno visualizzati alcuni dei seguenti avvisi ed errori:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Questi log potrebbero indicare che l'utilizzo delle risorse ha superato i limiti e che il worker si è riavviato automaticamente.

Se un'attività Airflow rimane in coda per troppo tempo, lo scheduler la contrassegna come non riuscita e up_for_retry e la ripianifica ancora una volta per l'esecuzione. Un modo per osservare i sintomi di questa situazione è esaminare il grafico con il numero di attività in coda e, se i picchi in questo grafico non diminuiscono in circa 10 minuti, è probabile che si verifichino errori delle attività (senza log).

Esamina le informazioni sul monitoraggio:

  1. Vai alla scheda Monitoring e seleziona Panoramica.

  2. Esamina il grafico Attività Airflow.

    Il grafico delle attività di Airflow nel tempo, che mostra un picco nel numero di attività in coda
    Figura 2. Grafico delle attività di Airflow (fai clic per ingrandire)

    Nel grafico delle attività di Airflow, è presente un picco nelle attività in coda che dura più di 10 minuti, il che potrebbe significare che nel tuo ambiente non sono disponibili risorse sufficienti per elaborare tutte le attività pianificate.

  3. Esamina il grafico Lavoratori attivi:

    Il grafico dei worker Airflow attivi nel tempo mostra che il numero di worker attivi è stato aumentato fino al limite massimo
    Figura 3. Grafico dei lavoratori attivi (fai clic per ingrandire)

    Il grafico Worker attivi indica che il DAG ha attivato la scalabilità automatica fino al limite massimo consentito di tre worker durante l'esecuzione del DAG.

  4. I grafici di utilizzo delle risorse possono indicare la mancanza di capacità nei worker di Airflow per eseguire le attività in coda. Nella scheda Monitoraggio, seleziona Lavoratori ed esamina i grafici Utilizzo totale CPU worker e Utilizzo memoria totale worker.

    Il grafico dell'utilizzo della CPU da parte dei worker di Airflow mostra l'utilizzo della CPU che aumenta fino al limite massimo
    Figura 4. Grafico di utilizzo della CPU totale dei worker (fai clic per ingrandire)
    Il grafico dell'utilizzo della memoria da parte dei worker di Airflow mostra che l'utilizzo della memoria aumenta, ma non raggiunge il limite massimo
    Figura 5. Grafico di utilizzo della memoria totale dei worker (fai clic per ingrandire)

    I grafici indicano che l'esecuzione di troppe attività contemporaneamente ha portato al raggiungimento del limite di CPU. Le risorse sono state utilizzate per oltre 30 minuti, un tempo ancora maggiore della durata totale di 200 attività in 10 esecuzioni DAG eseguite una alla volta.

Questi sono gli indicatori del riempimento della coda e della mancanza di risorse per elaborare tutte le attività pianificate.

Consolida le tue attività

Il codice attuale crea molti DAG e attività senza risorse sufficienti per elaborare tutte le attività in parallelo, causando il riempimento della coda. Se le attività rimangono in coda per troppo tempo potrebbero riprogrammarle o non riuscire. In questi casi, ti consigliamo di scegliere un numero minore di attività più consolidate.

Il seguente DAG di esempio modifica il numero di attività nell'esempio iniziale da 200 a 20 e aumenta il tempo di attesa da 1 a 10 secondi per imitare attività più consolidate che svolgono la stessa quantità di lavoro.

Carica il DAG di esempio riportato di seguito nell'ambiente creato. In questo tutorial, questo DAG è denominato dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Valuta l'impatto di attività più consolidate sui processi di pianificazione:

  1. Attendi il completamento delle esecuzioni DAG.

  2. Nella UI di Airflow, nella pagina DAG, fai clic sul DAG dag_10_tasks_20_seconds_10. Vedrai 10 esecuzioni di DAG, ognuna con 20 attività riuscite.

  3. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  4. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  5. Vai alla scheda Log, quindi vai a Tutti i log > Log di Airflow > Worker > Visualizza in Esplora log.

    Il secondo esempio con attività più consolidate ha generato circa 10 avvisi e 7 errori. Nell'istogramma puoi confrontare il numero di errori e avvisi nell'esempio iniziale (valori precedenti) e nel secondo esempio (valori successivi).

    L'istogramma dei log dei worker Airflow con errori e avvisi mostra la riduzione della quantità di errori e avvisi dopo il consolidamento delle attività
    Figura 6. Il worker Airflow registra l'istogramma dopo il consolidamento delle attività (fai clic per ingrandire)

    Quando confronti il primo esempio con quello più consolidato, puoi vedere che il secondo esempio contiene un numero significativamente inferiore di errori e avvisi. Tuttavia, gli stessi errori relativi all'arresto caldo vengono ancora visualizzati nei log a causa del sovraccarico di risorse.

  6. Nella scheda Monitoring, seleziona Worker ed esamina i grafici.

    Quando confronti il grafico Attività Airflow per il primo esempio (valori precedenti) con il grafico del secondo esempio con attività più consolidate, puoi vedere che il picco nelle attività in coda è durato per un periodo di tempo più breve quando le attività erano più consolidate. Tuttavia, è durata quasi 10 minuti, che non è ancora ottimale.

    Il grafico delle attività di Airflow nel tempo mostra che il picco delle attività di Airflow è durato un periodo di tempo più breve rispetto a prima.
    Figura 7. Grafico delle attività di Airflow dopo il consolidamento delle attività (fai clic per ingrandire)

    Nel grafico dei worker attivi puoi vedere che il primo esempio (sul lato sinistro del grafico) ha utilizzato le risorse per un periodo di tempo molto più esteso rispetto al secondo, anche se entrambi gli esempi imitano la stessa quantità di lavoro.

    Il grafico dei worker Airflow attivi nel tempo mostra che il numero di worker attivi è stato aumentato per un periodo di tempo più breve rispetto a prima.
    Figura 8. Grafico dei lavoratori attivi dopo il consolidamento delle attività (fai clic per ingrandire)

    Esamina i grafici sul consumo delle risorse dei worker. Anche se la differenza tra le risorse utilizzate nell'esempio con attività più consolidate e l'esempio iniziale è piuttosto significativa, l'utilizzo della CPU continua a raggiungere il 70% del limite.

    Il grafico dell'utilizzo della CPU da parte dei worker di Airflow mostra che l'utilizzo della CPU aumenta fino al 70% del limite massimo
    Figura 9. Grafico di utilizzo della CPU totale dei worker dopo il consolidamento delle attività (fai clic per ingrandire)
    Il grafico dell'utilizzo della memoria da parte dei worker di Airflow mostra che l'utilizzo della memoria aumenta, ma non raggiunge il limite massimo
    Figura 10. Grafico dell'utilizzo della memoria totale dei worker dopo il consolidamento delle attività (fai clic per ingrandire)

Distribuisci le attività in modo più uniforme nel tempo

Troppe attività simultanee causano l'esaurimento della coda e, di conseguenza, le attività vengono bloccate in coda o ripianificate. Nei passaggi precedenti hai ridotto il numero di attività consolidando queste attività, ma i log di output e il monitoraggio hanno indicato che il numero di attività in parallelo è ancora non ottimale.

Puoi controllare il numero di attività simultanee eseguite implementando una pianificazione o impostando dei limiti per il numero di attività eseguibili contemporaneamente.

In questo tutorial, distribuisci le attività in modo più uniforme nel tempo aggiungendo parametri a livello di DAG al DAG dag_10_tasks_20_seconds_10:

  1. Aggiungi l'argomento max_active_runs=1 al gestore del contesto DAG. Questo argomento imposta un limite relativo a una sola istanza di un'esecuzione di DAG in un determinato momento.

  2. Aggiungi l'argomento max_active_tasks=5 al gestore del contesto DAG. Questo argomento controlla il numero massimo di istanze di attività che possono essere eseguite contemporaneamente in ogni DAG.

Carica il DAG di esempio riportato di seguito nell'ambiente creato. In questo tutorial, questo DAG è denominato dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Valuta l'impatto della distribuzione delle attività nel tempo sui processi di pianificazione:

  1. Attendi il completamento delle esecuzioni DAG.

  2. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  3. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  4. Vai alla scheda Log, quindi vai a Tutti i log > Log di Airflow > Worker > Visualizza in Esplora log.

  5. Nell'istogramma puoi vedere che il terzo DAG con un numero limitato di attività ed esecuzioni attive non ha generato avvisi o errori e che la distribuzione dei log appare più uniforme rispetto ai valori precedenti.

    L'istogramma dei log dei worker di Airflow con errori e avvisi
    non mostra errori o avvisi dopo che le attività sono state consolidate e
    distribuite nel tempo.
    Figura 11. Il worker Airflow registra l'istogramma dopo che le attività sono state consolidate e distribuite nel tempo (fai clic per ingrandire)

Le attività nell'esempio dag_10_tasks_20_seconds_10_scheduled che ha un numero limitato di attività ed esecuzioni attive non hanno causato pressione sulle risorse perché le attività sono state messe in coda in modo uniforme.

Dopo aver eseguito i passaggi descritti, hai ottimizzato l'utilizzo delle risorse consolidando piccole attività e distribuendole in modo più uniforme nel tempo.

Ottimizza le configurazioni dell'ambiente

Puoi regolare le configurazioni dell'ambiente per assicurarti che i worker Airflow abbiano sempre la capacità di eseguire attività in coda.

Numero di worker e contemporaneità dei worker

Puoi regolare il numero massimo di worker per consentire a Cloud Composer di scalare automaticamente il tuo ambiente entro i limiti impostati.

Il parametro [celery]worker_concurrency definisce il numero massimo di attività che un singolo worker può recuperare dalla coda di attività. La modifica di questo parametro regola il numero di attività che un singolo worker può eseguire contemporaneamente. Puoi modificare questa opzione di configurazione di Airflow eseguendo l'override. Per impostazione predefinita, la contemporaneità dei worker è impostata su un valore minimo del seguente valore: 32, 12 * worker_CPU, 8 * worker_memory, il che significa che dipende dai limiti delle risorse worker. Consulta Ottimizzare gli ambienti per ulteriori informazioni sui valori predefiniti di contemporaneità dei worker.

Il numero di worker e la contemporaneità dei worker lavorano in combinazione e le prestazioni dell'ambiente dipendono fortemente da entrambi i parametri. Per scegliere la combinazione corretta, fai riferimento ai seguenti aspetti:

  • Più attività rapide in esecuzione in parallelo. Puoi aumentare la contemporaneità dei worker quando sono presenti attività in attesa in coda e i worker utilizzano contemporaneamente una bassa percentuale di CPU e memoria. Tuttavia, in determinate circostanze è possibile che la coda non venga mai riempita, determinando la mancata attivazione della scalabilità automatica. Se le piccole attività terminano l'esecuzione quando i nuovi worker sono pronti, un worker esistente può recuperare le attività rimanenti e non ci saranno attività per i worker appena creati.

    In queste situazioni, ti consigliamo di aumentare il numero minimo di worker e la contemporaneità dei worker per evitare una scalabilità eccessiva.

  • Più attività lunghe in esecuzione in parallelo. L'elevata contemporaneità dei worker impedisce al sistema di scalare il numero di worker. Se più attività richiederanno molto tempo e impiegano molte risorse per essere completate, un'elevata contemporaneità dei worker può impedire che la coda venga mai riempita e che tutte le attività vengano recuperate da un solo worker, causando problemi di prestazioni. In queste situazioni, consigliamo di aumentare il numero massimo di worker e di ridurre la contemporaneità dei worker.

L'importanza del parallelismo

Gli scheduler Airflow controllano la pianificazione delle esecuzioni dei DAG e delle singole attività dai DAG. L'opzione di configurazione di Airflow [core]parallelism controlla quante attività lo scheduler Airflow può mettere in coda nella coda dell'esecutore dopo che sono state soddisfatte tutte le dipendenze per queste attività.

Il parallelismo è un meccanismo di protezione di Airflow che determina il numero di attività che possono essere eseguite contemporaneamente per ogni scheduler, indipendentemente dal numero di worker. Il valore di parallelismo, moltiplicato per il numero di scheduler nel cluster, è il numero massimo di istanze di attività che il tuo ambiente può mettere in coda.

In genere, [core]parallelism è impostato come prodotto di un numero massimo di worker e [celery]worker_concurrency. È interessato anche dal pool. Puoi modificare questa opzione di configurazione di Airflow eseguendo l'override. Per ulteriori informazioni sulla regolazione delle configurazioni di Airflow relative alla scalabilità, consulta la pagina relativa alla scalabilità della configurazione di Airflow.

Trova configurazioni di ambiente ottimali

Il modo consigliato per risolvere i problemi di pianificazione è consolidare le attività piccole in attività più grandi e distribuire le attività in modo più uniforme nel tempo. Oltre a ottimizzare il codice DAG, puoi anche ottimizzare le configurazioni dell'ambiente in modo che abbiano una capacità sufficiente per eseguire contemporaneamente più attività.

Ad esempio, supponi di consolidare il più possibile le attività nel tuo DAG, ma limitare le attività attive per distribuirle in modo più uniforme nel tempo non è la soluzione preferita per il tuo caso d'uso specifico.

Puoi regolare il parallelismo, il numero di worker e i parametri di contemporaneità dei worker per eseguire il DAG dag_10_tasks_20_seconds_10 senza limitare le attività attive. In questo esempio, i DAG vengono eseguiti 10 volte e ogni esecuzione contiene 20 attività di piccole dimensioni. Se vuoi eseguirli tutti contemporaneamente:

  • Hai bisogno di un ambiente di dimensioni maggiori, perché controlla i parametri delle prestazioni dell'infrastruttura Cloud Composer gestita del tuo ambiente.

  • I worker Airflow devono essere in grado di eseguire 20 attività contemporaneamente, il che significa che devi impostare la contemporaneità dei worker su 20.

  • I worker hanno bisogno di CPU e memoria sufficienti per gestire tutte le attività. La contemporaneità dei worker è influenzata dalla CPU e dalla memoria dei worker, pertanto avrai bisogno di almeno worker_concurrency / 12 in CPU e least worker_concurrency / 8 in memoria.

  • Dovrai aumentare il parallelismo per trovare la corrispondenza con la maggiore contemporaneità dei worker. Affinché i worker possano recuperare 20 attività dalla coda, lo scheduler dovrà prima pianificarle.

Modifica le configurazioni dell'ambiente nel seguente modo:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  3. Vai alla scheda Configurazione dell'ambiente.

  4. Trova la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.

  5. Nel campo Memoria della sezione Worker, specifica il nuovo limite di memoria per i worker di Airflow. In questo tutorial, utilizza 4 GB.

  6. Nel campo CPU, specifica il nuovo limite di CPU per i worker di Airflow. In questo tutorial, utilizza 2 vCPU.

  7. Salva le modifiche e attendi qualche minuto per il riavvio dei worker di Airflow.

Quindi, esegui l'override delle opzioni di configurazione di Airflow per il parallelismo e la contemporaneità dei worker:

  1. Vai alla scheda Override della configurazione di Airflow.

  2. Fai clic su Modifica, quindi su Aggiungi override della configurazione di Airflow.

  3. Esegui l'override della configurazione di parralelismo:

    Sezione Chiave Valore
    core parallelism 20
  4. Fai clic su Aggiungi override della configurazione di Airflow e sostituisci la configurazione della contemporaneità dei worker:

    Sezione Chiave Valore
    celery worker_concurrency 20
  5. Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.

Attiva di nuovo lo stesso DAG di esempio con le configurazioni modificate:

  1. Nell'interfaccia utente di Airflow, vai alla pagina DAG.

  2. Trova il DAG dag_10_tasks_20_seconds_10 ed eliminalo.

    Dopo l'eliminazione del DAG, Airflow controlla la cartella dei DAG nel bucket del tuo ambiente ed esegue automaticamente di nuovo il DAG.

Una volta completate le esecuzioni DAG, esamina di nuovo l'istogramma dei log. Nel diagramma, puoi vedere che l'esempio dag_10_tasks_20_seconds_10 con attività più consolidate non ha generato errori e avvisi durante l'esecuzione con la configurazione dell'ambiente modificata. Confronta i risultati con i dati precedenti nel diagramma, dove lo stesso esempio ha generato errori e avvisi durante l'esecuzione con la configurazione dell'ambiente predefinito tge.

L'istogramma dei log dei worker di Airflow con errori e avvisi
        non mostra errori e avvisi dopo la modifica della configurazione
        dell'ambiente
Figura 12. Il worker Airflow registra l'istogramma dopo la regolazione della configurazione dell'ambiente (fai clic per ingrandire)

Le configurazioni di ambiente e le configurazioni di Airflow svolgono un ruolo fondamentale nella pianificazione delle attività, tuttavia non è possibile aumentare le configurazioni oltre determinati limiti.

Ti consigliamo di ottimizzare il codice DAG, consolidare le attività e utilizzare la pianificazione per ottimizzare prestazioni ed efficienza.

Esempio: errori di analisi dei DAG e latenza a causa di un codice DAG complesso

In questo esempio analizzerai la latenza di analisi di un DAG di esempio che imita un eccesso di variabili Airflow.

Crea una nuova variabile Airflow

Prima di caricare il codice campione, crea una nuova variabile Airflow.

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nella colonna Airflow webserver, segui il link Airflow per il tuo ambiente.

  3. Vai ad Amministrazione > Variabili > Aggiungi un nuovo record.

  4. Imposta i seguenti valori:

    • tasto: example_var
    • valore: test_airflow_variable

Carica il DAG di esempio nel tuo ambiente

Carica il DAG di esempio riportato di seguito nell'ambiente creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato dag_for_loop_airflow_variable.

Questo DAG contiene un ciclo for che viene eseguito 1000 volte e che imita un eccesso di variabili Airflow. Ogni iterazione legge la variabile example_var e genera un'attività. Ogni attività contiene un comando che stampa il valore della variabile.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnostica i problemi di analisi

Il tempo di analisi DAG è la quantità di tempo impiegata dallo scheduler Airflow per leggere un file DAG e analizzarlo. Prima che lo scheduler Airflow possa pianificare qualsiasi attività da un DAG, lo scheduler deve analizzare il file DAG per scoprire la struttura del DAG e delle attività definite.

Se l'analisi di un DAG richiede molto tempo, l'operazione consuma la capacità dello scheduler e potrebbe ridurre le prestazioni delle esecuzioni di DAG.

Per monitorare il tempo di analisi dei DAG:

  1. Esegui il comando dell'interfaccia a riga di comando Airflow dags report nell'interfaccia alla gcloud CLI per visualizzare il tempo di analisi per tutti i tuoi DAG:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Sostituisci quanto segue:

    • ENVIRONMENT_NAME: il nome dell'ambiente.
    • LOCATION: la regione in cui si trova l'ambiente.
  2. Nell'output del comando, cerca il valore della durata del DAG dag_for_loop_airflow_variables. Un valore grande potrebbe indicare che questo DAG non è implementato in modo ottimale. Se disponi di più DAG, dalla tabella di output puoi identificare quelli con tempi di analisi lunghi.

    Esempio:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Controlla i tempi di analisi dei DAG nella console Google Cloud:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  4. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  5. Vai alla scheda Log, quindi vai a Tutti i log > Gestore processore DAG.

  6. Esamina i log di dag-processor-manager e identifica i possibili problemi.

    Una voce di log per il DAG di esempio mostra che il tempo di analisi DAG è di 46,3 secondi
    Figura 13. I log del gestore processore DAG mostrano i tempi di analisi dei DAG (fai clic per ingrandire)

Se il tempo totale di analisi dei DAG supera i 10 secondi circa, gli scheduler potrebbero essere sovraccaricati con l'analisi DAG e non essere in grado di eseguire i DAG in modo efficace.

Ottimizzare il codice DAG

Ti consigliamo di evitare codice Python di "primo livello" non necessario nei DAG. I DAG con molte importazioni, variabili e funzioni al di fuori dei DAG introducono tempi di analisi maggiori per lo scheduler Airflow. Questo riduce le prestazioni e la scalabilità di Cloud Composer e Airflow. Un'eccessiva lettura di variabili Airflow comporta tempi di analisi lunghi e un carico elevato del database. Se questo codice si trova in un file DAG, queste funzioni vengono eseguite su ogni battito cardiaco dello scheduler, che potrebbe essere lento.

I campi del modello di Airflow consentono di incorporare valori dalle variabili Airflow e dai modelli Jinja nei tuoi DAG. Ciò impedisce l'esecuzione di funzioni non necessarie durante gli heartbeat dello scheduler.

Per implementare l'esempio di DAG in modo migliore, evita di utilizzare le variabili Airflow nel codice Python di primo livello dei DAG. Passa invece le variabili Airflow agli operatori esistenti tramite un modello Jinja, il che ritarderà la lettura del valore fino all'esecuzione dell'attività.

Carica la nuova versione del DAG di esempio nel tuo ambiente. In questo tutorial, questo DAG è denominato dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Esamina il nuovo tempo di analisi dei DAG:

  1. Attendi il completamento dell'esecuzione del DAG.

  2. Esegui di nuovo il comando dags report per visualizzare il tempo di analisi per tutti i DAG:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Esamina di nuovo i log dag-processor-manager e analizza la durata dell'analisi.

    Una voce di log per il DAG di esempio mostra che il tempo di analisi DAG è di 4,21 secondi
    Figura 14. I log del gestore processore DAG mostrano i tempi di analisi dei DAG dopo l'ottimizzazione del codice DAG (fai clic per ingrandire)

Sostituendo le variabili di ambiente con i modelli Airflow, hai semplificato il codice DAG e ridotto la latenza di analisi di circa dieci volte.

Ottimizza le configurazioni dell'ambiente Airflow

Lo scheduler Airflow tenta costantemente di attivare nuove attività e analizza tutti i DAG nel bucket di ambiente. Se i DAG hanno un tempo di analisi lungo e lo scheduler consuma molte risorse, puoi ottimizzare le configurazioni dello scheduler di Airflow in modo che lo scheduler utilizzi le risorse in modo più efficiente.

In questo tutorial, l'analisi dei file DAG richiede molto tempo e i cicli di analisi iniziano a sovrapporsi e, di conseguenza, a esaurire la capacità dello scheduler. Nel nostro esempio, l'analisi del primo DAG di esempio richiede più di 5 secondi, quindi configurerai lo scheduler in modo che venga eseguito meno di frequente per utilizzare le risorse in modo più efficiente. Eseguirai l'override dell'opzione di configurazione Airflow per scheduler_heartbeat_sec. Questa configurazione definisce la frequenza di esecuzione dello scheduler (in secondi). Per impostazione predefinita, il valore è impostato su 5 secondi. Puoi modificare questa opzione di configurazione di Airflow eseguendo l'override.

Esegui l'override dell'opzione di configurazione Airflow scheduler_heartbeat_sec:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  3. Vai alla scheda Override della configurazione di Airflow.

  4. Fai clic su Modifica, quindi su Aggiungi override della configurazione di Airflow.

  5. Esegui l'override dell'opzione di configurazione Airflow:

    Sezione Chiave Valore
    scheduler scheduler_heartbeat_sec 10
  6. Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.

Controlla le metriche dello scheduler:

  1. Vai alla scheda Monitoring e seleziona Scheduler (Pianificatori).

  2. Nel grafico Heartbeat dello scheduler, fai clic sul pulsante Altre opzioni (tre puntini) e poi su Visualizza in Metrics Explorer.

Il grafico dell'heartbeat dello scheduler mostra che il battito cardiaco si verifica meno frequentemente
Figura 15. Grafico Heartbeat dello scheduler (fai clic per ingrandire)

Nel grafico, vedrai che lo scheduler viene eseguito con meno frequenza due volte dopo aver modificato la configurazione predefinita da 5 a 10 secondi. Riducendo la frequenza degli heartbeat, ti assicuri che lo scheduler non inizi a essere eseguito mentre è in corso il ciclo di analisi precedente e la capacità delle risorse dello scheduler non è esaurita.

Assegna altre risorse allo scheduler

In Cloud Composer 2, puoi allocare più risorse di CPU e memoria allo scheduler. In questo modo, puoi aumentare le prestazioni del tuo scheduler e accelerare il tempo di analisi per il tuo DAG.

Alloca CPU e memoria aggiuntive allo scheduler:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  3. Vai alla scheda Configurazione dell'ambiente.

  4. Trova la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.

  5. Nel campo Memoria della sezione Scheduler, specifica il nuovo limite di memoria. In questo tutorial, utilizza 4 GB.

  6. Nel campo CPU, specifica il nuovo limite di CPU. In questo tutorial, utilizza 2 vCPU.

  7. Salva le modifiche e attendi alcuni minuti per il riavvio degli scheduler Airflow.

  8. Vai alla scheda Log, quindi vai a Tutti i log > Gestore processore DAG.

  9. Esamina i log dag-processor-manager e confronta la durata di analisi per i DAG di esempio:

    Una voce di log per il DAG di esempio mostra che il tempo di analisi dei DAG per il DAG ottimizzato è di 1,5 secondi. Per il DAG non ottimizzato, il tempo di analisi è di 28,71 secondi
    Figura 16. I log del gestore processore DAG mostrano i tempi di analisi dei DAG dopo l'assegnazione di altre risorse allo scheduler (fai clic per ingrandire)

Assegnando più risorse allo scheduler, hai aumentato la capacità dello scheduler e hai ridotto in modo significativo la latenza di analisi rispetto alle configurazioni dell'ambiente predefinite. Con più risorse, lo scheduler può analizzare i DAG più velocemente, ma aumenteranno anche i costi associati alle risorse di Cloud Composer. Inoltre, non è possibile aumentare le risorse oltre un determinato limite.

Ti consigliamo di allocare le risorse solo dopo aver implementato le possibili ottimizzazioni del codice DAG e della configurazione di Airflow.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

Elimina il progetto

  1. Nella console Google Cloud, vai alla pagina Gestisci risorse.

    Vai a Gestisci risorse

  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare, quindi fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.

Elimina singole risorse

Se prevedi di esplorare più tutorial e guide rapide, puoi riutilizzare i progetti per evitare di superare i limiti di quota.

Elimina l'ambiente Cloud Composer. Durante questa procedura elimini anche il bucket dell'ambiente.

Passaggi successivi