Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questa pagina descrive come raggruppare le attività nelle pipeline Airflow utilizzando i seguenti pattern di progettazione:
- Raggruppamento delle attività nel grafico DAG.
- Attivazione dei DAG secondari da un DAG padre.
- Raggruppamento delle attività con l'operatore
TaskGroup
(solo con Airflow 2).
Raggruppamento delle attività nel grafico DAG
Per raggruppare le attività in determinate fasi della pipeline, puoi utilizzare le relazioni tra le attività nel file DAG.
Considera l'esempio seguente:
In questo flusso di lavoro, le attività op-1
e op-2
vengono eseguite insieme dopo il
l'attività start
.
Puoi farlo raggruppando le attività con l'istruzione
start >> [task_1, task_2]
.
Il seguente codice fornisce un'implementazione completa del DAG sopra:
Flusso d'aria 2
Airflow 1
Attivazione dei DAG secondari da un DAG padre
Puoi attivare un DAG da un altro DAG con il
TriggerDagRunOperator
.
A seconda della versione di Airflow, puoi trovare
l'operatore TriggerDagRunOperator
in un altro modulo:
- Per Airflow 1.10.*, utilizza il
modulo
airflow.operators.dagrun_operator
. - Per Airflow 2, utilizza
airflow.operators.trigger_dagrun
in maggior dettaglio più avanti in questo modulo.
Considera il seguente esempio:
In questo flusso di lavoro, i blocchi dag_1
e dag_2
rappresentano una serie di attività
raggruppati in un DAG separato in Cloud Composer
completamente gestito di Google Cloud.
L'implementazione di questo flusso di lavoro richiede due file DAG separati. Il file DAG di controllo ha il seguente aspetto:
Airflow 2
Flusso d'aria 1
L'implementazione del DAG secondario, attivato dal DAG di controllo, è la seguente:
Airflow 2
Flusso d'aria 1
Devi caricare entrambi i file DAG nel tuo ambiente Cloud Composer per far funzionare il flusso di lavoro.
Raggruppamento delle attività con l'operatore TaskGroup
In Airflow 2, puoi utilizzare
Operatore TaskGroup
per raggruppare le attività nel tuo DAG.
Le attività definite all'interno di un blocco TaskGroup
fanno comunque parte del DAG principale.
Considera l'esempio seguente:
Le attività op-1
e op-2
sono raggruppate in un blocco con ID
taskgroup_1
.
Un'implementazione di questo flusso di lavoro ha il seguente codice: