Acionar DAGs em outros ambientes e projetos

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página demonstra como implementar um DAG que aciona DAGs em outros ambientes e projetos do Cloud Composer usando operadores do Airflow para o Cloud Composer.

Se você quiser acionar DAGs no seu ambiente, consulte Programar e acionar DAGs.

configure as permissões do IAM

Se o ambiente de destino estiver localizado em outro projeto, a conta de serviço do seu ambiente precisará de funções que permitam interagir com os ambientes nesse projeto.

Projeto Recurso Principal Papel
Projeto em que o ambiente de destino está localizado Projeto Conta de serviço do ambiente de origem Papel de Worker do Composer (composer.worker)
Projeto em que o ambiente de destino está localizado Projeto Conta de serviço do ambiente de origem Um papel personalizado com a permissão composer.environment.executeAirflowCommand

Acionar um DAG em outro ambiente

O exemplo de DAG descrito nesta seção faz o seguinte:

  1. Acionar um DAG em outro ambiente do Cloud Composer.
  2. Verifica se uma execução de DAG foi concluída.

Depois que a execução do DAG em outro ambiente for concluída, o DAG de exemplo será marcado como concluído.

Executar comandos da CLI do Airflow com o CloudComposerRunAirflowCLICommandOperator

Você pode usar o operador CloudComposerRunAirflowCLICommandOperator para executar comandos da CLI do Airflow em outro ambiente do Cloud Composer. O DAG de exemplo executa o comando dags trigger, que aciona um DAG.

Esse operador pode ser executado no modo de transferência. Para ativá-lo, defina o parâmetro deferrable como True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Verificar se uma execução de DAG foi concluída

Use o sensor CloudComposerDAGRunSensor para verificar se uma execução de DAG foi concluída em outro ambiente do Cloud Composer.

Esse sensor pode ser executado no modo de transferência. Para ativá-lo, defina o parâmetro deferrable como True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Exemplo de código completo

Confira a seguir o exemplo completo de código de um DAG que combina as duas tarefas descritas anteriormente.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

A seguir