Risolvere i problemi di pianificazione delle attività

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questo tutorial illustra la procedura di diagnosi e risoluzione dei problemi di pianificazione e analisi delle attività che causano malfunzionamento dell'organizzatore, errori di analisi e latenza e fallimento delle attività.

Introduzione

Lo scheduler Airflow è influenzato principalmente da due fattori: la pianificazione delle attività e Analisi dei DAG. I problemi in uno di questi fattori possono avere un impatto negativo sulla l'integrità e le prestazioni dell'ambiente.

A volte vengono programmate troppe attività contemporaneamente. In questa situazione, la coda è piena e le attività rimangono nella sezione "pianificata" o diventare riprogrammate dopo essere state messe in coda, il che potrebbe causare errori e prestazioni delle attività una latenza di pochi millisecondi.

Un altro problema comune è l'analisi della latenza e degli errori causati dalla complessità del un codice DAG. Ad esempio, un codice DAG che contiene le variabili Airflow in alto livello del codice può causare ritardi nell'analisi, sovraccarico del database, pianificazione errori e timeout dei DAG.

In questo tutorial, eseguirai la diagnosi dei DAG di esempio e imparerai a risolvere i problemi di pianificazione e analisi, migliorare la pianificazione dei DAG e ottimizzare il codice DAG e le configurazioni dell'ambiente per migliorare le prestazioni.

Obiettivi

In questa sezione sono elencati gli obiettivi per gli esempi di questo tutorial.

Esempio: malfunzionamento e latenza dell'organizzatore causati da una concorrenza elevata delle attività

  • Carica il DAG di esempio eseguito più volte contemporaneamente ed esegui la diagnostica il malfunzionamento dello scheduler e i problemi di latenza con Cloud Monitoring.

  • Ottimizza il codice DAG consolidando le attività e valutando e l'impatto sulle prestazioni.

  • Distribuisci le attività in modo più uniforme nel tempo e valuta l'impatto sul rendimento.

  • Ottimizzare le configurazioni di Airflow e di ambiente valutarne l'impatto.

Esempio: latenza e errori di analisi dei DAG causati da codice complesso

  • Carica il DAG di esempio con le variabili Airflow e diagnostica i problemi di analisi con Cloud Monitoring.

  • Ottimizza il codice DAG evitando le variabili Airflow al livello superiore di il codice e valutare l'impatto sul tempo di analisi.

  • Ottimizza le configurazioni di Airflow e dell'ambiente e valuta l'impatto sul tempo di analisi.

Costi

Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:

Al termine di questo tutorial, puoi evitare la fatturazione continua eliminando il le risorse che hai creato. Per maggiori dettagli, vedi Pulizia.

Prima di iniziare

Questa sezione descrive le azioni necessarie prima di iniziare il tutorial.

Creare e configurare un progetto

Per questo tutorial è necessario un account Google Cloud progetto. Configura il progetto nel seguente modo:

  1. Nella console Google Cloud, seleziona o crea un progetto:

    Vai al selettore di progetti

  2. Verifica che la fatturazione sia attivata per il tuo progetto. Scopri come verificare se la fatturazione è abilitata per un progetto.

  3. Assicurati che l'utente del progetto Google Cloud disponga dei seguenti ruoli per creare le risorse necessarie:

    • Amministratore ambienti e oggetti Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Amministratore Compute (roles/compute.admin)

Abilita le API per il tuo progetto

Enable the Cloud Composer API.

Enable the API

Crea l'ambiente Cloud Composer

Crea un ambiente Cloud Composer 2.

Durante la creazione dell'ambiente, devi concedere il ruolo Estensione agente di servizio API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) all'account agente di servizio Composer. Cloud Composer utilizza questo account per eseguire operazioni nel progetto Google Cloud.

Esempio: malfunzionamento dello scheduler e errore delle attività a causa di problemi di pianificazione delle attività

Questo esempio mostra il debug del malfunzionamento e della latenza dell'organizzatore causati da una concorrenza elevata delle attività.

Carica il DAG di esempio nel tuo ambiente

Carica il seguente DAG di esempio nell'ambiente creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato dag_10_tasks_200_seconds_1.

Questo DAG ha 200 attività. Ogni attività attende 1 secondo e stampa "Completato!". Il DAG viene attivato automaticamente dopo il caricamento. Cloud Composer esegue questo DAG 10 volte e tutte le esecuzioni dei DAG avvengono in parallelo.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnostica i problemi di malfunzionamento dello scheduler e di errore delle attività

Al termine delle esecuzioni del DAG, apri la UI di Airflow e fai clic sul pulsante dag_10_tasks_200_seconds_1 DAG. Vedrai che sono state eseguite 10 esecuzioni di DAG in totale e ognuna ha 200 attività riuscite.

Esamina i log delle attività Airflow:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

  3. Vai alla scheda Log, quindi vai a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.

Nell'istogramma dei log, puoi vedere gli errori e gli avvisi indicati con i colori rosso e arancione:

L'istogramma dei log dei worker di Airflow con errori e avvisi
    indicati con i colori rosso e arancione
Figura 1. Istogramma dei log del worker Airflow (fai clic per ingrandire)

Il DAG di esempio ha generato circa 130 avvisi e 60 errori. Fai clic su una colonna contenente barre gialle e rosse. Vedrai alcune delle seguenti opzioni avvisi ed errori nei log:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Questi log potrebbero indicare che l'utilizzo delle risorse ha superato i limiti e il worker si è riavviato automaticamente.

Se un'attività Airflow rimane in coda per troppo tempo, lo scheduler la contrassegna come non riuscita e up_for_retry e la riprogramma di nuovo per l'esecuzione. Un modo per osservare i sintomi di questa situazione è osservare la con il numero di attività in coda e se i picchi in questo grafico in circa 10 minuti, si verificheranno probabilmente degli errori nelle attività (senza alcuna log).

Esamina le informazioni di monitoraggio:

  1. Vai alla scheda Monitoraggio e seleziona Panoramica.

  2. Esamina il grafico Attività Airflow.

    Il grafico delle attività Airflow nel tempo, che mostra un picco nel
    numero di attività in coda
    Figura 2. Grafico delle attività Airflow (fai clic per ingrandire)

    Nel grafico delle attività di Airflow, c'è un picco di attività in coda che dura per più di 10 minuti, il che potrebbe significare che le risorse non sono sufficienti nel tuo ambiente per elaborare tutte le attività pianificate.

  3. Esamina il grafico Worker attivi:

    Il grafico dei worker Airflow attivi nel tempo mostra che il
    numero di worker attivi è stato aumentato fino al limite massimo
    Figura 3. Grafico dei worker attivi (fai clic per ingrandire)

    Il grafico Worker attivi indica che il DAG ha attivato la scalabilità automatica fino al limite massimo consentito di tre worker durante l'esecuzione del DAG.

  4. I grafici di utilizzo delle risorse possono indicare la mancanza di capacità nei worker di Airflow per eseguire attività in coda. Nella scheda Monitoraggio, seleziona Worker e esamina i grafici Utilizzo totale CPU worker e Utilizzo totale memoria worker.

    Il grafico dell'utilizzo della CPU da parte dei worker di Airflow mostra l'utilizzo della CPU
    aumentando fino al limite massimo
    Figura 4. Grafico di utilizzo totale della CPU dei worker (fai clic per ingrandire)
    Il grafico dell'utilizzo della memoria da parte dei worker Airflow mostra un aumento dell'utilizzo della memoria, ma non raggiunge il limite massimo
    Figura 5. Grafico totale dell'utilizzo della memoria dei worker (fai clic per ingrandire)

    I grafici indicano che l'esecuzione di troppe attività contemporaneamente per raggiungere il limite di CPU. Le risorse sono state utilizzate da oltre 30 di minuti, che è ancora più lungo della durata totale di 200 attività in 10 I DAG vengono eseguiti singolarmente.

Questi sono gli indicatori del riempimento della coda e della mancanza di risorse per elaborare tutte le attività pianificate.

Consolida le tue attività

Il codice attuale crea molti DAG e attività senza risorse sufficienti per elaborare tutte le attività in parallelo, il che comporta il riempimento della coda. Se le attività rimangono in coda per troppo tempo, la loro riprogrammazione potrebbe non andare a buon fine. In questi casi, ti consigliamo di optare per un numero minore di attività più consolidate.

Il seguente DAG di esempio modifica il numero di attività nell'esempio iniziale da 200 a 20 e aumenta il tempo di attesa da 1 a 10 secondi per imitare altre attività consolidate che svolgono la stessa quantità di lavoro.

Carica il seguente DAG di esempio nell'ambiente che hai creato. In questo tutorial, questo DAG è denominato dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Valuta l'impatto di attività più consolidate sulla pianificazione dei processi:

  1. Attendi il completamento delle esecuzioni del DAG.

  2. Nell'interfaccia utente di Airflow, nella pagina DAG, fai clic sul DAG dag_10_tasks_20_seconds_10. Vedrai 10 esecuzioni di DAG, ognuna con 20 e attività completate correttamente.

  3. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  4. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

  5. Vai alla scheda Log, poi a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.

    Il secondo esempio con attività più consolidate ha generato circa 10 avvisi e 7 errori. Nell'istogramma, puoi confrontare il numero di errori e avvisi nell'esempio iniziale (valori precedenti) e nel secondo esempio (valori successivi).

    L'istogramma dei log dei worker di Airflow con errori e avvisi
    mostra la diminuzione degli errori e degli avvisi dopo che le attività
    consolidato
    Figura 6. Istogramma dei log dei worker di Airflow dopo il consolidamento delle attività (fai clic per ingrandire)

    Se confronti il primo esempio con quello più consolidato, puoi vedere che il secondo contiene molto meno errori e avvisi. Tuttavia, gli stessi errori relativi all'arresto a caldo continuano a essere visualizzati in dei log a causa del sovraccarico delle risorse.

  6. Nella scheda Monitoraggio, seleziona Worker ed esamina i grafici.

    Quando confronti il grafico delle attività Airflow per il primo esempio (precedente valori) con il grafico per il secondo esempio con attività più consolidate, puoi vedere che il picco di attività in coda è durato per un periodo più breve tempo in cui le attività erano più consolidate. Tuttavia, è durato quasi 10 di minuti, che non è comunque ottimale.

    Il grafico delle attività Airflow nel tempo mostra che il picco di
    Le attività Airflow sono durate per un periodo di tempo più breve rispetto a prima.
    Figura 7. Grafico delle attività di Airflow dopo attività consolidate (fai clic per ingrandire)

    Nel grafico Lavoratori attivi, puoi vedere che il primo esempio (sul lato sinistro del grafico) ha utilizzato le risorse per un periodo di tempo molto più lungo rispetto al secondo, anche se entrambi gli esempi simulano la stessa quantità di lavoro.

    Il grafico dei worker Airflow attivi nel tempo mostra che
    il numero di lavoratori attivi è stato aumentato per un periodo di tempo più breve
    rispetto a prima.
    Figura 8. Grafico dei worker attivi dopo il attività consolidate (fai clic per ingrandire)

    Esamina i grafici sul consumo delle risorse dei worker. Anche se la differenza tra le risorse utilizzate nell'esempio con attività più consolidate e l'esempio iniziale è abbastanza significativo, l'utilizzo della CPU è ancora fino al 70% del limite.

    Il grafico dell'utilizzo della CPU da parte dei worker di Airflow mostra un aumento dell'utilizzo della CPU fino al 70% del limite massimo
    Figura 9. Grafico dell'utilizzo totale della CPU dei worker dopo il attività consolidate (fai clic per ingrandire)
    Il grafico dell'utilizzo della memoria da parte dei worker Airflow mostra un aumento dell'utilizzo della memoria, ma non raggiunge il limite massimo
    Figura 10. Grafico dell'utilizzo totale della memoria dei worker dopo le attività sono state consolidate (fai clic per ingrandire)

Distribuisci le attività in modo più uniforme nel tempo

Troppe attività simultanee comportano il riempimento della coda, il che porta a bloccate nella coda o riprogrammate. Nei passaggi precedenti hai ridotto il numero di attività consolidandole, tuttavia, i log di output e il monitoraggio hanno indicato che il numero di attività simultanee è ancora subottimale.

Puoi controllare il numero di esecuzioni di attività simultanee implementando una pianificazione o l'impostazione di limiti per il numero di attività che possono essere eseguite contemporaneamente.

In questo tutorial, distribuisci le attività in modo più uniforme nel tempo aggiungendo parametri a livello di DAG nel DAG dag_10_tasks_20_seconds_10:

  1. Aggiungi l'argomento max_active_runs=1 al gestore di contesto del DAG. Questo argomento imposta un limite di una sola istanza di esecuzione di un DAG in un determinato momento.

  2. Aggiungi l'argomento max_active_tasks=5 al gestore di contesto del DAG. Questo argomento controlla il numero massimo di istanze di attività che possono essere eseguite contemporaneamente in ogni DAG.

Carica il seguente DAG di esempio nell'ambiente che hai creato. In questo tutorial, questo DAG è denominato dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Valuta l'impatto della distribuzione delle attività nel tempo sulla pianificazione dei processi:

  1. Attendi il completamento delle esecuzioni del DAG.

  2. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  3. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

  4. Vai alla scheda Log, poi a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.

  5. Nell'istogramma, puoi vedere che il terzo DAG con un numero limitato di attività ed esecuzioni attive non ha generato avvisi o errori e la distribuzione dei log sembra più uniforme rispetto ai valori precedenti.

    L'istogramma dei log dei worker di Airflow con errori e avvisi
    non mostra errori o avvisi dopo il consolidamento delle attività
    distribuiti nel tempo.
    Figura 11. Il worker Airflow registra l'istogramma dopo il le attività sono state consolidate e distribuite nel tempo (fai clic per ingrandire)

Le attività nell'esempio dag_10_tasks_20_seconds_10_scheduled che hanno un un numero limitato di attività ed esecuzioni attive non ha causato pressione sulle risorse le attività erano messe in coda in modo uniforme.

Dopo aver eseguito i passaggi descritti, hai ottimizzato l'utilizzo delle risorse consolidando piccole attività e distribuendole in modo più uniforme nel tempo.

Ottimizza le configurazioni dell'ambiente

Puoi regolare le configurazioni dell'ambiente per assicurarti che i worker Airflow abbiano sempre la capacità di eseguire attività in coda.

Numero di worker e contemporaneità dei worker

Puoi modificare il numero massimo di worker per fare in modo che Cloud Composer scala automaticamente il tuo ambiente i limiti impostati.

Il parametro [celery]worker_concurrency definisce il numero massimo di attività che un singolo worker può recuperare dalla coda di attività. La modifica di questo parametro consente di regolare il numero di attività che un singolo worker può eseguire contemporaneamente. Puoi modificare questa opzione di configurazione di Airflow tramite scattando l'override. Per impostazione predefinita, la contemporaneità dei worker è impostata su un almeno quanto segue: 32, 12 * worker_CPU, 8 * worker_memory, che significa dipende dai limiti delle risorse worker. Consulta: Ottimizza gli ambienti per ulteriori informazioni sugli ambienti predefiniti valori di contemporaneità dei worker.

Il numero di worker e la concorrenza dei worker agiscono in combinazione tra loro e le prestazioni del tuo ambiente dipendono molto da entrambi i parametri. Per scegliere la combinazione corretta, puoi utilizzare le seguenti considerazioni:

  • Più attività rapide in esecuzione in parallelo. Puoi aumentare il quando ci sono attività in attesa e i tuoi lavoratori usano una bassa percentuale di CPU e memoria contemporaneamente. Tuttavia, sotto la coda potrebbe non riempirsi mai, causando la scalabilità automatica non si attivano mai. Se le attività di piccole dimensioni terminano l'esecuzione quando i nuovi worker sono pronti, un worker esistente può riprendere le attività rimanenti non saranno attività per i worker appena creati.

    In queste situazioni, è consigliabile Aumenta il numero minimo di worker e la contemporaneità dei worker per evitare una scalabilità eccessiva.

  • Più attività lunghe in esecuzione in parallelo. L'elevata concorrenza dei worker impedisce al sistema di scalare il numero di worker. Se vengono eseguite più attività richiedono molte risorse e il loro completamento richiede molto tempo. la contemporaneità può portare a un mancato riempimento della coda e a tutte le attività rilevato da un solo worker, con conseguenti problemi di prestazioni. In queste situazioni in cui è consigliabile Aumenta il numero massimo di worker e diminuisci la contemporaneità dei worker.

L'importanza del parallelismo

Gli scheduler di Airflow controllano la pianificazione delle esecuzioni dei DAG e delle singole attività e i DAG. L'opzione di configurazione [core]parallelism di Airflow controlla il numero di attività che lo scheduler di Airflow può mettere in coda nella coda dell'executor dopo che tutte le dipendenze per queste attività sono soddisfatte.

Il parallelismo è un meccanismo protettivo di Airflow che determina quante attività possono essere eseguite contemporaneamente in ogni scheduler, indipendentemente dal conteggio dei worker. Il valore di parallelismo, moltiplicato per il numero di scheduler nel cluster, è il numero massimo di istanze di attività che il tuo ambiente può mettere in coda.

In genere, [core]parallelism viene impostato come prodotto di un numero massimo di worker e [celery]worker_concurrency. È influenzata anche dalle pool. Puoi modificare questa opzione di configurazione di Airflow tramite scattando l'override. Per ulteriori informazioni sulla regolazione di Airflow configurazioni relative alla scalabilità, consulta Scalabilità della configurazione di Airflow.

Trovare configurazioni dell'ambiente ottimali

Il modo consigliato per risolvere i problemi di pianificazione è raggruppare le piccole attività in attività più grandi e distribuirle in modo più uniforme nel tempo. Oltre a ottimizzare il codice DAG, puoi anche ottimizzare le configurazioni dell'ambiente per avere una capacità sufficiente per l'esecuzione di più attività contemporaneamente.

Ad esempio, supponiamo di consolidare attività nel DAG il più possibile, ma limitando le attività attive per distribuirle in modo più uniforme non è una soluzione preferita per il tuo caso d'uso specifico.

Puoi regolare il parallelismo, il numero di worker e la contemporaneità dei worker parametri per eseguire il DAG dag_10_tasks_20_seconds_10 senza limitare lo stato attivo attività di machine learning. In questo esempio, il DAG viene eseguito 10 volte e ogni esecuzione contiene 20 piccole attività. Se vuoi eseguirli tutti contemporaneamente:

  • Avrai bisogno di una dimensione dell'ambiente più grande, perché controlla le prestazioni dell'infrastruttura Cloud Composer gestita del tuo ambiente.

  • I worker di Airflow devono essere in grado di eseguire 20 attività contemporaneamente, il che significa che devi impostare la concorrenza dei worker su 20.

  • I worker hanno bisogno di CPU e memoria sufficienti per gestire tutte le attività. La concorrenza dei worker è influenzata dalla CPU e dalla memoria dei worker, pertanto sono necessarie almeno worker_concurrency / 12 in CPU e least worker_concurrency / 8 in memoria.

  • Dovrai aumentare il parallelismo per far corrispondere la contemporaneità dei worker più alta. Affinché i worker possano recuperare 20 attività dalla coda, il programmatore dovrà prima pianificarle.

Modifica le configurazioni dell'ambiente nel seguente modo:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.

  3. Vai alla scheda Configurazione dell'ambiente.

  4. Trova la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.

  5. Nella sezione Worker, specifica la nuova memoria nel campo Memoria per i worker di Airflow. In questo tutorial, utilizza 4 GB.

  6. Nel campo CPU, specifica il nuovo limite di CPU per i worker di Airflow. In questo usa 2 vCPU.

  7. Salva le modifiche e attendi alcuni minuti per il riavvio dei worker Airflow.

A questo punto, sostituisci le opzioni di configurazione di Airflow per il parallelismo e la concorrenza dei worker:

  1. Vai alla scheda Override della configurazione di Airflow.

  2. Fai clic su Modifica e poi su Aggiungi override della configurazione di Airflow.

  3. Sostituisci la configurazione del parallelismo:

    Sezione Chiave Valore
    core parallelism 20
  4. Fai clic su Aggiungi override della configurazione Airflow ed esegui l'override della configurazione della concorrenza dei worker:

    Sezione Chiave Valore
    celery worker_concurrency 20
  5. Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.

Attiva di nuovo lo stesso DAG di esempio con le configurazioni modificate:

  1. Nella UI di Airflow, vai alla pagina DAG.

  2. Trova il DAG dag_10_tasks_20_seconds_10 ed eliminalo.

    Dopo l'eliminazione del DAG, Airflow controlla la cartella dei DAG in del bucket dell'ambiente ed esegue automaticamente di nuovo il DAG.

Al termine delle esecuzioni del DAG, esamina di nuovo l'istogramma dei log. Nel diagramma, puoi vedere che l'esempio dag_10_tasks_20_seconds_10 con più attività consolidate non ha generato errori e avvisi durante l'esecuzione con la configurazione dell'ambiente modificata. Confronta i risultati con i dati precedenti nel diagramma, dove lo stesso esempio ha generato errori e avvisi quando è stato eseguito con la configurazione dell'ambiente predefinita.

L'istogramma dei log dei worker di Airflow con errori e avvisi
        non mostra errori e avvisi dopo che la configurazione dell'ambiente è stata
        modificato
Figura 12. Istogramma dei log del worker Airflow dopo aver regolato la configurazione dell'ambiente (fai clic per ingrandire)

Le configurazioni dell'ambiente e di Airflow svolgono un ruolo fondamentale nella programmazione delle attività, tuttavia non è possibile aumentare le configurazioni oltre determinati limiti.

Ti consigliamo di ottimizzare il codice DAG, consolidare le attività e utilizzare la pianificazione per ottimizzare il rendimento e l'efficienza.

Esempio: errori di analisi e latenza del DAG a causa di un codice DAG complesso

In questo esempio, esamini la latenza di analisi di un DAG di esempio che imita un eccesso di variabili Airflow.

Creare una nuova variabile Airflow

Prima di caricare il codice campione, crea una nuova variabile Airflow.

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nella colonna Server web Airflow, segui il link Airflow per il tuo ambiente.

  3. Vai ad Amministrazione > Voci > Aggiungi un nuovo record.

  4. Imposta i seguenti valori:

    • key: example_var
    • valore: test_airflow_variable

Carica il DAG di esempio nel tuo ambiente

Carica il seguente DAG di esempio nell'ambiente creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato dag_for_loop_airflow_variable.

Questo DAG contiene un ciclo for che viene eseguito 1000 volte e imita un eccesso di variabili Airflow. Ogni iterazione legge la variabile example_var e genera un'attività. Ogni attività contiene un comando che stampa il valore della variabile.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnostica i problemi di analisi

Il tempo di analisi del DAG è il tempo necessario allo scheduler di Airflow per leggere e analizzare un file DAG. Prima che lo scheduler Airflow possa pianificare qualsiasi attività da un DAG, lo scheduler deve analizzare il file DAG per scoprire la struttura il DAG e le attività definite.

Se l'analisi di un DAG richiede molto tempo, viene consumata la capacità dello scheduler e potrebbe essere ridotto il rendimento delle esecuzioni del DAG.

