Acione DAGs noutros ambientes e projetos

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página demonstra como implementar um DAG que aciona DAGs noutros ambientes e projetos do Cloud Composer através de operadores do Airflow para o Cloud Composer.

Se, em alternativa, quiser acionar DAGs no seu ambiente, consulte o artigo Agende e acione DAGs.

Configure as autorizações de IAM

Se o ambiente de destino estiver localizado noutro projeto, a conta de serviço do seu ambiente precisa de funções que permitam a interação com ambientes nesse projeto.

Projeto Recurso Principal Função
Projeto onde o ambiente de destino está localizado Projeto Conta de serviço do ambiente de origem Função Composer Worker (composer.worker)
Projeto onde o ambiente de destino está localizado Projeto Conta de serviço do ambiente de origem Uma função personalizada com a autorização composer.environments.executeAirflowCommand

Acione um DAG noutro ambiente

O DAG de exemplo descrito nesta secção faz o seguinte:

  1. Acionar um DAG noutro ambiente do Cloud Composer.
  2. Verifica se uma execução de DAG está concluída.

Depois de a DAG ser executada noutro ambiente, a DAG de exemplo é marcada como bem-sucedida.

Execute comandos da CLI do Airflow com CloudComposerRunAirflowCLICommandOperator

Pode usar o operador CloudComposerRunAirflowCLICommandOperator para executar comandos da CLI do Airflow noutro ambiente do Cloud Composer. O DAG de exemplo executa o comando dags trigger, que aciona um DAG.

Este operador pode ser executado no modo diferível. Pode ativá-lo definindo o parâmetro deferrable como True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Verifique se uma execução de DAG está concluída

Pode usar o sensor CloudComposerDAGRunSensor para verificar se uma execução de DAG está concluída noutro ambiente do Cloud Composer.

Este sensor pode ser executado no modo diferível. Pode ativá-lo definindo o parâmetro deferrable como True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Código de exemplo completo

Segue-se o exemplo de código completo de um DAG que combina as duas tarefas descritas anteriormente.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

O que se segue?