Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questa guida mostra come scrivere un grafo diretto aciclico di Apache Airflow (DAG) eseguito in un ambiente Cloud Composer.
Poiché Apache Airflow non fornisce un solido isolamento di DAG e attività, consigliamo di usare ambienti di produzione e di test separati per evitare interferenze DAG. Per ulteriori informazioni, consulta la sezione Testare i DAG.
Strutturare un DAG Airflow
Un DAG di Airflow è definito in un file Python ed è costituito dai seguenti componenti:
- Definizione di DAG
- Operatori Airflow
- Relazioni con gli operatori
I seguenti snippet di codice mostrano esempi di ciascun componente fuori contesto.
Una definizione DAG
L'esempio seguente mostra una definizione di DAG Airflow:
Operatori e attività
Gli operatori Airflow descrivono il lavoro da svolgere. Un'attività task è un'istanza specifica di un operatore.
Relazioni tra attività
Le relazioni tra attività descrivono l'ordine in cui deve essere completato il lavoro.
Esempio di flusso di lavoro DAG completo in Python
Il seguente flusso di lavoro è un modello DAG completo e funzionante, composto da
due attività: un'attività hello_python
e un'attività goodbye_bash
:
Per ulteriori informazioni sulla definizione dei DAG di Airflow, consulta il tutorial di Airflow e i concetti di Airflow.
Operatori Airflow
Gli esempi seguenti mostrano alcuni operatori di Airflow più diffusi. Per un riferimento autorevole sugli operatori di Airflow, consulta la Guida di riferimento su operatori e hook e l'Indice dei fornitori.
BashOperator
Utilizza BashOperator per eseguire programmi a riga di comando.
Cloud Composer esegue i comandi forniti in uno script Bash su un Worker Airflow. Il worker è un container Docker basato su Debian e include diversi pacchetti.
- Comando
gcloud
, inclusogcloud storage
per lavorare con i bucket Cloud Storage. - Comando
bq
- Comando
kubectl
PythonOperator
Utilizza la PythonOperator per eseguire codice Python arbitrario.
Cloud Composer esegue il codice Python in un contenitore che include i pacchetti per la versione dell'immagine Cloud Composer utilizzata nel tuo ambiente.
Per installare altri pacchetti Python, consulta Installazione delle dipendenze Python.
Operatori Google Cloud
Per eseguire attività che utilizzano i prodotti Google Cloud, utilizza la Operatori di Google Cloud Airflow. Ad esempio: Operatori di BigQuery interrogare ed elaborare i dati in BigQuery.
Esistono molti altri operatori Airflow per Google Cloud e per i singoli servizi forniti da Google Cloud. Consulta Operatori Google Cloud per consultare l'elenco completo.
EmailOperator
Utilizza la EmailOperator per inviare email da un DAG. Per inviare le email da un ambiente Cloud Composer, configurare l'ambiente per l'utilizzo di SendGrid.
Notifiche in caso di errore dell'operatore
Imposta email_on_failure
su True
per inviare una notifica via email quando un operatore nel DAG non funziona. Per inviare notifiche via email da un ambiente Cloud Composer, devi configurare l'ambiente in modo che utilizzi SendGrid.
Linee guida per i flussi di lavoro DAG
Inserisci eventuali librerie Python personalizzate nell'archivio ZIP di un DAG in una . Non posizionare le librerie al livello superiore della directory dei DAG.
Quando Airflow analizza la cartella
dags/
, verifica solo la presenza di DAG in I moduli Python che si trovano al primo livello della cartella dei DAG e nella parte superiore livello di un archivio ZIP situato anche nella cartella di primo livellodags/
. Se Airflow rileva un modulo Python in un archivio ZIP che non contiene entrambe le sottostringheairflow
eDAG
, interrompe l'elaborazione dell'archivio ZIP. Airflow restituisce solo i DAG trovati fino a quel momento.Per la tolleranza ai guasti, non definire più oggetti DAG nello stesso modulo Python.
Non utilizzare DAG secondari. Invece, raggruppare le attività all'interno dei DAG.
Inserisci i file richiesti al momento dell'analisi dei DAG nella cartella
dags/
, non nella cartelladata/
.Testa i DAG sviluppati o modificati come consigliato nelle istruzioni per il test dei DAG.
Lo strumento CLI Composer Local Development semplifica lo sviluppo di DAG Apache Airflow per Cloud Composer 2 eseguendo un ambiente Airflow in locale. Questo ambiente Airflow locale utilizza l'immagine di un una versione specifica di Cloud Composer 2.
Verifica che i DAG sviluppati non aumentino troppo i tempi di analisi dei DAG.
Le attività Airflow possono non riuscire per diversi motivi. Per evitare errori delle intere esecuzioni di DAG, consigliamo di abilitare i nuovi tentativi delle attività. Se imposti il numero massimo di tentativi su
0
, non viene eseguito alcun tentativo.Ti consigliamo di sostituire l'opzione
default_task_retries
con un valore per le ripetizioni della tarefa diverso da0
. Inoltre, puoi impostare Parametroretries
a livello di attività.Se vuoi utilizzare GPU nelle attività Airflow, crea Cluster GKE basato su nodi che utilizzano macchine con GPU. Utilizza le funzionalità di GKEStartPodOperator per eseguire le tue attività.
Evita di eseguire attività che usano molta CPU e memoria nel pool di nodi del cluster, dove sono in esecuzione altri componenti di Airflow (scheduler, worker, server web). Utilizza invece KubernetesPodOperator o GKEStartPodOperator.
Quando esegui il deployment dei DAG in un ambiente, carica solo i file che sono assolutamente necessarie per interpretare ed eseguire i DAG nella cartella
/dags
.Limita il numero di file DAG nella cartella
/dags
.Airflow analizza continuamente i DAG nella cartella
/dags
. L'analisi è un procedura che esegue un ciclo nella cartella dei DAG e il numero di file che devono essere caricati (con le relative dipendenze) influisce sul rendimento dell'analisi dei DAG e della pianificazione delle attività. È molto più efficiente usare 100 file con 100 DAG ciascuno di 10.000 file con 1 DAG ciascuno e quindi tali è consigliabile eseguire l'ottimizzazione. Questa ottimizzazione è un equilibrio tra tempi di analisi ed efficienza della creazione e della gestione dei DAG.Puoi anche prendere in considerazione, ad esempio, di eseguire il deployment di 10000 file DAG creando 100 file ZIP contenenti ciascuno 100 file DAG.
Oltre ai suggerimenti riportati sopra, se hai più di 10.000 file DAG, la generazione di DAG in modo programmatico potrebbe essere una buona opzione. Ad esempio, puoi implementare un singolo file DAG Python che genera un certo numero di oggetti DAG (ad esempio 20, 100 oggetti DAG).
Domande frequenti sulla scrittura dei DAG
Come faccio a ridurre al minimo la ripetizione del codice se voglio eseguire le stesse attività o attività simili in più DAG?
Ti consigliamo di definire librerie e wrapper per minimizzare la ripetizione del codice.
Come posso riutilizzare il codice tra i file DAG?
Inserisci le funzioni di utilità in un
libreria Python locale
e importare le funzioni. Puoi fare riferimento alle funzioni in qualsiasi DAG localizzato
nella cartella dags/
del bucket dell'ambiente.
Come faccio a ridurre al minimo il rischio che possano insorgere definizioni diverse?
Ad esempio, hai due team che vogliono aggregare i dati non elaborati in metriche sulle entrate. I team scrivono due attività leggermente diverse che svolgono lo stesso cosa. Definisci le librerie lavorare con i dati sulle entrate in modo che gli implementatori DAG debbano chiarire la definizione di entrate aggregate.
Come posso impostare le dipendenze tra i DAG?
Questo dipende da come vuoi definire la dipendenza.
Se hai due DAG (DAG A e DAG B) e vuoi che il DAG B venga attivato dopo il DAG A, puoi inserire un TriggerDagRunOperator
alla fine del DAG A.
Se il DAG B dipende solo da un artefatto generato dal DAG A, ad esempio un messaggio Pub/Sub, un sensore potrebbe funzionare meglio.
Se il DAG B è integrato strettamente con il DAG A, potresti essere in grado di unire i due DAG in un unico DAG.
Come posso passare ID esecuzione univoci a un DAG e alle sue attività?
Ad esempio, vuoi passare i nomi dei cluster Dataproc e i percorsi dei file.
Puoi generare un ID univoco casuale restituendo str(uuid.uuid4())
in
PythonOperator
. L'ID viene inserito
XComs
per poter fare riferimento all'ID in altri operatori
tramite campi basati su modelli.
Prima di generare un uuid
, valuta se un ID specifico per DagRun deve essere
più prezioso. Puoi anche fare riferimento a questi ID nelle sostituzioni Jinja utilizzando le macro.
Come si separano le attività in un DAG?
Ogni attività deve essere un'unità di lavoro idempotente. Di conseguenza, dovresti evitare di eseguire il wrapping di un flusso di lavoro a più passaggi all'interno di un'unica attività, ad esempio un programma complesso in esecuzione in un PythonOperator
.
Devo definire più attività in un unico DAG per aggregare i dati di più origini?
Ad esempio, hai più tabelle con dati non elaborati e vuoi creare aggregati giornalieri per ogni tabella. Le attività non dipendono l'una dall'altra. Dovrei crei un'attività e un DAG per ogni tabella o crei un DAG generale?
Se ti va bene che ogni attività condivida le stesse proprietà a livello di DAG, come
schedule_interval
, allora ha senso definire più attività in un singolo
con il DAG. Altrimenti, per ridurre al minimo le ripetizioni del codice, è possibile generare più DAG
da un singolo modulo Python inserendoli nell'oggetto globals()
del modulo.
Come faccio a limitare il numero di attività simultanee in esecuzione in un DAG?
Ad esempio, vuoi evitare di superare i limiti/quote di utilizzo delle API eseguendo troppi processi simultanei.
Puoi definire pool Airflow nell'interfaccia utente web di Airflow e associare le attività ai pool esistenti nei tuoi DAG.
Domande frequenti sull'utilizzo degli operatori
Dovrei usare DockerOperator
?
Ti sconsigliamo di utilizzare
DockerOperator
, a meno che non venga utilizzato per avviare
contenuti su un'installazione Docker remota (non all'interno del
cluster di un ambiente). In un ambiente Cloud Composer l'operatore non ha
ai daemon Docker.
Usa invece KubernetesPodOperator
o
GKEStartPodOperator
. Questi operatori avviano i pod Kubernetes
Kubernetes o GKE. Tieni presente che non consigliamo di avviare pod nel cluster di un ambiente, perché ciò può portare alla concorrenza per le risorse.
Dovrei usare SubDagOperator
?
Ti consigliamo di non utilizzare SubDagOperator
.
Utilizza le alternative suggerite in Raggruppare le attività.
Devo eseguire il codice Python solo in PythonOperators
per separare completamente gli operatori Python?
A seconda del tuo obiettivo, hai a disposizione alcune opzioni.
Se il tuo unico problema è mantenere dipendenze Python separate, puoi utilizzare PythonVirtualenvOperator
.
Valuta la possibilità di utilizzare KubernetesPodOperator
. Questo operatore ti consente
per definire i pod Kubernetes ed eseguirli in altri cluster.
Come faccio ad aggiungere pacchetti binari personalizzati o non PyPI?
Puoi installare pacchetti ospitati in repository di pacchetti privati.
Come faccio a passare gli argomenti in modo uniforme a un DAG e alle relative attività?
Puoi utilizzare il supporto integrato di Airflow per il modello Jinja per passare gli argomenti che possono essere utilizzati nei campi basati su modelli.
Quando avviene la sostituzione del modello?
La sostituzione del modello viene eseguita sui worker Airflow appena prima di pre_execute
di un operatore. In pratica, ciò significa che i modelli
non viene sostituito fino a poco prima dell'esecuzione di un'attività.
Come faccio a sapere quali argomenti dell'operatore supportano la sostituzione del modello?
Gli argomenti dell'operatore che supportano la sostituzione del modello Jinja2 sono esplicitamente contrassegnati come tali.
Cerca il campo template_fields
nella definizione dell'operatore.
che contiene un elenco di nomi di argomenti sottoposti a sostituzione del modello.
Ad esempio, consulta
BashOperator
, che supporta la creazione di modelli per
gli argomenti bash_command
e env
.
Passaggi successivi
- Risoluzione dei problemi dei DAG
- Pianificazione dei problemi
- Operatori Google
- Operatori Google Cloud
- Tutorial su Apache Airflow