run_airflow_cli_cmd=CloudComposerRunAirflowCLICommandOperator(task_id="run_airflow_cli_cmd",project_id="target-project",environment_id="target-composer-environment",region="us-central1",command="dags trigger -- target_dag",# You can run this operator in the deferrable mode:# deferrable=True)
dag_run_sensor=CloudComposerDAGRunSensor(task_id="dag_run_sensor",project_id="target-project",environment_id="target-composer-environment",region="us-central1",composer_dag_id="target_dag",allowed_states=["success"],# You can run this sensor in the deferrable mode:# deferrable=True)
完整程式碼範例
以下是 DAG 的完整程式碼範例,其中結合了先前說明的兩項工作。
fromdatetimeimportdatetime,timedeltafromairflow.models.dagimportDAGfromairflow.providers.google.cloud.operators.cloud_composerimport(CloudComposerRunAirflowCLICommandOperator,)fromairflow.providers.google.cloud.sensors.cloud_composerimportCloudComposerDAGRunSensorDAG_ID="trigger_dag_in_another_composer_environment"TARGET_PROJECT_ID="example-target-project"TARGET_REGION="example-target-region"TARGET_ENV_ID="example-target-composer-environment"TARGET_DAG="example_target_dag_id"COMMAND=f"dags trigger -- {TARGET_DAG}"withDAG(DAG_ID,schedule="@once",start_date=datetime(2024,1,1),catchup=False,tags=["example","composer"],)asdag:run_airflow_cli_cmd=CloudComposerRunAirflowCLICommandOperator(task_id="run_airflow_cli_cmd",project_id=TARGET_PROJECT_ID,environment_id=TARGET_ENV_ID,region=TARGET_REGION,command=COMMAND,# You can run this operator in the deferrable mode:# deferrable=True)dag_run_sensor=CloudComposerDAGRunSensor(task_id="dag_run_sensor",project_id=TARGET_PROJECT_ID,environment_id=TARGET_ENV_ID,region=TARGET_REGION,composer_dag_id=TARGET_DAG,allowed_states=["success"],execution_range=timedelta(minutes=5),# You can run this sensor in the deferrable mode:# deferrable=True)run_airflow_cli_cmd >> dag_run_sensor
[[["容易理解","easyToUnderstand","thumb-up"],["確實解決了我的問題","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["難以理解","hardToUnderstand","thumb-down"],["資訊或程式碼範例有誤","incorrectInformationOrSampleCode","thumb-down"],["缺少我需要的資訊/範例","missingTheInformationSamplesINeed","thumb-down"],["翻譯問題","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["上次更新時間:2025-09-03 (世界標準時間)。"],[[["\u003cp\u003eThis guide details how to use Airflow operators within Cloud Composer 2 to trigger and monitor DAGs in other Cloud Composer environments or projects.\u003c/p\u003e\n"],["\u003cp\u003eWhen triggering DAGs in a different project, the source environment's service account requires specific IAM permissions, such as the "Composer Worker" role and a custom role with \u003ccode\u003ecomposer.environment.executeAirflowCommand\u003c/code\u003e permission in the target project.\u003c/p\u003e\n"],["\u003cp\u003eThe \u003ccode\u003eCloudComposerRunAirflowCLICommandOperator\u003c/code\u003e is utilized to execute Airflow CLI commands, like \u003ccode\u003edags trigger\u003c/code\u003e, to initiate a DAG in a target environment, and it can be run in deferrable mode for efficiency.\u003c/p\u003e\n"],["\u003cp\u003eThe \u003ccode\u003eCloudComposerDAGRunSensor\u003c/code\u003e monitors the status of a DAG run in another environment, checking for completion and allowing specification of allowed states, and it can be run in deferrable mode.\u003c/p\u003e\n"],["\u003cp\u003eThe provided full code example demonstrates how to combine the \u003ccode\u003eCloudComposerRunAirflowCLICommandOperator\u003c/code\u003e and \u003ccode\u003eCloudComposerDAGRunSensor\u003c/code\u003e to trigger a DAG and then monitor its successful completion in a separate Cloud Composer environment.\u003c/p\u003e\n"]]],[],null,["\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\n[Cloud Composer 3](/composer/docs/composer-3/trigger-dags-in-other-environments \"View this page for Cloud Composer 3\") \\| **Cloud Composer 2** \\| Cloud Composer 1\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\nThis page demonstrates how to implement a DAG that triggers DAGs in other\nCloud Composer environments and projects by using Airflow operators\nfor Cloud Composer.\n\nIf you want to trigger DAGs in your environment instead, see\n[Schedule and trigger DAGs](/composer/docs/composer-2/schedule-and-trigger-dags).\n\nConfigure IAM permissions\n\n**If the target environment is located in another project**, then the service\naccount of your environment needs roles that allows interacting with\nenvironments in that project.\n| **Note:** To grant a single permission, [create a custom role](/iam/docs/creating-custom-roles#creating) in the target project, add the permission to it, and then [grant this role](/iam/docs/granting-changing-revoking-access#single-role) to a principal.\n\n| Project | Resource | Principal | Role |\n|-------------------------------------------------|----------|---------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|\n| Project where the target environment is located | Project | Environment's service account of the source environment | **Composer Worker** role (`composer.worker`) |\n| Project where the target environment is located | Project | Environment's service account of the source environment | A [custom role](/iam/docs/creating-custom-roles#creating) with the `composer.environments.executeAirflowCommand` permission |\n\nTrigger a DAG in another environment\n\nThe example DAG described in this section does the following:\n\n1. Trigger a DAG in another Cloud Composer environment.\n2. Checks if a DAG run is completed.\n\nAfter the DAG run in another environment is completed, the example DAG is\nmarked as successful.\n| **Note:** Google provides more [Airflow operators](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/cloud_composer/index.html), [sensors](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/cloud_composer/index.html), [triggers](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/triggers/cloud_composer/index.html), and [hooks](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/cloud_composer/index.html) that work with Cloud Composer environments. For example, you can create, update and delete environments with these operators.\n\nRun Airflow CLI commands with CloudComposerRunAirflowCLICommandOperator\n\nYou can use the\n[CloudComposerRunAirflowCLICommandOperator](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/cloud_composer/index.html#airflow.providers.google.cloud.operators.cloud_composer.CloudComposerRunAirflowCLICommandOperator)\noperator to run Airflow CLI commands in another Cloud Composer\nenvironment. The example DAG executes the `dags trigger` command, which\ntriggers a DAG.\n\nThis operator can run in the [deferrable mode](/composer/docs/composer-2/use-deferrable-operators), you\ncan enable it by setting the `deferrable` parameter to `True`.\n**Important:** This operator consumes the `environments.executeAirflowCommand` [quota](/composer/quotas). If you want to **trigger a large number of DAGs** , we recommend to do it [with Airflow REST API](/composer/docs/composer-2/access-airflow-api#call-api). Triggering DAGs through CloudComposerRunAirflowCLICommandOperator might cause your environment to reach the quota limit, and as a result, Airflow CLI commands will no longer be executed. \n\n run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(\n task_id=\"run_airflow_cli_cmd\",\n project_id=\"target-project\",\n environment_id=\"target-composer-environment\",\n region=\"us-central1\",\n command=\"dags trigger -- target_dag\",\n # You can run this operator in the deferrable mode:\n # deferrable=True\n )\n\nCheck if a DAG run is completed\n\nYou can use the [CloudComposerDAGRunSensor](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/cloud_composer/index.html#airflow.providers.google.cloud.sensors.cloud_composer.CloudComposerDAGRunSensor)\nsensor to checks if a DAG run is completed in another Cloud Composer\nenvironment.\n\nThis sensor can run in the [deferrable mode](/composer/docs/composer-2/use-deferrable-operators), you can\nenable it by setting the `deferrable` parameter to `True`. \n\n dag_run_sensor = CloudComposerDAGRunSensor(\n task_id=\"dag_run_sensor\",\n project_id=\"target-project\",\n environment_id=\"target-composer-environment\",\n region=\"us-central1\",\n composer_dag_id=\"target_dag\",\n allowed_states=[\"success\"],\n # You can run this sensor in the deferrable mode:\n # deferrable=True\n )\n\nFull example code\n\nThe following is the full code example of a DAG that combines the two\npreviously described tasks.\n**Note:** If the target environment is located in another project, make sure to [configure IAM permissions](#iam-permissions) before running this DAG. \n\n from datetime import datetime, timedelta\n from airflow.models.dag import DAG\n from airflow.providers.google.cloud.operators.cloud_composer import (\n CloudComposerRunAirflowCLICommandOperator,\n )\n from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor\n\n DAG_ID = \"trigger_dag_in_another_composer_environment\"\n\n TARGET_PROJECT_ID = \"example-target-project\"\n TARGET_REGION = \"example-target-region\"\n TARGET_ENV_ID = \"example-target-composer-environment\"\n\n TARGET_DAG = \"example_target_dag_id\"\n COMMAND = f\"dags trigger -- {TARGET_DAG}\"\n\n with DAG(\n DAG_ID,\n schedule=\"@once\",\n start_date=datetime(2024, 1, 1),\n catchup=False,\n tags=[\"example\", \"composer\"],\n ) as dag:\n\n run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(\n task_id=\"run_airflow_cli_cmd\",\n project_id=TARGET_PROJECT_ID,\n environment_id=TARGET_ENV_ID,\n region=TARGET_REGION,\n command=COMMAND,\n # You can run this operator in the deferrable mode:\n # deferrable=True\n )\n\n dag_run_sensor = CloudComposerDAGRunSensor(\n task_id=\"dag_run_sensor\",\n project_id=TARGET_PROJECT_ID,\n environment_id=TARGET_ENV_ID,\n region=TARGET_REGION,\n composer_dag_id=TARGET_DAG,\n allowed_states=[\"success\"],\n execution_range=timedelta(minutes=5),\n # You can run this sensor in the deferrable mode:\n # deferrable=True\n )\n\n run_airflow_cli_cmd \u003e\u003e dag_run_sensor\n\nWhat's next\n\n- [Schedule and trigger DAGs](/composer/docs/composer-2/schedule-and-trigger-dags)\n- [Access control with IAM](/composer/docs/composer-2/access-control)"]]