Cloud Composer 1 | Cloud Composer 2
In questa pagina viene descritto come raggruppare le attività nelle pipeline di Airflow utilizzando i seguenti pattern di progettazione:
- Raggruppare le attività nel grafico DAG.
- Attivazione di DAG secondari da un DAG padre.
- Raggruppamento delle attività con l'operatore
TaskGroup
(solo con Airflow 2).
Raggruppamento delle attività nel grafico DAG
Per raggruppare le attività in determinate fasi della pipeline, puoi utilizzare le relazioni tra le attività nel file DAG.
Considera l'esempio seguente:
In questo flusso di lavoro, le attività op-1
e op-2
vengono eseguite insieme dopo l'attività iniziale start
.
Puoi ottenere questo risultato raggruppando le attività con l'istruzione
start >> [task_1, task_2]
.
Il seguente codice fornisce un'implementazione completa del DAG riportato sopra:
Airflow 2
Airflow 1
Attivazione di DAG secondari da un DAG padre
Puoi attivare un DAG da un altro DAG con
TriggerDagRunOperator
.
A seconda della versione di Airflow, puoi trovare
l'operatore TriggerDagRunOperator
in un modulo diverso:
- Per Airflow 1.10.*, utilizza il modulo
airflow.operators.dagrun_operator
. - Per Airflow 2, utilizza il modulo
airflow.operators.trigger_dagrun
.
Considera l'esempio seguente:
In questo flusso di lavoro, i blocchi dag_1
e dag_2
rappresentano una serie di attività raggruppate in un DAG separato nell'ambiente Cloud Composer.
L'implementazione di questo flusso di lavoro richiede due file DAG separati. Il file DAG di controllo è simile al seguente:
Airflow 2
Airflow 1
L'implementazione del DAG secondario, che viene attivata dal DAG di controllo, ha il seguente aspetto:
Airflow 2
Airflow 1
Affinché il flusso di lavoro funzioni, devi caricare entrambi i file DAG nel tuo ambiente Cloud Composer.
Raggruppamento delle attività con l'operatore TaskGroup
In Airflow 2, puoi utilizzare l'operatore TaskGroup
per raggruppare le attività nel DAG.
Le attività definite all'interno di un blocco TaskGroup
fanno ancora parte del DAG principale.
Considera l'esempio seguente:
Le attività op-1
e op-2
sono raggruppate in un blocco con ID taskgroup_1
.
Un'implementazione di questo flusso di lavoro è simile al seguente codice: