Cloud Composer 1 | Cloud Composer 2
Questa guida mostra come scrivere un grafo diretto aciclico (DAG) di Apache Airflow eseguito in un ambiente Cloud Composer.
Poiché Apache Airflow non fornisce un isolamento elevato di DAG e attività, ti consigliamo di utilizzare ambienti di produzione e di test separati per evitare interferenze DAG. Per maggiori informazioni, consulta la pagina Test dei DAG.
Struttura di un DAG Airflow
Un DAG Airflow viene definito in un file Python ed è composto dai seguenti componenti:
- Definizione di DAG
- Operatori Airflow
- Relazioni con gli operatori
I seguenti snippet di codice mostrano esempi di ogni componente fuori contesto.
Definizione di DAG
L'esempio seguente illustra una definizione di DAG:
Flusso d'aria 2
Flusso d'aria 1
Operatori e attività
Gli operatori descrivono il lavoro da svolgere. Un'attività è un'istanza specifica di un operatore.
Flusso d'aria 2
Flusso d'aria 1
Relazioni attività
Le relazioni tra le attività descrivono l'ordine in cui il lavoro deve essere completato.
Flusso d'aria 2
Flusso d'aria 1
Esempio di flusso di lavoro DAG completo in Python
Il seguente flusso di lavoro è un modello DAG funzionante completo composto da due attività: un'attività hello_python
e un'attività goodbye_bash
:
Flusso d'aria 2
Flusso d'aria 1
Per ulteriori informazioni sulla definizione dei DAG di Airflow, consulta il tutorial di Airflow e i concetti di Airflow.
Operatori Airflow
I seguenti esempi mostrano alcuni tra gli operatori Airflow più diffusi. Per un riferimento autorevole degli operatori Airflow, consulta gli articoli Riferimento per gli operatori e gli hook e l'Indice provider.
BashOperator
Utilizza BashOperator per eseguire programmi a riga di comando.
Flusso d'aria 2
Flusso d'aria 1
Cloud Composer esegue i comandi forniti in uno script Bash su un worker Airflow. Il worker è un container Docker basato su Debian e include diversi pacchetti.
PythonOperator
Utilizza PythonOperator per eseguire codice Python arbitrario.
Cloud Composer esegue il codice Python in un container che include pacchetti per la versione dell'immagine di Cloud Composer utilizzata nel tuo ambiente.
Per installare pacchetti Python aggiuntivi, consulta la pagina relativa all'installazione delle dipendenze Python.
Operatori di Google Cloud
Per eseguire attività che utilizzano i prodotti Google Cloud, utilizza gli operatori di Google Cloud Airflow. Ad esempio, gli operatori di BigQuery eseguono query ed elaborano dati in BigQuery.
Esistono molti altri operatori Airflow per Google Cloud e singoli servizi forniti da Google Cloud. Per l'elenco completo, consulta gli Operatori di Google Cloud.
Flusso d'aria 2
Flusso d'aria 1
EmailOperator
Utilizza EmailOperator per inviare email da un DAG. Per inviare email da un ambiente Cloud Composer, devi configurare il tuo ambiente per l'utilizzo di SendGrid.
Flusso d'aria 2
Flusso d'aria 1
Notifiche in caso di errore dell'operatore
Imposta email_on_failure
su True
per inviare una notifica via email in caso di errore di un operatore nel DAG. Per inviare notifiche email da un ambiente Cloud Composer, devi configurare il tuo ambiente per l'utilizzo di SendGrid.
Flusso d'aria 2
Flusso d'aria 1
Linee guida per il flusso di lavoro dei DAG
Inserisci eventuali librerie Python personalizzate in un archivio ZIP di un DAG all'interno di una directory nidificata. Non posizionare le librerie al livello superiore della directory dei DAG.
Quando Airflow analizza la cartella
dags/
, Airflow controlla solo la presenza di DAG nei moduli Python che si trovano al primo livello della cartella DAG e nel livello superiore di un archivio ZIP, anch'esso nella cartelladags/
di primo livello. Se Airflow rileva un modulo Python in un archivio ZIP che non contiene entrambe le sottostringheairflow
eDAG
, Airflow smette di elaborare l'archivio ZIP. Airflow restituisce solo i DAG trovati fino a quel momento.Utilizza Airflow 2 anziché Airflow 1.
La community di Airflow non pubblica più nuove release secondarie o patch per Airflow 1.
Per la tolleranza di errore, non definire più oggetti DAG nello stesso modulo Python.
Non utilizzare SubDAG. Puoi invece raggruppare le attività all'interno dei DAG.
Inserisci i file necessari al momento dell'analisi DAG nella cartella
dags/
, non nella cartelladata/
.Testa i DAG sviluppati o modificati come consigliato nelle istruzioni per i test dei DAG.
Verifica che i DAG sviluppati non aumentino troppo i tempi di analisi DAG.
Le attività di Airflow possono non riuscire per diversi motivi. Per evitare errori di intere esecuzioni di DAG, ti consigliamo di abilitare i nuovi tentativi delle attività. L'impostazione del numero massimo di tentativi su
0
significa che non vengono eseguiti nuovi tentativi.Ti consigliamo di eseguire l'override dell'opzione
default_task_retries
con un valore per l'attività ritirato diverso da0
. Inoltre, puoi impostare il parametroretries
a livello di attività.Se vuoi utilizzare una GPU nelle attività Airflow, crea un cluster GKE separato in base ai nodi che utilizzano macchine con GPU. Utilizza GKEStartPodOperator per eseguire le tue attività.
Evita di eseguire attività che utilizzano molta CPU e memoria nel pool di nodi del cluster, dove sono in esecuzione altri componenti Airflow (pianificatori, worker, server web). Utilizza invece KubernetesPodOperator o GKEStartPodOperator.
Quando esegui il deployment di DAG in un ambiente, carica solo i file assolutamente necessari per l'interpretazione e l'esecuzione dei DAG nella cartella
/dags
.Limita il numero di file DAG nella cartella
/dags
.Airflow esegue l'analisi continua dei DAG nella cartella
/dags
. L'analisi è un processo che si ripete nella cartella DAG e il numero di file che devono essere caricati (con le relative dipendenze) determina un impatto sulle prestazioni dell'analisi dei DAG e della pianificazione delle attività. Utilizzare 100 file con 100 DAG ciascuno è molto più efficiente rispetto a 10.000 file con 1 DAG ciascuno, pertanto è consigliabile eseguire questa ottimizzazione. Questa ottimizzazione è un equilibrio tra tempo di analisi e efficienza della creazione e della gestione dei DAG.Puoi anche prendere in considerazione, ad esempio, per eseguire il deployment di 10.000 file DAG, puoi creare 100 file ZIP ciascuno contenente 100 file DAG.
Oltre ai suggerimenti sopra riportati, se hai più di 10.000 file DAG, generare i DAG in modo programmatico potrebbe essere una buona opzione. Ad esempio, puoi implementare un singolo file DAG Python che generi un determinato numero di oggetti DAG, ad esempio 20.100 oggetti DAG.
Evita di utilizzare operatori Airflow deprecati
Gli operatori elencati nella seguente tabella sono deprecati. Evita di utilizzarle nei DAG. Utilizza invece le alternative aggiornate fornite.
Operatore obsoleto | Operatore da utilizzare |
---|---|
BigQueryExecuteQueryOperator | BigQueryInsertJobOperator |
BigQueryPatchDatasetOperator | BigQueryUpdateTableOperator |
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator |
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator |
DataprocScaleClusterOperator | DataprocUpdateClusterOperator |
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator |
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator |
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator |
MLEngineManageModelOperator | MLEngineCreateModelOperator, MLEngineGetModelOperator |
MLEngineManageVersionOperator | MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion |
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor |
Domande frequenti per la scrittura di DAG
Come posso ridurre al minimo la ripetizione del codice se voglio eseguire le stesse attività o attività simili in più DAG?
Ti suggeriamo di definire le librerie e i wrapper per ridurre al minimo la ripetizione del codice.
Come si riutilizza il codice tra i file DAG?
Inserisci le funzioni di utilità in una libreria Python locale e importale. Puoi fare riferimento alle funzioni in qualsiasi DAG situato nella cartella dags/
nel bucket del tuo ambiente.
Come faccio a ridurre al minimo il rischio che possano derivare definizioni diverse?
Ad esempio, hai due team che vogliono aggregare i dati non elaborati in metriche relative alle entrate. I team scrivono due attività leggermente diverse che portano a termine la stessa cosa. Definisci le librerie per utilizzare i dati sulle entrate in modo che gli implementanti dei DAG siano tenuti a chiarire la definizione di entrate aggregate.
Come faccio a impostare le dipendenze tra i DAG?
Ciò dipende da come vuoi definire la dipendenza.
Se hai due DAG (DAG A e DAG B) e vuoi che il DAG B si attivi dopo il DAG
A, puoi inserire un
TriggerDagRunOperator
alla fine di
Dag A.
Se il DAG B dipende solo da un artefatto generato dal DAG A, ad esempio un messaggio Pub/Sub, un sensore potrebbe funzionare meglio.
Se il DAG B è integrato strettamente con il DAG A, potresti essere in grado di unire i due DAG in un unico DAG.
Come posso passare ID di esecuzione univoci a un DAG e alle relative attività?
Ad esempio, vuoi passare i nomi dei cluster Dataproc e i percorsi dei file.
Puoi generare un ID univoco casuale restituendo str(uuid.uuid4())
in
un PythonOperator
. L'ID viene inserito in XComs
in modo che tu possa fare riferimento all'ID in altri operatori tramite campi basati su modelli.
Prima di generare un uuid
, valuta se un ID specifico di DagRun sarebbe
più utile. Puoi anche fare riferimento a questi ID nelle sostituzioni Jinja utilizzando le macro.
Come posso separare le attività in un DAG?
Ogni attività dovrebbe essere un'unità di lavoro idempotente. Di conseguenza, dovresti evitare di incapsulare un flusso di lavoro a più passaggi all'interno di una singola attività, ad esempio un programma complesso eseguito in un PythonOperator
.
Devo definire più attività in un singolo DAG per aggregare i dati di più origini?
Ad esempio, hai più tabelle con dati non elaborati e vuoi creare dati aggregati giornalieri per ogni tabella. Le attività non dipendono l'una dall'altra. Devi creare un'attività e un DAG per ogni tabella o un DAG generale?
Se ogni attività può condividere le stesse proprietà a livello di DAG, ad esempio schedule_interval
, ha senso definire più attività in un singolo DAG. In caso contrario, per ridurre al minimo la ripetizione del codice, è possibile generare più DAG da un singolo modulo Python inserendoli nell'elemento globals()
del modulo.
Come posso limitare il numero di attività simultanee in esecuzione in un DAG?
Ad esempio, vuoi evitare di superare i limiti di utilizzo delle API/le quote o di eseguire troppi processi simultanei.
Puoi definire i pool di Airflow nell'interfaccia utente web di Airflow e associare le attività ai pool esistenti nei tuoi DAG.
Domande frequenti sull'utilizzo degli operatori
Devo usare DockerOperator
?
Non è consigliabile utilizzare DockerOperator
, a meno che non venga utilizzato per avviare i container in un'installazione Docker remota (non all'interno di un cluster di ambiente). In un ambiente Cloud Composer,
l'operatore non ha accesso ai daemon Docker.
Utilizza invece KubernetesPodOperator
o GKEStartPodOperator
. Questi operatori avviano i pod Kubernetes, rispettivamente,
in cluster Kubernetes o GKE. Tieni presente che è sconsigliato avviare pod in un cluster di ambiente, perché questo può portare alla concorrenza delle risorse.
Devo usare SubDagOperator
?
Sconsigliamo di utilizzare SubDagOperator
.
Utilizza le alternative suggerite nelle istruzioni sul raggruppamento delle attività.
Devo eseguire il codice Python solo in PythonOperators
per separare completamente gli operatori Python?
A seconda dell'obiettivo, hai a disposizione alcune opzioni.
Se il tuo unico problema è mantenere dipendenze Python separate, puoi utilizzare PythonVirtualenvOperator
.
Valuta la possibilità di utilizzare il KubernetesPodOperator
. Questo operatore consente di definire i pod Kubernetes e di eseguirli in altri cluster.
Come posso aggiungere pacchetti binari personalizzati o non PyPI?
Puoi installare pacchetti ospitati in repository di pacchetti privati.
Puoi anche
utilizzare KubernetesPodOperator
per eseguire un pod Kubernetes con la tua immagine creata con pacchetti personalizzati.
Come posso passare gli argomenti in modo uniforme a un DAG e alle relative attività?
Puoi utilizzare il supporto integrato di Airflow per i modelli Jinja al fine di trasmettere argomenti che possono essere utilizzati nei campi basati su modelli.
Quando avviene la sostituzione dei modelli?
La sostituzione del modello avviene sui worker Airflow appena prima della chiamata della funzione pre_execute
di un operatore. In pratica, ciò significa che i modelli non
vengono sostituiti prima dell'esecuzione di un'attività.
Come faccio a sapere quali argomenti dell'operatore supportano la sostituzione del modello?
Gli argomenti dell'operatore che supportano la sostituzione del modello Jinja2 sono esplicitamente contrassegnati come tali.
Cerca il campo template_fields
nella definizione dell'operatore, che contiene un elenco di nomi di argomenti che vengono sottoposti a sostituzione del modello.
Ad esempio, consulta BashOperator
, che supporta la creazione di modelli per gli argomenti bash_command
e env
.
Passaggi successivi
- Risoluzione dei problemi dei DAG
- Strumento di pianificazione della risoluzione dei problemi
- Operatori Google
- Operatori Google Cloud
- Tutorial di Apache Airflow
- Interfaccia pubblica di Airflow
- Operatori di Airflow principali su GitHub. Controlla il ramo per la release Airflow.
- Operatori di pacchetti del provider su GitHub. Controlla il ramo per la release Airflow.