Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
이 페이지에서는 Cloud Composer용 Airflow 연산자를 사용하여 다른 Cloud Composer 환경 및 프로젝트에서 DAG를 트리거하는 DAG를 구현하는 방법을 보여줍니다.
대신 환경에서 DAG를 트리거하려면 DAG 예약 및 트리거를 참고하세요.
IAM 권한 구성
타겟 환경이 다른 프로젝트에 있는 경우 환경의 서비스 계정에 해당 프로젝트의 환경과 상호작용할 수 있는 역할이 필요합니다.
프로젝트 | 리소스 | 주 구성원 | 역할 |
---|---|---|---|
대상 환경이 있는 프로젝트 | 프로젝트 | 원본 환경의 환경 서비스 계정 |
Composer 작업자 역할 (composer.worker ) |
대상 환경이 있는 프로젝트 | 프로젝트 | 원본 환경의 환경 서비스 계정 |
composer.environment.executeAirflowCommand 권한이 있는 커스텀 역할 |
다른 환경에서 DAG 트리거
이 섹션에 설명된 DAG 예시는 다음을 실행합니다.
- 다른 Cloud Composer 환경에서 DAG를 트리거합니다.
- DAG 실행이 완료되었는지 확인합니다.
다른 환경에서 DAG 실행이 완료되면 예시 DAG가 성공으로 표시됩니다.
CloudComposerRunAirflowCLICommandOperator로 Airflow CLI 명령어 실행
CloudComposerRunAirflowCLICommandOperator 연산자를 사용하여 다른 Cloud Composer 환경에서 Airflow CLI 명령어를 실행할 수 있습니다. 이 DAG 예시는 DAG를 트리거하는 dags trigger
명령어를 실행합니다.
이 연산자는 지연 가능한 모드에서 실행할 수 있으며 deferrable
매개변수를 True
로 설정하여 사용 설정할 수 있습니다.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
DAG 실행 완료 여부 확인
CloudComposerDAGRunSensor 센서를 사용하여 다른 Cloud Composer 환경에서 DAG 실행이 완료되었는지 확인할 수 있습니다.
이 센서는 지연 모드로 실행할 수 있으며 deferrable
매개변수를 True
로 설정하여 사용 설정할 수 있습니다.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
전체 예시 코드
다음은 앞에서 설명한 두 태스크를 결합한 DAG의 전체 코드 예입니다.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor