Risolvere i problemi di pianificazione delle attività

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questo tutorial illustra la procedura di diagnosi e risoluzione dei problemi di pianificazione e analisi delle attività che causano malfunzionamento del programmatore, errori di analisi e latenza e fallimento delle attività.

Introduzione

Lo scheduler Airflow è influenzato principalmente da due fattori: la pianificazione delle attività e Analisi dei DAG. I problemi in uno di questi fattori possono avere un impatto negativo sulla l'integrità e le prestazioni dell'ambiente.

A volte vengono pianificate contemporaneamente troppe attività. In questa situazione, la coda è piena e le attività rimangono nello stato "Pianificata" o vengono riprogrammate dopo essere state messe in coda, il che potrebbe causare errori nelle attività e latenza delle prestazioni.

Un altro problema comune è l'analisi della latenza e degli errori causati dalla complessità del un codice DAG. Ad esempio, un codice DAG che contiene variabili Airflow al livello superiore del codice può causare ritardi nell'analisi, sovraccarico del database, errori di pianificazione e timeout del DAG.

In questo tutorial, eseguirai la diagnosi dei DAG di esempio e imparerai a risolvere i problemi di pianificazione e analisi, migliorare la pianificazione dei DAG e ottimizzare il codice DAG e le configurazioni dell'ambiente per migliorare le prestazioni.

Obiettivi

Questa sezione elenca gli obiettivi degli esempi in questo tutorial.

Esempio: malfunzionamento e latenza dello scheduler causati da un'elevata contemporaneità delle attività

  • Carica il DAG di esempio che viene eseguito più volte contemporaneamente e diagnostica i problemi di latenza e malfunzionamento dell'organizzatore con Cloud Monitoring.

  • Ottimizza il codice DAG consolidando le attività e valutando un impatto sulle prestazioni.

  • Distribuisci le attività in modo più uniforme nel tempo e valuta le prestazioni impatto.

  • Ottimizzare le configurazioni di Airflow e di ambiente valutarne l'impatto.

Esempio: latenza e errori di analisi dei DAG causati da codice complesso

  • Carica il DAG di esempio con le variabili Airflow e diagnostica i problemi di analisi con Cloud Monitoring.

  • Ottimizza il codice DAG evitando le variabili Airflow al livello superiore di il codice e valutare l'impatto sul tempo di analisi.

  • Ottimizzare le configurazioni di Airflow e le configurazioni di ambiente valutare l'impatto sul tempo di analisi.

Costi

Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:

Al termine di questo tutorial, puoi evitare la fatturazione continua eliminando il le risorse che hai creato. Per maggiori dettagli, vedi Pulizia.

Prima di iniziare

Questa sezione descrive le azioni necessarie prima di iniziare il tutorial.

Creare e configurare un progetto

Per questo tutorial è necessario un account Google Cloud progetto. Configura il progetto nel seguente modo:

  1. Nella console Google Cloud, seleziona o crea un progetto:

    Vai al selettore di progetti

  2. Verifica che la fatturazione sia attivata per il tuo progetto. Scopri come verificare se la fatturazione è abilitata per un progetto.

  3. Assicurati che l'utente del progetto Google Cloud disponga dei seguenti ruoli per creare le risorse necessarie:

    • Amministratore ambienti e oggetti Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Amministratore di Compute (roles/compute.admin)

Abilita le API per il tuo progetto

Enable the Cloud Composer API.

Enable the API

crea il tuo ambiente Cloud Composer

Crea un ambiente Cloud Composer 2.

Durante la creazione dell'ambiente, devi concedere il ruolo Estensione agente di servizio API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) all'account agente di servizio Composer. Cloud Composer utilizza questo account per eseguire operazioni nel progetto Google Cloud.

Esempio: malfunzionamento della pianificazione e errore delle attività a causa di problemi di pianificazione delle attività

Questo esempio mostra il malfunzionamento dello scheduler di debug e la latenza per un'elevata contemporaneità di attività.

Carica il DAG di esempio nel tuo ambiente

Carica il seguente DAG di esempio nell'ambiente creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato dag_10_tasks_200_seconds_1.

Questo DAG ha 200 attività. Ogni attività attende 1 secondo e stampa "Completato!". Il DAG viene attivato automaticamente dopo il caricamento. Cloud Composer esegue questo DAG 10 volte e tutte le esecuzioni del DAG avvengono in parallelo.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnostica i problemi di malfunzionamento dello scheduler e di errore delle attività

Al termine delle esecuzioni del DAG, apri l'interfaccia utente di Airflow e fai clic sul DAG dag_10_tasks_200_seconds_1. Vedrai che sono state eseguite 10 esecuzioni di DAG in totale e ognuna ha 200 attività riuscite.

Esamina i log delle attività Airflow:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Si apre la pagina Dettagli ambiente.

  3. Vai alla scheda Log, poi a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.

Nell'istogramma dei log puoi vedere gli errori e gli avvisi indicati in rosso e arancione:

L'istogramma dei log dei worker di Airflow con errori e avvisi indicati con colori rossi e arancioni
Figura 1. Istogramma dei log del worker Airflow (fai clic per ingrandire)
di Gemini Advanced.

Il DAG di esempio ha generato circa 130 avvisi e 60 errori. Fai clic su qualsiasi che contiene le barre gialle e rosse. Nei log vedrai alcuni dei seguenti avvertimenti e errori:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Questi log potrebbero indicare che l'utilizzo delle risorse ha superato i limiti e il worker si è riavviato.

Se un'attività Airflow viene tenuta in coda troppo a lungo, lo scheduler contrassegna come non riuscito e up_for_retry e lo riprogramma di nuovo dell'esecuzione. Un modo per osservare i sintomi di questa situazione è esaminare il grafico con il numero di attività in coda e, se i picchi in questo grafico non diminuiscono in circa 10 minuti, è probabile che si verifichino errori nelle attività (senza log).

Esamina le informazioni di monitoraggio:

  1. Vai alla scheda Monitoring e seleziona Panoramica.

  2. Esamina il grafico Attività Airflow.

    Il grafico delle attività Airflow nel tempo, che mostra un picco nel
    numero di attività in coda
    Figura 2. Grafico delle attività Airflow (fai clic per ingrandire)

    Nel grafico delle attività Airflow è presente un picco di attività in coda che dura più di 10 minuti, il che potrebbe significare che non sono disponibili risorse sufficienti nel tuo ambiente per elaborare tutte le attività pianificate.

  3. Esamina il grafico Worker attivi:

    Il grafico dei worker Airflow attivi nel tempo mostra che il
    numero di worker attivi è stato aumentato fino al limite massimo
    Figura 3. Grafico dei worker attivi (fai clic per ingrandire)
    di Gemini Advanced.

    Il grafico Worker attivi indica che il DAG ha attivato la scalabilità automatica fino al limite massimo consentito di tre worker durante l'esecuzione del DAG.

  4. I grafici di utilizzo delle risorse possono indicare la mancanza di capacità nei worker Airflow per eseguire le attività in coda. Nella scheda Monitoring, seleziona Worker e rivedi i grafici Utilizzo totale CPU da parte dei lavoratori e Utilizzo totale della memoria dei lavoratori.

    Il grafico dell'utilizzo della CPU da parte dei worker Airflow mostra un aumento dell'utilizzo della CPU fino al limite massimo
    Figura 4. Grafico dell'utilizzo della CPU dei worker totali (fai clic per ingrandire)
    Il grafico dell'utilizzo della memoria da parte dei worker Airflow mostra un aumento dell'utilizzo della memoria, ma non raggiunge il limite massimo
    Figura 5. Grafico totale dell'utilizzo della memoria dei worker (fai clic per ingrandire)
    di Gemini Advanced.

    I grafici indicano che l'esecuzione di troppe attività contemporaneamente per raggiungere il limite di CPU. Le risorse sono state utilizzate da oltre 30 di minuti, che è ancora più lungo della durata totale di 200 attività in 10 I DAG vengono eseguiti singolarmente.

Questi sono gli indicatori del riempimento della coda e della mancanza di risorse per elaborare tutte le attività pianificate.

Consolidare le attività

Il codice attuale crea molti DAG e attività senza risorse sufficienti per elaborare tutte le attività in parallelo, il che comporta il riempimento della coda. Se le attività vengono tenute in coda troppo a lungo, le attività potrebbero non essere riprogrammate o non andare a buon fine. In questi casi, ti consigliamo di optare per un numero minore di attività più consolidate.

Il seguente DAG di esempio modifica il numero di attività nell'esempio iniziale da 200 a 20 e aumenta il tempo di attesa da 1 a 10 secondi per simulare attività più consolidate che svolgono la stessa quantità di lavoro.

Carica il seguente DAG di esempio nell'ambiente che hai creato. In questo tutorial, questo DAG è denominato dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Valuta l'impatto di attività più consolidate sui processi di pianificazione:

  1. Attendi il completamento delle esecuzioni del DAG.

  2. Nell'interfaccia utente di Airflow, nella pagina DAG, fai clic sul DAG dag_10_tasks_20_seconds_10. Vedrai 10 esecuzioni di DAG, ognuna con 20 e attività completate correttamente.

  3. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  4. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

  5. Vai alla scheda Log, quindi vai a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.

    Il secondo esempio con attività più consolidate ha generato circa 10 avvisi e 7 errori. Nell'istogramma, puoi confrontare il numero di errori e avvisi nell'esempio iniziale (valori precedenti) e nel secondo esempio (valori successivi).

    L'istogramma dei log dei worker di Airflow con errori e avvisi
    mostra la diminuzione degli errori e degli avvisi dopo che le attività
    consolidato
    Figura 6. Istogramma dei log dei worker di Airflow dopo il consolidamento delle attività (fai clic per ingrandire)

    Confrontando il primo esempio con il più consolidato, puoi vedrai che ci sono molti meno errori e avvisi nel secondo esempio. Tuttavia, gli stessi errori relativi all'arresto caldo continuano a comparire nei log a causa del sovraccarico delle risorse.

  6. Nella scheda Monitoraggio, seleziona Worker ed esamina i grafici.

    Se confronti il grafico Attività Airflow per il primo esempio (valori precedenti) con il grafico per il secondo esempio con attività più consolidate, puoi vedere che il picco di attività in coda è durato per un periodo di tempo più breve quando le attività erano più consolidate. Tuttavia, è durata quasi 10 minuti, il che non è ancora ottimale.

    Il grafico delle attività Airflow nel tempo mostra che il picco di
    Le attività Airflow sono durate per un periodo di tempo più breve rispetto a prima.
    Figura 7. Grafico delle attività di Airflow dopo attività consolidate (fai clic per ingrandire)
    di Gemini Advanced.

    Nel grafico Lavoratori attivi, puoi vedere che il primo esempio (sul lato sinistro del grafico) ha utilizzato le risorse per un periodo di tempo molto più lungo rispetto al secondo, anche se entrambi gli esempi simulano la stessa quantità di lavoro.

    Il grafico dei worker Airflow attivi nel tempo mostra che
    il numero di lavoratori attivi è stato aumentato per un periodo di tempo più breve
    rispetto a prima.
    Figura 8. Grafico dei lavoratori attivi dopo il consolidamento delle attività (fai clic per ingrandire)

    Esamina i grafici del consumo di risorse dei worker. Anche se la differenza tra le risorse utilizzate nell'esempio con attività più consolidate e l'esempio iniziale è abbastanza significativo, l'utilizzo della CPU è ancora fino al 70% del limite.

    Il grafico dell'utilizzo della CPU da parte dei worker di Airflow mostra l'utilizzo della CPU
    aumentando fino al 70% del limite massimo
    Figura 9. Grafico dell'utilizzo totale della CPU dei worker dopo il attività consolidate (fai clic per ingrandire)
    di Gemini Advanced.
    Il grafico dell'utilizzo della memoria da parte dei worker Airflow mostra un aumento dell'utilizzo della memoria, ma non raggiunge il limite massimo
    Figura 10. Grafico dell'utilizzo totale della memoria dei worker dopo il consolidamento delle attività (fai clic per ingrandire)