Per monitorare il tempo di analisi del DAG:

  1. Esegui il comando dell'interfaccia a riga di comando Airflow dags report in gcloud CLI per visualizzare il tempo di analisi di tutti i DAG:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Sostituisci quanto segue:

    • ENVIRONMENT_NAME: il nome del tuo ambiente.
    • LOCATION: la regione in cui si trova l'ambiente.
  2. Nell'output del comando, cerca il valore della durata per il DAGdag_for_loop_airflow_variables. Un valore elevato potrebbe indicare che questo DAG non è implementato in modo ottimale. Se hai più DAG, dalla tabella di output, puoi identificare i DAG con un tempo di analisi lungo.

    Esempio:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Controlla i tempi di analisi del DAG nella console Google Cloud:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  4. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.

  5. Vai alla scheda Log, poi a Tutti i log > Gestore processore DAG.

  6. Esamina i log dag-processor-manager e identifica i possibili problemi.

    Una voce di log per il DAG di esempio mostra che il tempo di analisi del DAG è di 46,3 secondi
    Figura 13. I log del gestore del processore DAG mostrano i tempi di analisi del DAG (fai clic per ingrandire)

Se il tempo totale di analisi dei DAG supera circa 10 secondi, gli scheduler potrebbero essere è sovraccarico con l'analisi dei DAG e non può eseguire i DAG in modo efficace.

Ottimizza il codice DAG

È consigliato di evitare codice Python "di primo livello" non necessario nei DAG. I DAG con molti import, variabili e funzioni al di fuori del DAG introducono tempi di analisi più elevati per lo scheduler Airflow. Ciò riduce le prestazioni e la scalabilità di Cloud Composer e Airflow. Eccesso di lettura delle variabili Airflow porta a lunghi tempi di analisi e a un carico del database elevato. Se questo codice si trova in un DAG queste funzioni vengono eseguite su ogni heartbeat dello scheduler, il che potrebbe essere lento.

I campi del modello di Airflow ti consentono di incorporare i valori di Airflow e modelli Jinja nei tuoi DAG. In questo modo si evitano durante l'esecuzione di una funzione durante gli heartbeat dello scheduler.

Per implementare l'esempio di DAG in modo migliore, evita di utilizzare le variabili Airflow nel codice Python di primo livello dei DAG. Passa invece le variabili Airflow agli operatori esistenti tramite un modello Jinja, che ritarderà la lettura del valore fino all'esecuzione dell'attività.

Carica la nuova versione del DAG di esempio nel tuo completamente gestito di Google Cloud. In questo tutorial, questo DAG è denominato dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Controlla il nuovo tempo di analisi del DAG:

  1. Attendi il completamento dell'esecuzione del DAG.

  2. Esegui di nuovo il comando dags report per visualizzare di analisi per tutti i DAG:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Esamina di nuovo dag-processor-manager log e per analizzare la durata dell'analisi.

    Una voce di log per il DAG di esempio mostra che il tempo di analisi del DAG è di 4,21 secondi
    Figura 14. I log del gestore processore DAG mostrano il DAG tempi di analisi dopo l'ottimizzazione del codice DAG (fai clic per ingrandire)

Sostituendo le variabili di ambiente con i modelli Airflow, hai semplificato il codice DAG e ha ridotto la latenza di analisi di circa dieci volte.

Ottimizza le configurazioni dell'ambiente Airflow

Lo scheduler di Airflow tenta costantemente di attivare nuove attività e analizza tutti i DAG nel bucket dell'ambiente. Se i DAG hanno un tempo di analisi lungo e lo scheduler consuma molte risorse, puoi ottimizzare le configurazioni dello scheduler di Airflow in modo che utilizzi le risorse in modo più efficiente.

