다른 환경 및 프로젝트에서 DAG 트리거

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

이 페이지에서는 Cloud Composer용 Airflow 연산자를 사용하여 다른 Cloud Composer 환경 및 프로젝트에서 DAG를 트리거하는 DAG를 구현하는 방법을 보여줍니다.

대신 환경에서 DAG를 트리거하려면 DAG 예약 및 트리거를 참고하세요.

IAM 권한 구성

타겟 환경이 다른 프로젝트에 있는 경우 환경의 서비스 계정에 해당 프로젝트의 환경과 상호작용할 수 있는 역할이 필요합니다.

프로젝트 리소스 주 구성원 역할
대상 환경이 있는 프로젝트 프로젝트 원본 환경의 환경 서비스 계정 Composer 작업자 역할 (composer.worker)
대상 환경이 있는 프로젝트 프로젝트 원본 환경의 환경 서비스 계정 composer.environment.executeAirflowCommand 권한이 있는 커스텀 역할

다른 환경에서 DAG 트리거

이 섹션에 설명된 DAG 예시는 다음을 실행합니다.

  1. 다른 Cloud Composer 환경에서 DAG를 트리거합니다.
  2. DAG 실행이 완료되었는지 확인합니다.

다른 환경에서 DAG 실행이 완료되면 예시 DAG가 성공으로 표시됩니다.

CloudComposerRunAirflowCLICommandOperator로 Airflow CLI 명령어 실행

CloudComposerRunAirflowCLICommandOperator 연산자를 사용하여 다른 Cloud Composer 환경에서 Airflow CLI 명령어를 실행할 수 있습니다. 이 DAG 예시는 DAG를 트리거하는 dags trigger 명령어를 실행합니다.

이 연산자는 지연 가능한 모드에서 실행할 수 있으며 deferrable 매개변수를 True로 설정하여 사용 설정할 수 있습니다.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

DAG 실행 완료 여부 확인

CloudComposerDAGRunSensor 센서를 사용하여 다른 Cloud Composer 환경에서 DAG 실행이 완료되었는지 확인할 수 있습니다.

이 센서는 지연 모드로 실행할 수 있으며 deferrable 매개변수를 True로 설정하여 사용 설정할 수 있습니다.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

전체 예시 코드

다음은 앞에서 설명한 두 태스크를 결합한 DAG의 전체 코드 예입니다.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

다음 단계