Distribuisci le attività in modo più uniforme nel tempo

Troppe attività simultanee comportano il riempimento della coda, il che porta a bloccate nella coda o riprogrammate. Nei passaggi precedenti, ha diminuito il numero di attività consolidandole; tuttavia, l'output log e il monitoraggio indicano che il numero di attività simultanee non ottimale.

Puoi controllare il numero di esecuzioni di attività simultanee implementando una pianificazione o impostando limiti per il numero di attività che possono essere eseguite contemporaneamente.

In questo tutorial, distribuisci le attività in modo più uniforme nel tempo aggiungendo parametri a livello di DAG nel DAG dag_10_tasks_20_seconds_10:

  1. Aggiungi l'argomento max_active_runs=1 al gestore di contesto del DAG. Questo argomento imposta il limite di una sola istanza dell'esecuzione di un DAG in un determinato momento.

  2. Aggiungi l'argomento max_active_tasks=5 al gestore del contesto DAG. Questo argomento controlla il numero massimo di istanze di attività che possono essere eseguite contemporaneamente per ogni DAG.

Carica il seguente DAG di esempio nell'ambiente che hai creato. In questo tutorial, questo DAG è denominato dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Valuta l'impatto della distribuzione delle attività nel tempo sulla pianificazione dei processi:

  1. Attendi il completamento delle esecuzioni del DAG.

  2. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  3. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Si apre la pagina Dettagli ambiente.

  4. Vai alla scheda Log, quindi vai a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.

  5. Nell'istogramma, puoi vedere che il terzo DAG con un numero limitato di attività ed esecuzioni attive non ha generato avvisi o errori e la distribuzione dei log sembra più uniforme rispetto ai valori precedenti.

    L'istogramma dei log dei worker di Airflow con errori e avvisi
    non mostra errori o avvisi dopo il consolidamento delle attività
    distribuiti nel tempo.
    Figura 11. Istogramma dei log dei worker di Airflow dopo che le attività sono state consolidate e distribuite nel tempo (fai clic per ingrandire)

Le attività nell'esempio dag_10_tasks_20_seconds_10_scheduled con un numero limitato di attività ed esecuzioni attive non hanno causato una pressione sulle risorse perché le attività sono state messe in coda in modo uniforme.

Dopo aver eseguito i passaggi descritti, hai ottimizzato l'utilizzo delle risorse consolidando piccole attività e distribuendole in modo più uniforme nel tempo.

Ottimizza le configurazioni di ambiente

Puoi modificare le configurazioni dell'ambiente per assicurarti che i worker Airflow abbiano sempre la capacità di eseguire le attività in coda.

Numero di worker e concorrenza dei worker

Puoi regolare il numero massimo di worker per consentire a Cloud Composer di scalare automaticamente il tuo ambiente nei limiti impostati.

Il parametro [celery]worker_concurrency definisce il numero massimo di attività un singolo worker può prendere dalla coda di attività. Modifica di questo parametro regola il numero di attività che un singolo worker può eseguire contemporaneamente. Puoi modificare questa opzione di configurazione di Airflow sovrascrivendola. Per impostazione predefinita, la concorrenza dei worker è impostata su un valore minimo di quanto segue: 32, 12 * worker_CPU, 8 * worker_memory, il che significa che dipende dai limiti delle risorse dei worker. Consulta Ottimizza gli ambienti per ulteriori informazioni sugli ambienti predefiniti valori di contemporaneità dei worker.

Il numero di worker e la concorrenza dei worker agiscono in combinazione tra loro e le prestazioni del tuo ambiente dipendono molto da entrambi i parametri. Per scegliere la combinazione corretta, tieni conto dei seguenti aspetti:

  • Più attività rapide in esecuzione in parallelo. Puoi aumentare il quando ci sono attività in attesa e i tuoi lavoratori usano una bassa percentuale di CPU e memoria contemporaneamente. Tuttavia, in determinate circostanze la coda potrebbe non riempirsi mai, causando l'attivazione della scalabilità automatica. Se le piccole attività terminano l'esecuzione entro il momento in cui i nuovi worker sono pronti, un worker esistente può assumere le attività rimanenti e non ci saranno attività per i worker appena creati.

    In queste situazioni, è consigliabile Aumenta il numero minimo di worker e la contemporaneità dei worker per evitare una scalabilità eccessiva.

  • Più attività lunghe in esecuzione in parallelo. L'elevata concorrenza dei worker impedisce al sistema di scalare il numero di worker. Se più attività richiedono molte risorse e richiedono molto tempo per essere completate, una concorrenza elevata dei worker può comportare che la coda non venga mai riempita e che tutte le attività vengano acquisite da un solo worker, con conseguenti problemi di prestazioni. In queste situazioni, è consigliabile aumentare il numero massimo di worker e diminuire la concorrenza dei worker.

L'importanza del parallelismo

Gli scheduler di Airflow controllano la pianificazione delle esecuzioni dei DAG e delle singole attività e i DAG. L'opzione di configurazione [core]parallelism di Airflow controlla il numero di attività che lo scheduler di Airflow può mettere in coda nella coda dell'executor dopo che tutte le dipendenze per queste attività sono state soddisfatte.

Il parallelismo è un meccanismo di protezione di Airflow che determina quante attività possono essere eseguite contemporaneamente per ogni pianificatore, indipendentemente dal numero di worker. Il valore di parallelismo, moltiplicato per il numero di scheduler nel cluster, è il numero massimo di istanze di attività che il tuo ambiente può mettere in coda.

Di solito, [core]parallelism è impostato come prodotto di un numero massimo di worker e [celery]worker_concurrency. È influenzata anche dalle pool. Puoi modificare questa opzione di configurazione di Airflow sovrascrivendola. Per ulteriori informazioni su come modificare le configurazioni di Airflow relative al ridimensionamento, consulta Ridimensionare la configurazione di Airflow.

Trova le configurazioni di ambiente ottimali

Il modo consigliato per risolvere i problemi di pianificazione è consolidare le attività di piccole dimensioni in ad attività più grandi e a distribuire le attività in modo più uniforme nel tempo. Oltre a per ottimizzare il codice DAG, puoi anche ottimizzare le configurazioni dell'ambiente e una capacità sufficiente per l'esecuzione simultanea di più attività.

Ad esempio, supponiamo di consolidare attività nel DAG il più possibile, ma limitando le attività attive per distribuirle in modo più uniforme non è una soluzione preferita per il tuo caso d'uso specifico.

Puoi modificare i parametri di parallelismo, numero di worker e concorrenza dei worker per eseguire il DAG dag_10_tasks_20_seconds_10 senza limitare le attività attive. In questo esempio, il DAG viene eseguito 10 volte e ogni esecuzione contiene 20 piccole attività. Se vuoi eseguirli tutti contemporaneamente:

  • Avrai bisogno di un ambiente di dimensioni maggiori, perché controlla i parametri di prestazioni dell'infrastruttura Cloud Composer gestita del tuo ambiente.

  • I worker di Airflow devono essere in grado di eseguire 20 attività contemporaneamente, il che significa che devi impostare la concorrenza dei worker su 20.

  • I worker hanno bisogno di CPU e memoria sufficienti per gestire tutte le attività. Lavoratore la contemporaneità dipende dalla CPU e dalla memoria dei worker, quindi, almeno worker_concurrency / 12 nella CPU e almeno least worker_concurrency / 8 in memoria.

  • Dovrai aumentare il parallelismo per far corrispondere la contemporaneità dei worker più alta. Per consentire ai worker di raccogliere 20 attività dalla coda, lo scheduler dover pianificare prima queste 20 attività.

