Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página demonstra como implementar um DAG que aciona DAGs em outros ambientes e projetos do Cloud Composer usando operadores do Airflow para o Cloud Composer.
Se você quiser acionar DAGs no seu ambiente, consulte Programar e acionar DAGs.
configure as permissões do IAM
Se o ambiente de destino estiver localizado em outro projeto, a conta de serviço do seu ambiente precisará de funções que permitam interagir com os ambientes nesse projeto.
Projeto | Recurso | Principal | Papel |
---|---|---|---|
Projeto em que o ambiente de destino está localizado | Projeto | Conta de serviço do ambiente de origem |
Papel de Worker do Composer (composer.worker ) |
Projeto em que o ambiente de destino está localizado | Projeto | Conta de serviço do ambiente de origem |
Um papel personalizado com a
permissão composer.environment.executeAirflowCommand |
Acionar um DAG em outro ambiente
O exemplo de DAG descrito nesta seção faz o seguinte:
- Acionar um DAG em outro ambiente do Cloud Composer.
- Verifica se uma execução de DAG foi concluída.
Depois que a execução do DAG em outro ambiente for concluída, o DAG de exemplo será marcado como concluído.
Executar comandos da CLI do Airflow com o CloudComposerRunAirflowCLICommandOperator
Você pode usar o operador
CloudComposerRunAirflowCLICommandOperator
para executar comandos da CLI do Airflow em outro ambiente do Cloud Composer. O DAG de exemplo executa o comando dags trigger
, que
aciona um DAG.
Esse operador pode ser executado no modo de transferência. Para
ativá-lo, defina o parâmetro deferrable
como True
.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
Verificar se uma execução de DAG foi concluída
Você pode usar o sensor CloudComposerDAGRunSensor para verificar se uma execução de DAG foi concluída em outro ambiente do Cloud Composer.
Esse sensor pode ser executado no modo de transferência. Para
ativá-lo, defina o parâmetro deferrable
como True
.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
Exemplo de código completo
Confira a seguir o exemplo completo de código de um DAG que combina as duas tarefas descritas anteriormente.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor