Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questa guida mostra come scrivere un grafo diretto aciclico (DAG) di Apache Airflow eseguito in un ambiente Cloud Composer.
Poiché Apache Airflow non fornisce un solido isolamento di DAG e attività, ti consigliamo di utilizzare ambienti di produzione e test separati per evitare interferenze DAG. Per maggiori informazioni, consulta Test dei DAG.
Strutturazione di un DAG Airflow
Un DAG Airflow è definito in un file Python ed è composto dai seguenti componenti:
- Definizione di DAG
- Operatori Airflow
- Relazioni con gli operatori
I seguenti snippet di codice mostrano esempi di ogni componente fuori contesto.
Una definizione di DAG
L'esempio seguente mostra una definizione di DAG Airflow:
Flusso d'aria 2
Flusso d'aria 1
Operatori e attività
Gli operatori Airflow descrivono il lavoro da svolgere. Un'attività è un'istanza specifica di un operatore.
Flusso d'aria 2
Flusso d'aria 1
Relazioni delle attività
Le relazioni delle attività descrivono l'ordine in cui deve essere completato il lavoro.
Flusso d'aria 2
Flusso d'aria 1
Esempio di flusso di lavoro DAG completo in Python
Il seguente flusso di lavoro è un modello DAG funzionante e composto da
due attività: un'attività hello_python
e un'attività goodbye_bash
:
Flusso d'aria 2
Flusso d'aria 1
Per ulteriori informazioni sulla definizione dei DAG Airflow, consulta il tutorial di Airflow e i concetti di Airflow.
Operatori Airflow
Gli esempi seguenti mostrano alcuni operatori di Airflow più diffusi. Per un riferimento autorevole degli operatori Airflow, consulta la documentazione Riferimento per operatori e hook e l'indice dei provider.
BashOperator
Utilizza BashOperator per eseguire programmi a riga di comando.
Flusso d'aria 2
Flusso d'aria 1
Cloud Composer esegue i comandi forniti in uno script Bash su un worker Airflow. Il worker è un container Docker basato su Debian e include diversi pacchetti.
PythonOperator
Usa PythonOperator per eseguire codice Python arbitrario.
Cloud Composer esegue il codice Python in un container che include i pacchetti per la versione dell'immagine di Cloud Composer utilizzata nel proprio ambiente.
Per installare pacchetti Python aggiuntivi, consulta Installazione delle dipendenze Python.
Operatori Google Cloud
Per eseguire attività che utilizzano i prodotti Google Cloud, utilizza gli operatori Google Cloud Airflow. Ad esempio, gli operatori di BigQuery eseguono query ed elaborano i dati in BigQuery.
Esistono molti altri operatori Airflow per Google Cloud e per i singoli servizi forniti da Google Cloud. Per l'elenco completo, consulta la pagina relativa agli operatori di Google Cloud.
Flusso d'aria 2
Flusso d'aria 1
EmailOperator
Utilizza EmailOperator per inviare email da un DAG. Per inviare email da un ambiente Cloud Composer, configura il tuo ambiente per l'utilizzo di SendGrid.
Flusso d'aria 2
Flusso d'aria 1
Notifiche in caso di errore dell'operatore
Imposta email_on_failure
su True
per inviare una notifica via email quando un operatore nel DAG non funziona. Per inviare notifiche email da un ambiente Cloud Composer, devi configurare l'ambiente per l'utilizzo di SendGrid.
Flusso d'aria 2
Flusso d'aria 1
Linee guida per i flussi di lavoro DAG
Inserisci eventuali librerie Python personalizzate nell'archivio ZIP di un DAG in una directory nidificata. Non inserire le librerie al livello superiore della directory dei DAG.
Quando Airflow analizza la cartella
dags/
, verifica la presenza di DAG solo nei moduli Python che si trovano al primo livello della cartella dei DAG e nel livello superiore di un archivio ZIP che si trova anche nella cartelladags/
di primo livello. Se Airflow rileva un modulo Python in un archivio ZIP che non contiene entrambe le sottostringheairflow
eDAG
, Airflow interrompe l'elaborazione dell'archivio ZIP. Airflow restituisce solo i DAG trovati fino a quel punto.Utilizza Airflow 2 anziché Airflow 1.
La community di Airflow non pubblica più nuove release secondarie o patch per Airflow 1.
Per la tolleranza di errore, non definire più oggetti DAG nello stesso modulo Python.
Non utilizzare i DAG secondari. Puoi invece raggruppare le attività all'interno dei DAG.
Inserisci i file richiesti al momento dell'analisi dei DAG nella cartella
dags/
, non nella cartelladata/
.Testa i DAG sviluppati o modificati come consigliato nelle istruzioni per testare i DAG.
Verifica che i DAG sviluppati non aumentino troppo i tempi di analisi dei DAG.
Le attività Airflow possono non riuscire per diversi motivi. Per evitare errori di tutte le esecuzioni dei DAG, consigliamo di abilitare i nuovi tentativi delle attività. Se imposti il numero massimo di nuovi tentativi su
0
, non verrà eseguito alcun nuovo tentativo.Ti consigliamo di sostituire l'opzione
default_task_retries
con un valore diverso da0
per i nuovi tentativi di attività. Inoltre, puoi impostare il parametroretries
a livello dell'attività.Se vuoi utilizzare GPU nelle attività Airflow, crea un cluster GKE separato basato sui nodi che utilizzano macchine con GPU. Utilizza GKEStartPodOperator per eseguire le tue attività.
Evita di eseguire attività che richiedono un uso intensivo della CPU e della memoria nel pool di nodi del cluster dove sono in esecuzione altri componenti Airflow (pianificatori, worker, server web). Utilizza invece KubernetesPodOperator o GKEStartPodOperator.
Quando esegui il deployment dei DAG in un ambiente, carica solo i file assolutamente necessari per l'interpretazione e l'esecuzione dei DAG nella cartella
/dags
.Limita il numero di file DAG nella cartella
/dags
.Airflow esegue l'analisi continua dei DAG nella cartella
/dags
. L'analisi è un processo che esegue il loop attraverso la cartella dei DAG e il numero di file che devono essere caricati (con le loro dipendenze) influisce sulle prestazioni dell'analisi dei DAG e della pianificazione delle attività. È molto più efficiente utilizzare 100 file con 100 DAG ciascuno rispetto a 10.000 file con 1 DAG ciascuno, per cui è consigliata questa ottimizzazione. Questa ottimizzazione è un equilibrio tra il tempo di analisi e l'efficienza della creazione e della gestione dei DAG.Puoi anche prendere in considerazione, ad esempio, il deployment di 10.000 file DAG, potresti creare 100 file ZIP ciascuno contenente 100 file DAG.
Oltre ai suggerimenti precedenti, se hai più di 10.000 file DAG, la generazione di DAG in modo programmatico potrebbe essere una buona opzione. Ad esempio, puoi implementare un singolo file DAG Python che genera un certo numero di oggetti DAG (ad esempio, 20, 100 oggetti DAG).
Evita di utilizzare operatori Airflow deprecati
Gli operatori elencati nella seguente tabella sono deprecati. Alcuni di questi operatori erano supportati nelle prime versioni di Cloud Composer 1. Evita di usarli nei DAG. Utilizza invece le alternative fornite e aggiornate.
Operatore obsoleto | Operatore da utilizzare |
---|---|
BigQueryExecuteQueryOperator | BigQueryInsertJobOperator |
BigQueryPatchDatasetOperator | BigQueryUpdateTableOperator |
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator |
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator |
DataprocScaleClusterOperator | DataprocUpdateClusterOperator |
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator |
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator |
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator |
MLEngineManageModelOperator | MLEngineCreateModelOperator, MLEngineGetModelOperator |
MLEngineManageVersionOperator | MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion |
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor |
Domande frequenti sulla scrittura dei DAG
Come faccio a ridurre al minimo le ripetizioni del codice se voglio eseguire attività uguali o simili in più DAG?
Consigliamo di definire le librerie e i wrapper per ridurre al minimo la ripetizione del codice.
Come posso riutilizzare il codice tra i file DAG?
Inserisci le funzioni di utilità in una libreria Python locale e importa le funzioni. Puoi fare riferimento alle funzioni in qualsiasi DAG nella cartella dags/
del bucket del tuo ambiente.
Come faccio a ridurre al minimo il rischio che possano insorgere definizioni diverse?
Ad esempio, hai due team che vogliono aggregare i dati non elaborati nelle metriche delle entrate. I team scrivono due attività leggermente diverse che ottengono lo stesso risultato. Definisci le librerie per lavorare con i dati sulle entrate in modo che gli implementatori dei DAG debbano chiarire la definizione di entrate che vengono aggregate.
Come posso impostare le dipendenze tra i DAG?
Questo dipende da come vuoi definire la dipendenza.
Se hai due DAG (DAG A e DAG B) e vuoi che il DAG B si attivi dopo il DAG
A, puoi inserire un
TriggerDagRunOperator
alla fine del DAG A.
Se il DAG B dipende solo da un artefatto generato dal DAG A, ad esempio un messaggio Pub/Sub, un sensore potrebbe funzionare meglio.
Se il DAG B è integrato strettamente con il DAG A, potresti essere in grado di unire i due DAG in un DAG.
Come posso passare ID esecuzione univoci a un DAG e alle sue attività?
Ad esempio, vuoi passare i nomi dei cluster Dataproc e i percorsi dei file.
Puoi generare un ID univoco casuale restituendo str(uuid.uuid4())
in
un PythonOperator
. L'ID viene inserito in XComs
per consentirti di fare riferimento all'ID in altri operatori tramite campi basati su modello.
Prima di generare un uuid
, valuta se un ID specifico di DagRun sarebbe più utile. Puoi anche fare riferimento a questi ID nelle sostituzioni Jinja
utilizzando le macro.
Come si separano le attività in un DAG?
Ogni attività deve essere un'unità di lavoro idempotente. Di conseguenza, dovresti evitare di includere un flusso di lavoro in più passaggi in una singola attività, ad esempio un programma complesso eseguito in un PythonOperator
.
Devo definire più attività in un singolo DAG per aggregare dati da più origini?
Ad esempio, hai più tabelle con dati non elaborati e vuoi creare aggregati giornalieri per ogni tabella. Le attività non dipendono l'una dall'altra. Dovresti creare un'attività e un DAG per ogni tabella o creare un DAG generale?
Se vuoi che ogni attività condivida le stesse proprietà a livello di DAG, come
schedule_interval
, ha senso definire più attività in un singolo
DAG. Altrimenti, per ridurre al minimo le ripetizioni del codice, è possibile generare più DAG da un singolo modulo Python inserendoli nell'oggetto globals()
del modulo.
Come posso limitare il numero di attività simultanee in esecuzione in un DAG?
Ad esempio, vuoi evitare di superare i limiti/quote di utilizzo delle API o di eseguire troppi processi simultanei.
Puoi definire pool Airflow nella UI web di Airflow e associare attività a pool esistenti nei tuoi DAG.
Domande frequenti sull'utilizzo degli operatori
Dovrei usare DockerOperator
?
Consigliamo di non utilizzare DockerOperator
, a meno che non venga utilizzato per avviare container su un'installazione Docker remota (non all'interno del cluster di un ambiente). In un ambiente Cloud Composer, l'operatore
non ha accesso ai daemon Docker.
Usa invece KubernetesPodOperator
o
GKEStartPodOperator
. Questi operatori avviano i pod Kubernetes
rispettivamente in cluster Kubernetes o GKE. Tieni presente che sconsigliamo di avviare pod nel cluster di un ambiente, perché ciò può portare a concorrenza tra risorse.
Dovrei usare SubDagOperator
?
Ti consigliamo di non utilizzare SubDagOperator
.
Utilizza le alternative come suggerito in Raggruppamento delle attività.
Devo eseguire il codice Python solo in PythonOperators
per separare completamente gli operatori Python?
A seconda del tuo obiettivo, hai a disposizione alcune opzioni.
Se il tuo unico problema è mantenere dipendenze Python separate, puoi utilizzare PythonVirtualenvOperator
.
Valuta la possibilità di utilizzare il KubernetesPodOperator
. Questo operatore consente di definire i pod Kubernetes ed eseguirli in altri cluster.
Come faccio ad aggiungere pacchetti binari personalizzati o non PyPI?
Puoi installare pacchetti ospitati in repository di pacchetti privati.
Come posso passare in modo uniforme gli argomenti a un DAG e alle sue attività?
Puoi utilizzare il supporto integrato di Airflow per i modelli Jinja per passare argomenti che possono essere utilizzati nei campi basati su modelli.
Quando avviene la sostituzione del modello?
La sostituzione dei modelli avviene sui worker di Airflow appena prima della chiamata della funzione pre_execute
di un operatore. In pratica, ciò significa che i modelli
vengono sostituiti solo poco prima dell'esecuzione di un'attività.
Come faccio a sapere quali argomenti dell'operatore supportano la sostituzione del modello?
Gli argomenti dell'operatore che supportano la sostituzione del modello Jinja2 sono contrassegnati esplicitamente come tali.
Cerca il campo template_fields
nella definizione dell'operatore, che contiene un elenco di nomi di argomenti da sostituire con il modello.
Ad esempio, consulta BashOperator
, che supporta la creazione di modelli per gli argomenti bash_command
e env
.
Passaggi successivi
- Risoluzione dei problemi dei DAG
- Pianificazione dei problemi
- Operatori Google
- Operatori Google Cloud
- Tutorial su Apache Airflow