Regola le configurazioni di ambiente nel seguente modo:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.

  3. Vai alla scheda Configurazione dell'ambiente.

  4. Trova la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.

  5. Nella sezione Worker, specifica la nuova memoria nel campo Memoria per i worker di Airflow. In questo tutorial vengono utilizzati 4 GB.

  6. Nel campo CPU, specifica il nuovo limite di CPU per i worker di Airflow. In questo usa 2 vCPU.

  7. Salva le modifiche e attendi diversi minuti per i worker di Airflow riavvia.

A questo punto, sostituisci le opzioni di configurazione di Airflow per il parallelismo e la concorrenza dei worker:

  1. Vai alla scheda Override della configurazione Airflow.

  2. Fai clic su Modifica, quindi su Aggiungi override della configurazione di Airflow.

  3. Sostituisci la configurazione del parallelismo:

    Sezione Chiave Valore
    core parallelism 20
  4. Fai clic su Aggiungi override della configurazione Airflow ed esegui l'override della configurazione della concorrenza dei worker:

    Sezione Chiave Valore
    celery worker_concurrency 20
  5. Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.

Attiva di nuovo lo stesso DAG di esempio con le configurazioni adeguate:

  1. Nella UI di Airflow, vai alla pagina DAG.

  2. Individua il DAG dag_10_tasks_20_seconds_10 ed eliminalo.

    Dopo l'eliminazione del DAG, Airflow controlla la cartella dei DAG in del bucket dell'ambiente ed esegue automaticamente di nuovo il DAG.

Al termine delle esecuzioni del DAG, esamina di nuovo l'istogramma dei log. Nel diagramma, puoi vedere che l'esempio dag_10_tasks_20_seconds_10 con più attività consolidate non ha generato errori e avvisi durante l'esecuzione con la configurazione dell'ambiente modificata. Confronta i risultati con i dati precedenti nel diagramma, dove lo stesso esempio ha generato errori e avvisi quando in esecuzione con la configurazione predefinita dell'ambiente tge.

L'istogramma dei log dei worker di Airflow con errori e avvisi
        non mostra errori e avvisi dopo la modifica della configurazione dell'ambiente
Figura 12. Istogramma dei log del worker Airflow dopo aver regolato la configurazione dell'ambiente (fai clic per ingrandire)
di Gemini Advanced.

Le configurazioni dell'ambiente e le configurazioni di Airflow svolgono un ruolo cruciale pianificazione delle attività, tuttavia, non è possibile aumentare le configurazioni al di là di determinati limiti.

Consigliamo di ottimizzare il codice DAG, consolidare le attività e utilizzare la pianificazione per ottimizzare prestazioni ed efficienza.

Esempio: latenza e errori di analisi dei DAG a causa di codice DAG complesso

In questo esempio, analizzerai la latenza di analisi di un DAG di esempio che imita un eccesso di variabili Airflow.

Crea una nuova variabile Airflow

Prima di caricare il codice campione, crea una nuova variabile Airflow.

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nella colonna Server web Airflow, segui il link Airflow per il tuo completamente gestito di Google Cloud.

  3. Vai ad Amministrazione > Variabili > Aggiungi un nuovo record.

  4. Imposta i seguenti valori:

    • chiave: example_var
    • valore: test_airflow_variable

Carica il DAG di esempio nel tuo ambiente

Carica il seguente DAG di esempio nell'ambiente creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato dag_for_loop_airflow_variable.

Questo DAG contiene un ciclo for che viene eseguito 1000 volte e imita un eccesso di Variabili Airflow. Ogni iterazione legge la variabile example_var e genera un'attività. Ogni attività contiene un comando che stampa valore.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnostica i problemi di analisi

Il tempo di analisi del DAG è il tempo necessario allo scheduler di Airflow per leggere e analizzare un file DAG. Prima che lo scheduler Airflow possa pianificare qualsiasi attività da un DAG, lo scheduler deve analizzare il file DAG per scoprire la struttura del DAG e le attività definite.

Se l'analisi di un DAG richiede molto tempo, questo consuma la capacità dello scheduler ridurre le prestazioni delle esecuzioni dei DAG.

Per monitorare il tempo di analisi del DAG:

  1. Esegui il comando interfaccia a riga di comando di Airflow dags report in gcloud CLI per vedere il tempo di analisi per tutti i tuoi DAG:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Sostituisci quanto segue:

    • ENVIRONMENT_NAME: il nome dell'ambiente.
    • LOCATION: la regione in cui si trova l'ambiente.
  2. Nell'output del comando, cerca il valore della durata per il DAGdag_for_loop_airflow_variables. Un valore elevato può indicare questo DAG non è implementato in modo ottimale. Se hai più DAG, dalla tabella di output puoi identificare i DAG che hanno un tempo di analisi lungo.

    Esempio:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Controlla i tempi di analisi del DAG nella console Google Cloud:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  4. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

  5. Vai alla scheda Log, quindi vai a Tutti i log > Gestore processore DAG.

  6. Esamina i log di dag-processor-manager e identifica i possibili problemi.

    Una voce di log per il DAG di esempio mostra che il tempo di analisi del DAG è di 46,3 secondi
    Figura 13. I log del gestore processore DAG mostrano il DAG analizzare i tempi (fai clic per ingrandire)
    di Gemini Advanced.

Se il tempo totale di analisi dei DAG supera circa 10 secondi, gli scheduler potrebbero essere è sovraccarico con l'analisi dei DAG e non può eseguire i DAG in modo efficace.

Ottimizza il codice DAG

È consigliato di evitare codice Python "di primo livello" non necessario nei DAG. I DAG con molte importazioni, variabili e funzioni al di fuori del DAG introducono tempi di analisi più elevati per lo scheduler Airflow. Ciò riduce le prestazioni e la scalabilità di Cloud Composer e Airflow. Un eccesso di lettura delle variabili Airflow porta a tempi di analisi lunghi e a un carico elevato del database. Se questo codice si trova in un DAG queste funzioni vengono eseguite su ogni heartbeat dello scheduler, il che potrebbe essere lento.

I campi del modello di Airflow ti consentono di incorporare i valori di Airflow e modelli Jinja nei tuoi DAG. In questo modo viene evitata l'esecuzione non necessaria delle funzioni durante i heartbeat dello scheduler.

Per implementare l'esempio di DAG in modo migliore, evita di usare le variabili Airflow in il codice Python di primo livello dei DAG. Passa invece le variabili Airflow agli operatori esistenti tramite un modello Jinja, che ritarderà la lettura del valore fino all'esecuzione dell'attività.

Carica la nuova versione del DAG di esempio nel tuo ambiente. In questo tutorial, questo DAG è denominato dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Controlla il nuovo tempo di analisi del DAG:

  1. Attendi il completamento dell'esecuzione del DAG.

  2. Esegui di nuovo il comando dags report per visualizzare di analisi per tutti i DAG:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Esamina di nuovo dag-processor-manager log e per analizzare la durata dell'analisi.

    Una voce di log per il DAG di esempio mostra che il tempo di analisi del DAG è 4,21
    secondi
    Figura 14. I log del gestore del processore DAG mostrano i tempi di analisi del DAG dopo l'ottimizzazione del codice DAG (fai clic per ingrandire)

Sostituendo le variabili di ambiente con i modelli Airflow, hai semplificato il codice DAG e ha ridotto la latenza di analisi di circa dieci volte.

Ottimizza le configurazioni dell'ambiente Airflow

Lo scheduler Airflow cerca costantemente di attivare nuove attività e analizza tutti i DAG nel tuo bucket di ambiente. Se i DAG hanno un tempo di analisi lungo e lo scheduler consuma molte risorse, puoi ottimizzare le configurazioni dello scheduler di Airflow in modo che utilizzi le risorse in modo più efficiente.

In questo tutorial, l'analisi dei file DAG richiede molto tempo e i cicli di analisi iniziano a sovrapporsi, il che esaurisce la capacità dello scheduler. Nel nostro esempio, l'analisi del primo DAG richiede più di 5 secondi, quindi configurerai lo scheduler in modo che venga eseguito meno di frequente per utilizzare le risorse in modo più efficiente. Tu sovrascriverà scheduler_heartbeat_sec Opzione di configurazione Airflow. Questa configurazione definisce la frequenza scheduler dovrebbe essere eseguito (in secondi). Per impostazione predefinita, il valore è impostato su 5 secondi. Puoi modificare questa opzione di configurazione di Airflow tramite scattando l'override.

Sostituisci l'opzione di configurazione di Airflow scheduler_heartbeat_sec:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.

  3. Vai alla scheda Override della configurazione di Airflow.

  4. Fai clic su Modifica e poi su Aggiungi override della configurazione di Airflow.

  5. Esegui l'override dell'opzione di configurazione Airflow:

    Sezione Chiave Valore
    scheduler scheduler_heartbeat_sec 10
  6. Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.

Controlla le metriche dello scheduler:

  1. Vai alla scheda Monitoraggio e seleziona Pianificatori.

  2. Nel grafico Heartbeat dell'agente di pianificazione, fai clic sul pulsante Altre opzioni (tre puntini) e poi su Visualizza in Metrics Explorer.

di Gemini Advanced.
Il grafico dell'heartbeat dello scheduler mostra che l'heartbeat si verifica meno di frequente
Figura 15. Grafico dell'heartbeat dello scheduler (fai clic per ingrandire)

Nel grafico vedrai che la pianificazione viene eseguita con la metà della frequenza dopo aver cambiato la configurazione predefinita da 5 secondi a 10 secondi. Riduce il valore frequenza di heartbeat, ti assicuri che lo scheduler non avvii in esecuzione mentre il precedente ciclo di analisi è in corso e lo scheduler la capacità della risorsa non è esaurita.

Assegna più risorse all'organizzatore

In Cloud Composer 2, puoi allocare più risorse di CPU e memoria scheduler. In questo modo, puoi aumentare il rendimento del programmatore e accelerare il tempo di analisi del DAG.

Assegna CPU e memoria aggiuntive allo scheduler:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

  3. Vai alla scheda Configurazione dell'ambiente.

  4. Trova la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.

  5. Nella sezione Pianificatore, nel campo Memoria, specifica il nuovo limite di memoria. In questo tutorial, utilizza 4 GB.

  6. Nel campo CPU, specifica il nuovo limite di CPU. In questo usa 2 vCPU.

  7. Salva le modifiche e attendi diversi minuti per gli scheduler di Airflow riavvia.

  8. Vai alla scheda Log, quindi vai a Tutti i log > Gestore processore DAG.

  9. Esamina i log di dag-processor-manager e confronta la durata di analisi per il di esempio di DAG:

    Una voce del log per il DAG di esempio mostra che il tempo di analisi del DAG per il DAG ottimizzato è di 1,5 secondi. Per il DAG non ottimizzato, il tempo di analisi è di 28,71 secondi
    Figura 16. I log del gestore processore DAG mostrano il DAG tempi di analisi dopo l'assegnazione di più risorse allo scheduler (fai clic per ingrandire)
    di Gemini Advanced.

Assegnando più risorse allo scheduler, hai aumentato e ha ridotto la latenza di analisi significativamente rispetto ai valori predefiniti configurazioni dell'ambiente di lavoro. Con più risorse, lo scheduler può analizzare i DAG più velocemente, ma aumenteranno anche i costi associati alle risorse Cloud Composer. Inoltre, non è possibile aumentare risorse che superano un certo limite.

Ti consigliamo di allocare le risorse solo dopo aver implementato le possibili ottimizzazioni del codice DAG e della configurazione di Airflow.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse usati in questo tutorial, elimina il progetto che contiene le risorse o mantenere il progetto ed eliminare le singole risorse.

Elimina il progetto

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimina singole risorse

Se prevedi di esplorare più tutorial e guide rapide, puoi riutilizzare i progetti può aiutarti a evitare di superare i limiti di quota.

Eliminare l'ambiente Cloud Composer. Inoltre, durante questa procedura viene eliminato il bucket dell'ambiente.

Passaggi successivi