Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questo tutorial illustra la procedura di diagnosi e risoluzione dei problemi di pianificazione e analisi delle attività che causano malfunzionamento del programmatore, errori di analisi e latenza e fallimento delle attività.
Introduzione
Lo scheduler Airflow è influenzato principalmente da due fattori: la pianificazione delle attività e Analisi dei DAG. I problemi in uno di questi fattori possono avere un impatto negativo sulla l'integrità e le prestazioni dell'ambiente.
A volte vengono pianificate contemporaneamente troppe attività. In questa situazione, la coda è piena e le attività rimangono nello stato "Pianificata" o vengono riprogrammate dopo essere state messe in coda, il che potrebbe causare errori nelle attività e latenza delle prestazioni.
Un altro problema comune è l'analisi della latenza e degli errori causati dalla complessità del un codice DAG. Ad esempio, un codice DAG che contiene variabili Airflow al livello superiore del codice può causare ritardi nell'analisi, sovraccarico del database, errori di pianificazione e timeout del DAG.
In questo tutorial, eseguirai la diagnosi dei DAG di esempio e imparerai a risolvere i problemi di pianificazione e analisi, migliorare la pianificazione dei DAG e ottimizzare il codice DAG e le configurazioni dell'ambiente per migliorare le prestazioni.
Obiettivi
Questa sezione elenca gli obiettivi degli esempi in questo tutorial.
Esempio: malfunzionamento e latenza dello scheduler causati da un'elevata contemporaneità delle attività
Carica il DAG di esempio che viene eseguito più volte contemporaneamente e diagnostica i problemi di latenza e malfunzionamento dell'organizzatore con Cloud Monitoring.
Ottimizza il codice DAG consolidando le attività e valutando un impatto sulle prestazioni.
Distribuisci le attività in modo più uniforme nel tempo e valuta le prestazioni impatto.
Ottimizzare le configurazioni di Airflow e di ambiente valutarne l'impatto.
Esempio: latenza e errori di analisi dei DAG causati da codice complesso
Carica il DAG di esempio con le variabili Airflow e diagnostica i problemi di analisi con Cloud Monitoring.
Ottimizza il codice DAG evitando le variabili Airflow al livello superiore di il codice e valutare l'impatto sul tempo di analisi.
Ottimizzare le configurazioni di Airflow e le configurazioni di ambiente valutare l'impatto sul tempo di analisi.
Costi
Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:
Al termine di questo tutorial, puoi evitare la fatturazione continua eliminando il le risorse che hai creato. Per maggiori dettagli, vedi Pulizia.
Prima di iniziare
Questa sezione descrive le azioni necessarie prima di iniziare il tutorial.
Creare e configurare un progetto
Per questo tutorial è necessario un account Google Cloud progetto. Configura il progetto nel seguente modo:
Nella console Google Cloud, seleziona o crea un progetto:
Verifica che la fatturazione sia attivata per il tuo progetto. Scopri come verificare se la fatturazione è abilitata per un progetto.
Assicurati che l'utente del progetto Google Cloud disponga dei seguenti ruoli per creare le risorse necessarie:
- Amministratore ambienti e oggetti Storage
(
roles/composer.environmentAndStorageObjectAdmin
) - Amministratore di Compute (
roles/compute.admin
)
- Amministratore ambienti e oggetti Storage
(
Abilita le API per il tuo progetto
Enable the Cloud Composer API.
crea il tuo ambiente Cloud Composer
Crea un ambiente Cloud Composer 2.
Durante la creazione dell'ambiente,
devi concedere il ruolo Estensione agente di servizio API Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) all'account agente di servizio Composer. Cloud Composer utilizza questo account per eseguire operazioni nel progetto Google Cloud.
Esempio: malfunzionamento della pianificazione e errore delle attività a causa di problemi di pianificazione delle attività
Questo esempio mostra il malfunzionamento dello scheduler di debug e la latenza per un'elevata contemporaneità di attività.
Carica il DAG di esempio nel tuo ambiente
Carica il seguente DAG di esempio nell'ambiente creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato
dag_10_tasks_200_seconds_1
.
Questo DAG ha 200 attività. Ogni attività attende 1 secondo e stampa "Completato!". Il DAG viene attivato automaticamente dopo il caricamento. Cloud Composer esegue questo DAG 10 volte e tutte le esecuzioni del DAG avvengono in parallelo.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnostica i problemi di malfunzionamento dello scheduler e di errore delle attività
Al termine delle esecuzioni del DAG, apri l'interfaccia utente di Airflow e fai clic sul DAG dag_10_tasks_200_seconds_1
. Vedrai che sono state eseguite 10 esecuzioni di DAG in totale e ognuna ha 200 attività riuscite.
Esamina i log delle attività Airflow:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Si apre la pagina Dettagli ambiente.
Vai alla scheda Log, poi a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.
Nell'istogramma dei log puoi vedere gli errori e gli avvisi indicati in rosso e arancione:
Il DAG di esempio ha generato circa 130 avvisi e 60 errori. Fai clic su qualsiasi che contiene le barre gialle e rosse. Nei log vedrai alcuni dei seguenti avvertimenti e errori:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Questi log potrebbero indicare che l'utilizzo delle risorse ha superato i limiti e il worker si è riavviato.
Se un'attività Airflow viene tenuta in coda troppo a lungo, lo scheduler contrassegna come non riuscito e up_for_retry e lo riprogramma di nuovo dell'esecuzione. Un modo per osservare i sintomi di questa situazione è esaminare il grafico con il numero di attività in coda e, se i picchi in questo grafico non diminuiscono in circa 10 minuti, è probabile che si verifichino errori nelle attività (senza log).
Esamina le informazioni di monitoraggio:
Vai alla scheda Monitoring e seleziona Panoramica.
Esamina il grafico Attività Airflow.
Nel grafico delle attività Airflow è presente un picco di attività in coda che dura più di 10 minuti, il che potrebbe significare che non sono disponibili risorse sufficienti nel tuo ambiente per elaborare tutte le attività pianificate.
Esamina il grafico Worker attivi:
Il grafico Worker attivi indica che il DAG ha attivato la scalabilità automatica fino al limite massimo consentito di tre worker durante l'esecuzione del DAG.
I grafici di utilizzo delle risorse possono indicare la mancanza di capacità nei worker Airflow per eseguire le attività in coda. Nella scheda Monitoring, seleziona Worker e rivedi i grafici Utilizzo totale CPU da parte dei lavoratori e Utilizzo totale della memoria dei lavoratori.
I grafici indicano che l'esecuzione di troppe attività contemporaneamente per raggiungere il limite di CPU. Le risorse sono state utilizzate da oltre 30 di minuti, che è ancora più lungo della durata totale di 200 attività in 10 I DAG vengono eseguiti singolarmente.
Questi sono gli indicatori del riempimento della coda e della mancanza di risorse per elaborare tutte le attività pianificate.
Consolidare le attività
Il codice attuale crea molti DAG e attività senza risorse sufficienti per elaborare tutte le attività in parallelo, il che comporta il riempimento della coda. Se le attività vengono tenute in coda troppo a lungo, le attività potrebbero non essere riprogrammate o non andare a buon fine. In questi casi, ti consigliamo di optare per un numero minore di attività più consolidate.
Il seguente DAG di esempio modifica il numero di attività nell'esempio iniziale da 200 a 20 e aumenta il tempo di attesa da 1 a 10 secondi per simulare attività più consolidate che svolgono la stessa quantità di lavoro.
Carica il seguente DAG di esempio nell'ambiente che hai creato. In questo tutorial, questo DAG è denominato
dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Valuta l'impatto di attività più consolidate sui processi di pianificazione:
Attendi il completamento delle esecuzioni del DAG.
Nell'interfaccia utente di Airflow, nella pagina DAG, fai clic sul DAG
dag_10_tasks_20_seconds_10
. Vedrai 10 esecuzioni di DAG, ognuna con 20 e attività completate correttamente.Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Log, quindi vai a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.
Il secondo esempio con attività più consolidate ha generato circa 10 avvisi e 7 errori. Nell'istogramma, puoi confrontare il numero di errori e avvisi nell'esempio iniziale (valori precedenti) e nel secondo esempio (valori successivi).
Confrontando il primo esempio con il più consolidato, puoi vedrai che ci sono molti meno errori e avvisi nel secondo esempio. Tuttavia, gli stessi errori relativi all'arresto caldo continuano a comparire nei log a causa del sovraccarico delle risorse.
Nella scheda Monitoraggio, seleziona Worker ed esamina i grafici.
Se confronti il grafico Attività Airflow per il primo esempio (valori precedenti) con il grafico per il secondo esempio con attività più consolidate, puoi vedere che il picco di attività in coda è durato per un periodo di tempo più breve quando le attività erano più consolidate. Tuttavia, è durata quasi 10 minuti, il che non è ancora ottimale.
Nel grafico Lavoratori attivi, puoi vedere che il primo esempio (sul lato sinistro del grafico) ha utilizzato le risorse per un periodo di tempo molto più lungo rispetto al secondo, anche se entrambi gli esempi simulano la stessa quantità di lavoro.
Esamina i grafici del consumo di risorse dei worker. Anche se la differenza tra le risorse utilizzate nell'esempio con attività più consolidate e l'esempio iniziale è abbastanza significativo, l'utilizzo della CPU è ancora fino al 70% del limite.
Distribuisci le attività in modo più uniforme nel tempo
Troppe attività simultanee comportano il riempimento della coda, il che porta a bloccate nella coda o riprogrammate. Nei passaggi precedenti, ha diminuito il numero di attività consolidandole; tuttavia, l'output log e il monitoraggio indicano che il numero di attività simultanee non ottimale.
Puoi controllare il numero di esecuzioni di attività simultanee implementando una pianificazione o impostando limiti per il numero di attività che possono essere eseguite contemporaneamente.
In questo tutorial, distribuisci le attività in modo più uniforme nel tempo aggiungendo parametri a livello di DAG nel DAG dag_10_tasks_20_seconds_10
:
Aggiungi l'argomento
max_active_runs=1
al gestore di contesto del DAG. Questo argomento imposta il limite di una sola istanza dell'esecuzione di un DAG in un determinato momento.Aggiungi l'argomento
max_active_tasks=5
al gestore del contesto DAG. Questo argomento controlla il numero massimo di istanze di attività che possono essere eseguite contemporaneamente per ogni DAG.
Carica il seguente DAG di esempio nell'ambiente che hai creato. In questo tutorial, questo DAG è denominato
dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Valuta l'impatto della distribuzione delle attività nel tempo sulla pianificazione dei processi:
Attendi il completamento delle esecuzioni del DAG.
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Si apre la pagina Dettagli ambiente.
Vai alla scheda Log, quindi vai a Tutti i log > Log Airflow > Worker > Visualizza in Esplora log.
Nell'istogramma, puoi vedere che il terzo DAG con un numero limitato di attività ed esecuzioni attive non ha generato avvisi o errori e la distribuzione dei log sembra più uniforme rispetto ai valori precedenti.
Le attività nell'esempio dag_10_tasks_20_seconds_10_scheduled
con un
numero limitato di attività ed esecuzioni attive non hanno causato una pressione sulle risorse perché
le attività sono state messe in coda in modo uniforme.
Dopo aver eseguito i passaggi descritti, hai ottimizzato l'utilizzo delle risorse consolidando piccole attività e distribuendole in modo più uniforme nel tempo.
Ottimizza le configurazioni di ambiente
Puoi modificare le configurazioni dell'ambiente per assicurarti che i worker Airflow abbiano sempre la capacità di eseguire le attività in coda.
Numero di worker e concorrenza dei worker
Puoi regolare il numero massimo di worker per consentire a Cloud Composer di scalare automaticamente il tuo ambiente nei limiti impostati.
Il parametro [celery]worker_concurrency
definisce il numero massimo di attività
un singolo worker può prendere dalla coda di attività. Modifica di questo parametro
regola il numero di attività che un singolo worker può eseguire contemporaneamente.
Puoi modificare questa opzione di configurazione di Airflow sovrascrivendola. Per impostazione predefinita, la concorrenza dei worker è impostata su un valore minimo di quanto segue: 32, 12 * worker_CPU, 8 * worker_memory
, il che significa che dipende dai limiti delle risorse dei worker. Consulta
Ottimizza gli ambienti per ulteriori informazioni sugli ambienti predefiniti
valori di contemporaneità dei worker.
Il numero di worker e la concorrenza dei worker agiscono in combinazione tra loro e le prestazioni del tuo ambiente dipendono molto da entrambi i parametri. Per scegliere la combinazione corretta, tieni conto dei seguenti aspetti:
Più attività rapide in esecuzione in parallelo. Puoi aumentare il quando ci sono attività in attesa e i tuoi lavoratori usano una bassa percentuale di CPU e memoria contemporaneamente. Tuttavia, in determinate circostanze la coda potrebbe non riempirsi mai, causando l'attivazione della scalabilità automatica. Se le piccole attività terminano l'esecuzione entro il momento in cui i nuovi worker sono pronti, un worker esistente può assumere le attività rimanenti e non ci saranno attività per i worker appena creati.
In queste situazioni, è consigliabile Aumenta il numero minimo di worker e la contemporaneità dei worker per evitare una scalabilità eccessiva.
Più attività lunghe in esecuzione in parallelo. L'elevata concorrenza dei worker impedisce al sistema di scalare il numero di worker. Se più attività richiedono molte risorse e richiedono molto tempo per essere completate, una concorrenza elevata dei worker può comportare che la coda non venga mai riempita e che tutte le attività vengano acquisite da un solo worker, con conseguenti problemi di prestazioni. In queste situazioni, è consigliabile aumentare il numero massimo di worker e diminuire la concorrenza dei worker.
L'importanza del parallelismo
Gli scheduler di Airflow controllano la pianificazione delle esecuzioni dei DAG e delle singole attività
e i DAG. L'opzione di configurazione [core]parallelism
di Airflow controlla il numero di attività che lo scheduler di Airflow può mettere in coda nella coda dell'executor dopo che tutte le dipendenze per queste attività sono state soddisfatte.
Il parallelismo è un meccanismo di protezione di Airflow che determina quante attività possono essere eseguite contemporaneamente per ogni pianificatore, indipendentemente dal numero di worker. Il valore di parallelismo, moltiplicato per il numero di scheduler nel cluster, è il numero massimo di istanze di attività che il tuo ambiente può mettere in coda.
Di solito, [core]parallelism
è impostato come prodotto di un numero massimo di worker
e [celery]worker_concurrency
. È influenzata anche dalle
pool.
Puoi modificare questa opzione di configurazione di Airflow sovrascrivendola. Per ulteriori informazioni su come modificare le configurazioni di Airflow relative al ridimensionamento, consulta Ridimensionare la configurazione di Airflow.
Trova le configurazioni di ambiente ottimali
Il modo consigliato per risolvere i problemi di pianificazione è consolidare le attività di piccole dimensioni in ad attività più grandi e a distribuire le attività in modo più uniforme nel tempo. Oltre a per ottimizzare il codice DAG, puoi anche ottimizzare le configurazioni dell'ambiente e una capacità sufficiente per l'esecuzione simultanea di più attività.
Ad esempio, supponiamo di consolidare attività nel DAG il più possibile, ma limitando le attività attive per distribuirle in modo più uniforme non è una soluzione preferita per il tuo caso d'uso specifico.
Puoi modificare i parametri di parallelismo, numero di worker e concorrenza dei worker per eseguire il DAG dag_10_tasks_20_seconds_10
senza limitare le attività attive. In questo esempio, il DAG viene eseguito 10 volte e ogni esecuzione contiene 20 piccole attività.
Se vuoi eseguirli tutti contemporaneamente:
Avrai bisogno di un ambiente di dimensioni maggiori, perché controlla i parametri di prestazioni dell'infrastruttura Cloud Composer gestita del tuo ambiente.
I worker di Airflow devono essere in grado di eseguire 20 attività contemporaneamente, il che significa che devi impostare la concorrenza dei worker su 20.
I worker hanno bisogno di CPU e memoria sufficienti per gestire tutte le attività. Lavoratore la contemporaneità dipende dalla CPU e dalla memoria dei worker, quindi, almeno
worker_concurrency / 12
nella CPU e almenoleast worker_concurrency / 8
in memoria.Dovrai aumentare il parallelismo per far corrispondere la contemporaneità dei worker più alta. Per consentire ai worker di raccogliere 20 attività dalla coda, lo scheduler dover pianificare prima queste 20 attività.
Regola le configurazioni di ambiente nel seguente modo:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.
Vai alla scheda Configurazione dell'ambiente.
Trova la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.
Nella sezione Worker, specifica la nuova memoria nel campo Memoria per i worker di Airflow. In questo tutorial vengono utilizzati 4 GB.
Nel campo CPU, specifica il nuovo limite di CPU per i worker di Airflow. In questo usa 2 vCPU.
Salva le modifiche e attendi diversi minuti per i worker di Airflow riavvia.
A questo punto, sostituisci le opzioni di configurazione di Airflow per il parallelismo e la concorrenza dei worker:
Vai alla scheda Override della configurazione Airflow.
Fai clic su Modifica, quindi su Aggiungi override della configurazione di Airflow.
Sostituisci la configurazione del parallelismo:
Sezione Chiave Valore core
parallelism
20
Fai clic su Aggiungi override della configurazione Airflow ed esegui l'override della configurazione della concorrenza dei worker:
Sezione Chiave Valore celery
worker_concurrency
20
Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.
Attiva di nuovo lo stesso DAG di esempio con le configurazioni adeguate:
Nella UI di Airflow, vai alla pagina DAG.
Individua il DAG
dag_10_tasks_20_seconds_10
ed eliminalo.Dopo l'eliminazione del DAG, Airflow controlla la cartella dei DAG in del bucket dell'ambiente ed esegue automaticamente di nuovo il DAG.
Al termine delle esecuzioni del DAG, esamina di nuovo l'istogramma dei log. Nel diagramma,
puoi vedere che l'esempio dag_10_tasks_20_seconds_10
con più attività consolidate non ha generato errori e avvisi durante l'esecuzione con la configurazione dell'ambiente modificata. Confronta i risultati con i dati precedenti
nel diagramma, dove lo stesso esempio ha generato errori e avvisi quando
in esecuzione con la configurazione predefinita dell'ambiente tge.
Le configurazioni dell'ambiente e le configurazioni di Airflow svolgono un ruolo cruciale pianificazione delle attività, tuttavia, non è possibile aumentare le configurazioni al di là di determinati limiti.
Consigliamo di ottimizzare il codice DAG, consolidare le attività e utilizzare la pianificazione per ottimizzare prestazioni ed efficienza.
Esempio: latenza e errori di analisi dei DAG a causa di codice DAG complesso
In questo esempio, analizzerai la latenza di analisi di un DAG di esempio che imita un eccesso di variabili Airflow.
Crea una nuova variabile Airflow
Prima di caricare il codice campione, crea una nuova variabile Airflow.
Nella console Google Cloud, vai alla pagina Ambienti.
Nella colonna Server web Airflow, segui il link Airflow per il tuo completamente gestito di Google Cloud.
Vai ad Amministrazione > Variabili > Aggiungi un nuovo record.
Imposta i seguenti valori:
- chiave:
example_var
- valore:
test_airflow_variable
- chiave:
Carica il DAG di esempio nel tuo ambiente
Carica il seguente DAG di esempio nell'ambiente
creato nei passaggi precedenti. In questo tutorial, questo DAG è denominato
dag_for_loop_airflow_variable
.
Questo DAG contiene un ciclo for che viene eseguito 1000 volte e imita un eccesso di
Variabili Airflow. Ogni iterazione legge la variabile example_var
e
genera un'attività. Ogni attività contiene un comando che stampa
valore.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnostica i problemi di analisi
Il tempo di analisi del DAG è il tempo necessario allo scheduler di Airflow per leggere e analizzare un file DAG. Prima che lo scheduler Airflow possa pianificare qualsiasi attività da un DAG, lo scheduler deve analizzare il file DAG per scoprire la struttura del DAG e le attività definite.
Se l'analisi di un DAG richiede molto tempo, questo consuma la capacità dello scheduler ridurre le prestazioni delle esecuzioni dei DAG.
Per monitorare il tempo di analisi del DAG:
Esegui il comando interfaccia a riga di comando di Airflow
dags report
in gcloud CLI per vedere il tempo di analisi per tutti i tuoi DAG:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Sostituisci quanto segue:
ENVIRONMENT_NAME
: il nome dell'ambiente.LOCATION
: la regione in cui si trova l'ambiente.
Nell'output del comando, cerca il valore della durata per il DAG
dag_for_loop_airflow_variables
. Un valore elevato può indicare questo DAG non è implementato in modo ottimale. Se hai più DAG, dalla tabella di output puoi identificare i DAG che hanno un tempo di analisi lungo.Esempio:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Controlla i tempi di analisi del DAG nella console Google Cloud:
- Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Log, quindi vai a Tutti i log > Gestore processore DAG.
Esamina i log di
dag-processor-manager
e identifica i possibili problemi.
Se il tempo totale di analisi dei DAG supera circa 10 secondi, gli scheduler potrebbero essere è sovraccarico con l'analisi dei DAG e non può eseguire i DAG in modo efficace.
Ottimizza il codice DAG
È consigliato di evitare codice Python "di primo livello" non necessario nei DAG. I DAG con molte importazioni, variabili e funzioni al di fuori del DAG introducono tempi di analisi più elevati per lo scheduler Airflow. Ciò riduce le prestazioni e la scalabilità di Cloud Composer e Airflow. Un eccesso di lettura delle variabili Airflow porta a tempi di analisi lunghi e a un carico elevato del database. Se questo codice si trova in un DAG queste funzioni vengono eseguite su ogni heartbeat dello scheduler, il che potrebbe essere lento.
I campi del modello di Airflow ti consentono di incorporare i valori di Airflow e modelli Jinja nei tuoi DAG. In questo modo viene evitata l'esecuzione non necessaria delle funzioni durante i heartbeat dello scheduler.
Per implementare l'esempio di DAG in modo migliore, evita di usare le variabili Airflow in il codice Python di primo livello dei DAG. Passa invece le variabili Airflow agli operatori esistenti tramite un modello Jinja, che ritarderà la lettura del valore fino all'esecuzione dell'attività.
Carica la nuova versione del DAG di esempio nel tuo ambiente. In questo tutorial, questo DAG è denominato
dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Controlla il nuovo tempo di analisi del DAG:
Attendi il completamento dell'esecuzione del DAG.
Esegui di nuovo il comando
dags report
per visualizzare di analisi per tutti i DAG:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Esamina di nuovo
dag-processor-manager
log e per analizzare la durata dell'analisi.
Sostituendo le variabili di ambiente con i modelli Airflow, hai semplificato il codice DAG e ha ridotto la latenza di analisi di circa dieci volte.
Ottimizza le configurazioni dell'ambiente Airflow
Lo scheduler Airflow cerca costantemente di attivare nuove attività e analizza tutti i DAG nel tuo bucket di ambiente. Se i DAG hanno un tempo di analisi lungo e lo scheduler consuma molte risorse, puoi ottimizzare le configurazioni dello scheduler di Airflow in modo che utilizzi le risorse in modo più efficiente.
In questo tutorial, l'analisi dei file DAG richiede molto tempo e i cicli di analisi
iniziano a sovrapporsi, il che esaurisce la capacità dello scheduler. Nel nostro esempio,
l'analisi del primo DAG richiede più di 5 secondi, quindi configurerai
lo scheduler in modo che venga eseguito meno di frequente per utilizzare le risorse in modo più efficiente. Tu
sovrascriverà
scheduler_heartbeat_sec
Opzione di configurazione Airflow. Questa configurazione definisce la frequenza
scheduler dovrebbe essere eseguito (in secondi). Per impostazione predefinita, il valore è impostato su 5 secondi.
Puoi modificare questa opzione di configurazione di Airflow tramite
scattando l'override.
Sostituisci l'opzione di configurazione di Airflow scheduler_heartbeat_sec
:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.
Vai alla scheda Override della configurazione di Airflow.
Fai clic su Modifica e poi su Aggiungi override della configurazione di Airflow.
Esegui l'override dell'opzione di configurazione Airflow:
Sezione Chiave Valore scheduler
scheduler_heartbeat_sec
10
Fai clic su Salva e attendi che l'ambiente aggiorni la configurazione.
Controlla le metriche dello scheduler:
Vai alla scheda Monitoraggio e seleziona Pianificatori.
Nel grafico Heartbeat dell'agente di pianificazione, fai clic sul pulsante Altre opzioni (tre puntini) e poi su Visualizza in Metrics Explorer.
Nel grafico vedrai che la pianificazione viene eseguita con la metà della frequenza dopo aver cambiato la configurazione predefinita da 5 secondi a 10 secondi. Riduce il valore frequenza di heartbeat, ti assicuri che lo scheduler non avvii in esecuzione mentre il precedente ciclo di analisi è in corso e lo scheduler la capacità della risorsa non è esaurita.
Assegna più risorse all'organizzatore
In Cloud Composer 2, puoi allocare più risorse di CPU e memoria scheduler. In questo modo, puoi aumentare il rendimento del programmatore e accelerare il tempo di analisi del DAG.
Assegna CPU e memoria aggiuntive allo scheduler:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.
Vai alla scheda Configurazione dell'ambiente.
Trova la configurazione Risorse > Carichi di lavoro e fai clic su Modifica.
Nella sezione Pianificatore, nel campo Memoria, specifica il nuovo limite di memoria. In questo tutorial, utilizza 4 GB.
Nel campo CPU, specifica il nuovo limite di CPU. In questo usa 2 vCPU.
Salva le modifiche e attendi diversi minuti per gli scheduler di Airflow riavvia.
Vai alla scheda Log, quindi vai a Tutti i log > Gestore processore DAG.
Esamina i log di
dag-processor-manager
e confronta la durata di analisi per il di esempio di DAG:
Assegnando più risorse allo scheduler, hai aumentato e ha ridotto la latenza di analisi significativamente rispetto ai valori predefiniti configurazioni dell'ambiente di lavoro. Con più risorse, lo scheduler può analizzare i DAG più velocemente, ma aumenteranno anche i costi associati alle risorse Cloud Composer. Inoltre, non è possibile aumentare risorse che superano un certo limite.
Ti consigliamo di allocare le risorse solo dopo aver implementato le possibili ottimizzazioni del codice DAG e della configurazione di Airflow.
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse usati in questo tutorial, elimina il progetto che contiene le risorse o mantenere il progetto ed eliminare le singole risorse.
Elimina il progetto
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Elimina singole risorse
Se prevedi di esplorare più tutorial e guide rapide, puoi riutilizzare i progetti può aiutarti a evitare di superare i limiti di quota.
Eliminare l'ambiente Cloud Composer. Inoltre, durante questa procedura viene eliminato il bucket dell'ambiente.
Passaggi successivi
- Ottimizzare le prestazioni e i costi dell'ambiente
- Scala gli ambienti
- Risoluzione dei problemi dei DAG