Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questo tutorial illustra la procedura di diagnosi e risoluzione dei problemi di pianificazione e analisi delle attività che causano malfunzionamento dell'organizzatore, errori di analisi e latenza e fallimento delle attività.
Introduzione
Lo scheduler Airflow è influenzato principalmente da due fattori: la pianificazione delle attività e l'analisi dei DAG. I problemi relativi a uno di questi fattori possono avere un impatto negativo sulla salute e sul rendimento dell'ambiente.
A volte vengono pianificate contemporaneamente troppe attività. In questa situazione, la coda è piena e le attività rimangono nello stato "Pianificata" o vengono riprogrammate dopo essere state messe in coda, il che potrebbe causare errori nelle attività e latenza delle prestazioni.
Un altro problema comune è la latenza di analisi e gli errori causati dalla complessità del codice DAG. Ad esempio, un codice DAG che contiene variabili Airflow al livello superiore del codice può causare ritardi nell'analisi, sovraccarico del database, errori di pianificazione e timeout del DAG.
In questo tutorial, diagnosticherai i DAG di esempio e imparerai a risolvere i problemi di pianificazione e analisi, a migliorare la pianificazione dei DAG e a ottimizzare il codice e le configurazioni dell'ambiente dei DAG per migliorare le prestazioni.
Obiettivi
Questa sezione elenca gli obiettivi degli esempi in questo tutorial.
Esempio: malfunzionamento e latenza dell'organizzatore causati da una concorrenza elevata delle attività
Carica il DAG di esempio che viene eseguito più volte contemporaneamente e diagnostica i problemi di latenza e malfunzionamento del programmatore con Cloud Monitoring.
Ottimizza il codice DAG consolidando le attività e valuta l'impatto sul rendimento.
Distribuisci le attività in modo più uniforme nel tempo e valuta l'impatto sul rendimento.
Ottimizza le configurazioni di Airflow e dell'ambiente e valuta l'impatto.
Esempio: errori di analisi del DAG e latenza causati da codice complesso
Carica il DAG di esempio con le variabili Airflow e diagnostica i problemi di analisi con Cloud Monitoring.
Ottimizza il codice DAG evitando le variabili Airflow al primo livello del codice e valuta l'impatto sul tempo di analisi.
Ottimizza le configurazioni di Airflow e dell'ambiente e valuta l'impatto sul tempo di analisi.
Costi
Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:
Al termine di questo tutorial, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per maggiori dettagli, consulta la sezione Pulizia.
Prima di iniziare
Questa sezione descrive le azioni necessarie prima di iniziare il tutorial.
Creare e configurare un progetto
Per questo tutorial, hai bisogno di un Google Cloud progetto. Configura il progetto nel seguente modo:
Nella console Google Cloud, seleziona o crea un progetto:
Verifica che la fatturazione sia attivata per il tuo progetto. Scopri come verificare se la fatturazione è attivata per un progetto.
Assicurati che l' Google Cloud utente del progetto disponga dei seguenti ruoli per creare le risorse necessarie:
- Amministratore ambienti e oggetti Storage
(
roles/composer.environmentAndStorageObjectAdmin
) - Amministratore di Compute (
roles/compute.admin
)
- Amministratore ambienti e oggetti Storage
(
Abilita le API per il tuo progetto
Enable the Cloud Composer API.
Crea l'ambiente Cloud Composer
Crea un ambiente Cloud Composer 2.
Durante la creazione dell'ambiente,
devi concedere il ruolo Estensione agente di servizio API Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) all'account agente di servizio Composer. Cloud Composer utilizza questo account per eseguire operazioni nel tuo Google Cloud progetto.
Esempio: malfunzionamento dell'organizzatore e errore di esecuzione delle attività a causa di problemi di pianificazione delle attività
Questo esempio mostra il debug del malfunzionamento e della latenza dell'organizzatore causati da una concorrenza elevata delle attività.
Carica il DAG di esempio nel tuo ambiente
Carica il seguente DAG di esempio nell'ambiente creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato
dag_10_tasks_200_seconds_1
.
Questo DAG contiene 200 attività. Ogni attività attende 1 secondo e stampa "Completato!". Il DAG viene attivato automaticamente dopo il caricamento. Cloud Composer esegue questo DAG 10 volte e tutte le esecuzioni del DAG avvengono in parallelo.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnostica i problemi di malfunzionamento dello scheduler e di errore delle attività
Al termine dell'esecuzione del DAG, apri l'interfaccia utente di Airflow e fai clic sul DAGdag_10_tasks_200_seconds_1
. Vedrai che sono state eseguite 10 esecuzioni di DAG in totale e ognuna ha 200 attività riuscite.
Esamina i log delle attività Airflow:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Log, poi a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.
Nell'istogramma dei log, puoi vedere gli errori e gli avvisi indicati con i colori rosso e arancione:
![L'istogramma dei log dei worker di Airflow con errori e avvisi indicati con colori rossi e arancioni](https://cloud.google.com/static/composer/docs/images/composer-scheduling-logs-histogram.png?hl=it)
Il DAG di esempio ha generato circa 130 avvisi e 60 errori. Fai clic su una colonna contenente barre gialle e rosse. Nei log vedrai alcuni dei seguenti avvertimenti e errori:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Questi log potrebbero indicare che l'utilizzo delle risorse ha superato i limiti e che il worker si è riavviato.
Se un'attività Airflow viene mantenuta in coda per troppo tempo, lo scheduler la contrassegna come non riuscita e up_for_retry e la riprogramma di nuovo per l'esecuzione. Un modo per osservare i sintomi di questa situazione è esaminare il grafico con il numero di attività in coda e, se i picchi in questo grafico non diminuiscono in circa 10 minuti, è probabile che si verifichino errori nelle attività (senza log).
Esamina le informazioni di monitoraggio:
Vai alla scheda Monitoraggio e seleziona Panoramica.
Esamina il grafico Attività Airflow.
Figura 2. Grafico delle attività Airflow (fai clic per ingrandire) Nel grafico delle attività Airflow è presente un picco di attività in coda che dura più di 10 minuti, il che potrebbe significare che non sono disponibili risorse sufficienti nel tuo ambiente per elaborare tutte le attività pianificate.
Esamina il grafico Worker attivi:
Figura 3. Grafico dei worker attivi (fai clic per ingrandire) Il grafico Worker attivi indica che il DAG ha attivato la scalabilità automatica fino al limite massimo consentito di tre worker durante l'esecuzione del DAG.
I grafici di utilizzo delle risorse possono indicare la mancanza di capacità nei worker di Airflow per eseguire le attività in coda. Nella scheda Monitoraggio, seleziona Worker e esamina i grafici Utilizzo totale CPU worker e Utilizzo totale memoria worker.
Figura 4. Grafico dell'utilizzo della CPU dei worker totali (fai clic per ingrandire) Figura 5. Grafico dell'utilizzo totale della memoria dei worker (fai clic per ingrandire) I grafici indicano che l'esecuzione di troppe attività contemporaneamente ha comportato il raggiungimento del limite della CPU. Le risorse erano state utilizzate per oltre 30 minuti, ovvero più a lungo della durata totale di 200 attività in 10 esecuzioni DAG eseguite una alla volta.
Questi sono gli indicatori della coda che si sta riempiendo e della mancanza di risorse per elaborare tutte le attività pianificate.
Consolidare le attività
Il codice attuale crea molti DAG e attività senza risorse sufficienti per elaborare tutte le attività in parallelo, il che comporta il riempimento della coda. Se le attività rimangono in coda per troppo tempo, la loro riprogrammazione potrebbe non andare a buon fine. In questi casi, ti consigliamo di optare per un numero minore di attività più consolidate.
Il seguente DAG di esempio modifica il numero di attività nell'esempio iniziale da 200 a 20 e aumenta il tempo di attesa da 1 a 10 secondi per simulare attività più consolidate che svolgono la stessa quantità di lavoro.
Carica il seguente DAG di esempio nell'ambiente che hai creato. In questo tutorial, questo DAG è denominato
dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Valuta l'impatto di attività più consolidate sui processi di pianificazione:
Attendi il completamento delle esecuzioni del DAG.
Nell'interfaccia utente di Airflow, nella pagina DAG, fai clic sul DAG
dag_10_tasks_20_seconds_10
. Vedrai 10 esecuzioni di DAG, ciascuna con 20 attività andate a buon fine.Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Log, poi a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.
Il secondo esempio con attività più consolidate ha generato circa 10 avvisi e 7 errori. Nell'istogramma, puoi confrontare il numero di errori e avvisi nell'esempio iniziale (valori precedenti) e nel secondo esempio (valori successivi).
Figura 6. Istogramma dei log dei worker di Airflow dopo il consolidamento delle attività (fai clic per ingrandire) Se confronti il primo esempio con quello più consolidato, puoi vedere che il secondo contiene molto meno errori e avvisi. Tuttavia, gli stessi errori relativi all'arresto caldo continuano a comparire nei log a causa del sovraccarico delle risorse.
Nella scheda Monitoraggio, seleziona Worker ed esamina i grafici.
Se confronti il grafico Attività Airflow per il primo esempio (valori precedenti) con il grafico per il secondo esempio con attività più consolidate, puoi vedere che il picco di attività in coda è durato per un periodo di tempo più breve quando le attività erano più consolidate. Tuttavia, è durata quasi 10 minuti, il che non è ancora ottimale.
Figura 7. Grafico delle attività Airflow dopo il consolidamento delle attività (fai clic per ingrandire) Nel grafico Lavoratori attivi, puoi vedere che il primo esempio (sul lato sinistro del grafico) ha utilizzato le risorse per un periodo di tempo molto più lungo rispetto al secondo, anche se entrambi gli esempi simulano la stessa quantità di lavoro.
Figura 8. Grafico dei lavoratori attivi dopo il consolidamento delle attività (fai clic per ingrandire) Esamina i grafici sul consumo delle risorse dei worker. Anche se la differenza tra le risorse utilizzate nell'esempio con attività più consolidate e l'esempio iniziale è piuttosto significativa, l'utilizzo della CPU continua a raggiungere il 70% del limite.
Figura 9. Grafico dell'utilizzo totale della CPU dei worker dopo il consolidamento delle attività (fai clic per ingrandire) Figura 10. Grafico dell'utilizzo totale della memoria dei worker dopo il consolidamento delle attività (fai clic per ingrandire)
Distribuisci le attività in modo più uniforme nel tempo
Troppe attività in parallelo fanno sì che la coda si riempia, il che porta a un blocco delle attività in coda o alla loro riprogrammazione. Nei passaggi precedenti hai ridotto il numero di attività consolidandole, tuttavia, i log di output e il monitoraggio hanno indicato che il numero di attività simultanee è ancora subottimale.
Puoi controllare il numero di esecuzioni di attività simultanee implementando una pianificazione o impostando limiti per il numero di attività che possono essere eseguite contemporaneamente.
In questo tutorial, distribuisci le attività in modo più uniforme nel tempo aggiungendo parametri a livello di DAG nel DAG dag_10_tasks_20_seconds_10
:
Aggiungi l'argomento
max_active_runs=1
al gestore del contesto DAG. Questo argomento imposta un limite di una sola istanza di esecuzione di un DAG in un determinato momento.Aggiungi l'argomento
max_active_tasks=5
al gestore del contesto DAG. Questo argomento controlla il numero massimo di istanze di attività che possono essere eseguite contemporaneamente in ogni DAG.
Carica il seguente DAG di esempio nell'ambiente che hai creato. In questo tutorial, questo DAG è denominato
dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Valuta l'impatto della distribuzione delle attività nel tempo sulle procedure di pianificazione:
Attendi il completamento delle esecuzioni del DAG.
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Log, poi a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.
Nell'istogramma, puoi vedere che il terzo DAG con un numero limitato di attività ed esecuzioni attive non ha generato avvisi o errori e la distribuzione dei log sembra più uniforme rispetto ai valori precedenti.
Figura 11. Istogramma dei log dei worker di Airflow dopo che le attività sono state consolidate e distribuite nel tempo (fai clic per ingrandire)
Le attività nell'esempio dag_10_tasks_20_seconds_10_scheduled
con un
numero limitato di attività ed esecuzioni attive non hanno causato una pressione sulle risorse perché
le attività sono state messe in coda in modo uniforme.
Dopo aver eseguito i passaggi descritti, hai ottimizzato l'utilizzo delle risorse consolidando piccole attività e distribuendole in modo più uniforme nel tempo.
Ottimizza le configurazioni dell'ambiente
Puoi modificare le configurazioni dell'ambiente per assicurarti che i worker Airflow abbiano sempre la capacità di eseguire le attività in coda.
Numero di worker e concorrenza dei worker
Puoi regolare il numero massimo di worker per consentire a Cloud Composer di scalare automaticamente il tuo ambiente nei limiti impostati.
Il parametro [celery]worker_concurrency
definisce il numero massimo di attività che un singolo worker può recuperare dalla coda di attività. La modifica di questo parametro consente di regolare il numero di attività che un singolo worker può eseguire contemporaneamente.
Puoi modificare questa opzione di configurazione di Airflow sovrascrivendola. Per impostazione predefinita, la concorrenza dei worker è impostata su un valore minimo di quanto segue: 32, 12 * worker_CPU, 8 * worker_memory
, il che significa che dipende dai limiti delle risorse dei worker. Per ulteriori informazioni sui valori predefiniti della concorrenza dei worker, consulta Ottimizzare gli ambienti.
Il numero di worker e la concorrenza dei worker agiscono in combinazione tra loro e le prestazioni del tuo ambiente dipendono molto da entrambi i parametri. Per scegliere la combinazione corretta, puoi utilizzare le seguenti considerazioni:
Più attività rapide in esecuzione in parallelo. Puoi aumentare la concorrenza dei worker quando ci sono attività in attesa nella coda e i worker utilizzano contemporaneamente una bassa percentuale di CPU e memoria. Tuttavia, in determinate circostanze la coda potrebbe non riempirsi mai, causando l'attivazione della scalabilità automatica. Se le piccole attività terminano l'esecuzione entro il momento in cui i nuovi worker sono pronti, un worker esistente può assumere le attività rimanenti e non ci saranno attività per i worker appena creati.
In queste situazioni, è consigliabile aumentare il numero minimo di worker e la concorrenza dei worker per evitare un ridimensionamento eccessivo.
Più attività lunghe in esecuzione in parallelo. L'elevata concorrenza dei worker impedisce al sistema di scalare il numero di worker. Se più attività richiedono molte risorse e richiedono molto tempo per essere completate, una concorrenza elevata dei worker può comportare che la coda non venga mai riempita e che tutte le attività vengano acquisite da un solo worker, con conseguenti problemi di prestazioni. In queste situazioni, è consigliabile aumentare il numero massimo di worker e diminuire la concorrenza dei worker.
L'importanza del parallelismo
Gli scheduler di Airflow controllano la pianificazione delle esecuzioni dei DAG e delle singole attività dei DAG. L'opzione di configurazione [core]parallelism
di Airflow controlla il numero di attività che lo scheduler di Airflow può mettere in coda nella coda dell'executor dopo che tutte le dipendenze per queste attività sono state soddisfatte.
Il parallelismo è un meccanismo di protezione di Airflow che determina quante attività possono essere eseguite contemporaneamente per ogni pianificatore, indipendentemente dal numero di worker. Il valore del parallelismo, moltiplicato per il numero di pianificatori nel cluster, è il numero massimo di istanze di attività che l'ambiente può mettere in coda.
In genere, [core]parallelism
viene impostato come prodotto di un numero massimo di worker
e [celery]worker_concurrency
. È influenzato anche dal
pool.
Puoi modificare questa opzione di configurazione di Airflow sovrascrivendola. Per ulteriori informazioni su come modificare le configurazioni di Airflow relative al ridimensionamento, consulta Ridimensionare la configurazione di Airflow.
Trovare configurazioni dell'ambiente ottimali
Il modo consigliato per risolvere i problemi di pianificazione è raggruppare le piccole attività in attività più grandi e distribuirle in modo più uniforme nel tempo. Oltre a ottimizzare il codice DAG, puoi anche ottimizzare le configurazioni dell'ambiente per avere una capacità sufficiente per l'esecuzione di più attività contemporaneamente.
Ad esempio, supponiamo che tu consolidi le attività nel DAG il più possibile, ma che limitare le attività attive per distribuirle in modo più uniforme nel tempo non sia una soluzione preferita per il tuo caso d'uso specifico.
Puoi modificare i parametri di parallelismo, numero di worker e concorrenza dei worker per eseguire il DAG dag_10_tasks_20_seconds_10
senza limitare le attività attive. In questo esempio, il DAG viene eseguito 10 volte e ogni esecuzione contiene 20 piccole attività.
Se vuoi eseguirli tutti contemporaneamente:
Avrai bisogno di un ambiente di dimensioni maggiori, perché controlla i parametri di prestazioni dell'infrastruttura Cloud Composer gestita del tuo ambiente.
I worker di Airflow devono essere in grado di eseguire 20 attività contemporaneamente, il che significa che devi impostare la concorrenza dei worker su 20.
I worker devono disporre di CPU e memoria sufficienti per gestire tutte le attività. La concorrenza dei worker è influenzata dalla CPU e dalla memoria dei worker, pertanto, avrai bisogno di almeno
worker_concurrency / 12
in CPU eleast worker_concurrency / 8
in memoria.Dovrai aumentare il parallelismo per adattarlo alla maggiore concorrenza dei worker. Affinché i worker possano recuperare 20 attività dalla coda, il programmatore dovrà prima pianificarle.
Modifica le configurazioni dell'ambiente nel seguente modo:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Configurazione dell'ambiente.
Individua la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.
Nella sezione Worker, nel campo Memoria, specifica il nuovo limite di memoria per i worker Airflow. In questo tutorial, utilizza 4 GB.
Nel campo CPU, specifica il nuovo limite di CPU per i worker Airflow. In questo tutorial, utilizza 2 vCPU.
Salva le modifiche e attendi alcuni minuti per il riavvio dei worker Airflow.
A questo punto, sostituisci le opzioni di configurazione di Airflow relative al parallelismo e alla concorrenza dei worker:
Vai alla scheda Override della configurazione Airflow.
Fai clic su Modifica e poi su Aggiungi override della configurazione di Airflow.
Sostituisci la configurazione del parallelismo:
Sezione Chiave Valore core
parallelism
20
Fai clic su Aggiungi override della configurazione Airflow ed esegui l'override della configurazione della concorrenza dei worker:
Sezione Chiave Valore celery
worker_concurrency
20
Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.
Attiva di nuovo lo stesso DAG di esempio con le configurazioni adeguate:
Nell'interfaccia utente di Airflow, vai alla pagina DAG.
Individua il DAG
dag_10_tasks_20_seconds_10
ed eliminalo.Dopo l'eliminazione del DAG, Airflow controlla la cartella dei DAG nel bucket dell'ambiente ed esegue di nuovo automaticamente il DAG.
Al termine delle esecuzioni del DAG, esamina di nuovo l'istogramma dei log. Nel diagramma,
puoi vedere che l'esempio dag_10_tasks_20_seconds_10
con più attività consolidate non ha generato errori e avvisi durante l'esecuzione con la configurazione dell'ambiente modificata. Confronta i risultati con i dati precedenti nel diagramma, in cui lo stesso esempio generava errori e avvisi quando veniva eseguito con la configurazione dell'ambiente predefinita.
![L'istogramma dei log dei worker di Airflow con errori e avvisi
non mostra errori e avvisi dopo la modifica della configurazione dell'ambiente.](https://cloud.google.com/static/composer/docs/images/composer-scheduling-logs-histogram-configurations.png?hl=it)
Le configurazioni dell'ambiente e di Airflow svolgono un ruolo fondamentale nella programmazione delle attività, tuttavia non è possibile aumentare le configurazioni oltre determinati limiti.
Ti consigliamo di ottimizzare il codice DAG, consolidare le attività e utilizzare la pianificazione per ottimizzare le prestazioni e l'efficienza.
Esempio: errori di analisi e latenza del DAG a causa di un codice DAG complesso
In questo esempio, esamini la latenza di analisi di un DAG di esempio che imita un eccesso di variabili Airflow.
Creare una nuova variabile Airflow
Prima di caricare il codice di esempio, crea una nuova variabile Airflow.
Nella console Google Cloud, vai alla pagina Ambienti.
Nella colonna Server web Airflow, segui il link Airflow per il tuo ambiente.
Vai ad Amministrazione > Voci > Aggiungi un nuovo record.
Imposta i seguenti valori:
- key:
example_var
- val:
test_airflow_variable
- key:
Carica il DAG di esempio nel tuo ambiente
Carica il seguente DAG di esempio nell'ambiente creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato
dag_for_loop_airflow_variable
.
Questo DAG contiene un ciclo for che viene eseguito 1000 volte e imita un eccesso di variabili Airflow. Ogni iterazione legge la variabile example_var
e genera un'attività. Ogni attività contiene un comando che stampa il valore della variabile.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnostica i problemi di analisi
Il tempo di analisi del DAG è il tempo necessario allo scheduler di Airflow per leggere e analizzare un file DAG. Prima che lo scheduler di Airflow possa pianificare qualsiasi attività da un DAG, lo scheduler deve analizzare il file DAG per scoprire la struttura del DAG e le attività definite.
Se l'analisi di un DAG richiede molto tempo, viene consumata la capacità dello scheduler e potrebbe essere ridotto il rendimento delle esecuzioni del DAG.
Per monitorare il tempo di analisi del DAG:
Esegui il comando dell'interfaccia a riga di comando Airflow
dags report
in gcloud CLI per visualizzare il tempo di analisi per tutti i DAG:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Sostituisci quanto segue:
ENVIRONMENT_NAME
: il nome del tuo ambiente.LOCATION
: la regione in cui si trova l'ambiente.
Nell'output del comando, cerca il valore della durata per il DAG
dag_for_loop_airflow_variables
. Un valore elevato potrebbe indicare che questo DAG non è implementato in modo ottimale. Se hai più DAG, dalla tabella di output puoi identificare i DAG che hanno un tempo di analisi lungo.Esempio:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Controlla i tempi di analisi del DAG nella console Google Cloud:
- Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Log, poi a Tutti i log > Gestore processore DAG.
Esamina i log
dag-processor-manager
e identifica i possibili problemi.Figura 13. I log del gestore del processore DAG mostrano i tempi di analisi del DAG (fai clic per ingrandire)
Se il tempo di analisi totale del DAG supera circa 10 secondi, gli pianificatori potrebbero essere sovraccaricati dall'analisi del DAG e non essere in grado di eseguire i DAG in modo efficace.
Ottimizza il codice DAG
È consigliato di evitare codice Python "di primo livello" non necessario nei DAG. I DAG con molti import, variabili e funzioni al di fuori del DAG introducono tempi di analisi più elevati per lo scheduler Airflow. Ciò riduce le prestazioni e la scalabilità di Cloud Composer e Airflow. Un eccesso di lettura delle variabili Airflow porta a tempi di analisi lunghi e a un carico elevato del database. Se questo codice si trova in un file DAG, queste funzioni vengono eseguite a ogni heartbeat dello scheduler, il che potrebbe essere lento.
I campi modello di Airflow ti consentono di incorporare i valori delle variabili Airflow e dei modelli Jinja nei tuoi DAG. In questo modo viene evitata l'esecuzione non necessaria delle funzioni durante i heartbeat dello scheduler.
Per implementare l'esempio di DAG in modo migliore, evita di utilizzare le variabili Airflow nel codice Python di primo livello dei DAG. Passa invece le variabili Airflow agli operatori esistenti tramite un modello Jinja, che ritarderà la lettura del valore fino all'esecuzione dell'attività.
Carica la nuova versione del DAG di esempio nel tuo ambiente. In questo tutorial, questo DAG è denominato
dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Controlla il nuovo tempo di analisi del DAG:
Attendi il completamento dell'esecuzione del DAG.
Esegui di nuovo il comando
dags report
per visualizzare il tempo di analisi per tutti i DAG:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Rivedi i log
dag-processor-manager
e analizza la durata dell'analisi.Figura 14. I log del gestore del processore DAG mostrano i tempi di analisi del DAG dopo l'ottimizzazione del codice DAG (fai clic per ingrandire)
Sostituendo le variabili di ambiente con i modelli Airflow, hai semplificato il codice DAG e ridotto la latenza di analisi circa dieci volte.
Ottimizza le configurazioni dell'ambiente Airflow
Lo scheduler di Airflow tenta costantemente di attivare nuove attività e analizza tutti i DAG nel bucket dell'ambiente. Se i tuoi DAG hanno un tempo di analisi lungo e lo scheduler consuma molte risorse, puoi ottimizzare le configurazioni dello scheduler di Airflow in modo che utilizzi le risorse in modo più efficiente.
In questo tutorial, l'analisi dei file DAG richiede molto tempo e i cicli di analisi
iniziano a sovrapporsi, il che esaurisce la capacità dello scheduler. Nel nostro esempio,
l'analisi del primo DAG richiede più di 5 secondi, quindi configurerai
lo scheduler in modo che venga eseguito meno di frequente per utilizzare le risorse in modo più efficiente. Eseguirai l'override dell'opzione di configurazione di Airflow scheduler_heartbeat_sec
. Questa configurazione definisce la frequenza con cui deve essere eseguito il programmatore (in secondi). Per impostazione predefinita, il valore è impostato su 5 secondi.
Puoi modificare questa opzione di configurazione di Airflow sovrascrivendola.
Sostituisci l'opzione di configurazione di Airflow scheduler_heartbeat_sec
:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Override della configurazione Airflow.
Fai clic su Modifica e poi su Aggiungi override della configurazione di Airflow.
Esegui l'override dell'opzione di configurazione di Airflow:
Sezione Chiave Valore scheduler
scheduler_heartbeat_sec
10
Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.
Controlla le metriche dello scheduler:
Vai alla scheda Monitoraggio e seleziona Pianificatori.
Nel grafico Heartbeat dell'agente di pianificazione, fai clic sul pulsante Altre opzioni (tre puntini) e poi su Visualizza in Metrics Explorer.
![Il grafico dell'heartbeat dello scheduler mostra che l'heartbeat si verifica meno di frequente](https://cloud.google.com/static/composer/docs/images/composer-scheduling-heartbeat.png?hl=it)
Nel grafico vedrai che la pianificazione viene eseguita con la metà della frequenza dopo aver cambiato la configurazione predefinita da 5 secondi a 10 secondi. Riducendo la frequenza dei heartbeat, ti assicuri che lo scheduler non inizi a funzionare mentre è in corso il ciclo di analisi precedente e che la capacità delle risorse dello scheduler non sia esaurita.
Assegna più risorse all'organizzatore
In Cloud Composer 2, puoi allocare più risorse di CPU e memoria allo schedulatore. In questo modo, puoi aumentare il rendimento del programmatore e accelerare il tempo di analisi del DAG.
Assegna CPU e memoria aggiuntive allo scheduler:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Configurazione dell'ambiente.
Individua la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.
Nella sezione Pianificatore, nel campo Memoria, specifica il nuovo limite di memoria. In questo tutorial, utilizza 4 GB.
Nel campo CPU, specifica il nuovo limite della CPU. In questo tutorial, utilizza 2 vCPU.
Salva le modifiche e attendi alcuni minuti per il riavvio degli pianificatori Airflow.
Vai alla scheda Log, poi a Tutti i log > Gestore processore DAG.
Esamina i log
dag-processor-manager
e confronta la durata dell'analisi per i DAG di esempio:Figura 16. I log del gestore del processore DAG mostrano i tempi di analisi del DAG dopo che sono state assegnate altre risorse allo scheduler (fai clic per ingrandire)
Se assegni più risorse allo scheduler, ne aumenti la capacità e riduci notevolmente la latenza di analisi rispetto alle configurazioni predefinite dell'ambiente. Con più risorse, lo scheduler può analizzare i DAG più velocemente, ma aumenteranno anche i costi associati alle risorse Cloud Composer. Inoltre, non è possibile aumentare le risorse oltre un determinato limite.
Ti consigliamo di allocare le risorse solo dopo aver implementato le eventuali ottimizzazioni del codice DAG e della configurazione di Airflow.
Esegui la pulizia
Per evitare che al tuo Google Cloud account vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse o mantieni il progetto ed elimina le singole risorse.
Elimina il progetto
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Elimina singole risorse
Se intendi esplorare più tutorial e guide rapide, il riuso dei progetti ti aiuta a non superare i limiti di quota.
Elimina l'ambiente Cloud Composer. Inoltre, durante questa procedura viene eliminato il bucket dell'ambiente.
Passaggi successivi
- Ottimizzare le prestazioni e i costi dell'ambiente
- Scala gli ambienti
- Risoluzione dei problemi dei DAG