In questo tutorial, i cicli di analisi e analisi dei file DAG richiedono molto tempo iniziano a sovrapporsi e questo comporta l'esaurimento della capacità dello scheduler. Nel nostro esempio, l'analisi del primo DAG richiede più di 5 secondi, quindi configurerai lo scheduler in modo che venga eseguito meno di frequente per utilizzare le risorse in modo più efficiente. Eseguirai l'override dell'opzione di configurazione di Airflow scheduler_heartbeat_sec. Questa configurazione definisce la frequenza scheduler dovrebbe essere eseguito (in secondi). Per impostazione predefinita, il valore è impostato su 5 secondi. Puoi modificare questa opzione di configurazione di Airflow tramite scattando l'override.

Esegui l'override dell'opzione di configurazione Airflow scheduler_heartbeat_sec:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

  3. Vai alla scheda Override della configurazione Airflow.

  4. Fai clic su Modifica e poi su Aggiungi override della configurazione di Airflow.

  5. Esegui l'override dell'opzione di configurazione Airflow:

    Sezione Chiave Valore
    scheduler scheduler_heartbeat_sec 10
  6. Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.

Controlla le metriche dello scheduler:

  1. Vai alla scheda Monitoring e seleziona Scheduler.

  2. Nel grafico Heartbeat dell'agente di pianificazione, fai clic sul pulsante Altre opzioni (tre puntini) e poi su Visualizza in Metrics Explorer.

Il grafico dell'heartbeat dello scheduler mostra che l'heartbeat si verifica meno di frequente
Figura 15. Grafico heartbeat dello scheduler (fai clic per ingrandire)

Sul grafico vedrai che lo scheduler viene eseguito due volte meno frequentemente dopo la configurazione predefinita è cambiata da 5 a 10 secondi. Riduce il valore frequenza di heartbeat, ti assicuri che lo scheduler non avvii in esecuzione mentre il precedente ciclo di analisi è in corso e lo scheduler la capacità della risorsa non è esaurita.

Assegna più risorse all'organizzatore

In Cloud Composer 2, puoi allocare più risorse di CPU e memoria allo schedulatore. In questo modo, puoi aumentare le prestazioni dello scheduler accelerare i tempi di analisi del DAG.

Assegna CPU e memoria aggiuntive allo scheduler:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

  3. Vai alla scheda Configurazione dell'ambiente.

  4. Trova la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.

  5. Nella sezione Scheduler, nel campo Memory (Memoria), specifica la nuova memoria limite. In questo tutorial vengono utilizzati 4 GB.

  6. Nel campo CPU, specifica il nuovo limite della CPU. In questo usa 2 vCPU.

  7. Salva le modifiche e attendi alcuni minuti per il riavvio degli pianificatori Airflow.

  8. Vai alla scheda Log, poi a Tutti i log > Gestore processore DAG.

  9. Esamina i log di dag-processor-manager e confronta la durata di analisi per il di esempio di DAG:

    Una voce di log per il DAG di esempio mostra che il tempo di analisi del DAG per il DAG ottimizzato è di 1,5 secondi. Per il DAG non ottimizzato, il tempo di analisi è di 28,71 secondi
    Figura 16. I log del gestore del processore DAG mostrano i tempi di analisi del DAG dopo che sono state assegnate altre risorse allo scheduler (fai clic per ingrandire)

Se assegni più risorse allo scheduler, ne aumenti la capacità e riduci notevolmente la latenza di analisi rispetto alle configurazioni predefinite dell'ambiente. Con più risorse, lo scheduler può analizzare I DAG sono più veloci, tuttavia, i costi associati a Cloud Composer aumenteranno anche le risorse. Inoltre, non è possibile aumentare risorse che superano un certo limite.

Consigliamo di allocare le risorse solo dopo il possibile codice DAG Sono state implementate le ottimizzazioni della configurazione di Airflow.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse usati in questo tutorial, elimina il progetto che contiene le risorse o mantenere il progetto ed eliminare le singole risorse.

Elimina il progetto

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimina singole risorse

Se intendi esplorare più tutorial e guide rapide, il riuso dei progetti ti aiuta a non superare i limiti di quota.

Elimina l'ambiente Cloud Composer. Inoltre elimina il bucket dell'ambiente durante questa procedura.

Passaggi successivi