On September 15, 2026, all Cloud Composer 1 and Cloud Composer 2 version 2.0.x environments reach their planned end of life and can no longer be used. We recommend that you plan a migration to Cloud Composer 3.
This page shows how to implement a DAG that triggers DAGs in other Cloud Composer environments and projects by using Airflow operators for Cloud Composer.
To trigger DAGs in your own environment instead, see Schedule and trigger DAGs.
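Configure IAM permissions
If the target environment is located in another project, the service account of your (source) environment needs roles that allow it to interact with environments in that project. In the project where the target environment is located, grant this service account the Composer Worker role (composer.worker) and a custom role that contains the composer.environments.executeAirflowCommand permission. To grant a single permission, create a custom role in the target project, add the permission to it, and then grant this role to the principal.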
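The grants required for a target environment in another project (the Composer Worker role plus a custom role with the composer.environments.executeAirflowCommand permission) can be expressed as gcloud commands. The following is a minimal sketch that only builds and prints those commands; it does not run them. The helper name iam_grant_commands, the custom role ID composerCliExecutor, and the project and service account names are hypothetical placeholders.

```python
import shlex

PERMISSION = "composer.environments.executeAirflowCommand"


def iam_grant_commands(target_project, service_account, custom_role_id):
    """Return gcloud commands (as argument lists) that grant the two roles.

    Hypothetical helper: it only builds the commands and does not run them.
    """
    member = f"serviceAccount:{service_account}"
    return [
        # Create a custom role in the target project with the single permission.
        ["gcloud", "iam", "roles", "create", custom_role_id,
         f"--project={target_project}", f"--permissions={PERMISSION}"],
        # Grant the predefined Composer Worker role on the target project.
        ["gcloud", "projects", "add-iam-policy-binding", target_project,
         f"--member={member}", "--role=roles/composer.worker"],
        # Grant the custom role created by the first command.
        ["gcloud", "projects", "add-iam-policy-binding", target_project,
         f"--member={member}",
         f"--role=projects/{target_project}/roles/{custom_role_id}"],
    ]


# Placeholder values; replace them with your own project and service account.
for args in iam_grant_commands(
    "example-target-project",
    "source-env-sa@example-source-project.iam.gserviceaccount.com",
    "composerCliExecutor",
):
    print(shlex.join(args))
```

Review the printed commands before running them, because role bindings at the project level affect every environment in the target project.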
Trigger a DAG in another environment
The example DAG described in this section does the following:
1. Triggers a DAG in another Cloud Composer environment.
2. Checks whether the DAG run is completed.
After the DAG run in the other environment is completed, the example DAG is marked as successful.
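Run Airflow CLI commands with CloudComposerRunAirflowCLICommandOperator
You can use the CloudComposerRunAirflowCLICommandOperator operator to run Airflow CLI commands in another Cloud Composer environment. The example DAG runs the dags trigger command, which triggers a DAG.
This operator can run in deferrable mode. To enable it, set the deferrable parameter to True.
Important: This operator consumes the environments.executeAirflowCommand quota. If you want to trigger a large number of DAGs, we recommend using the Airflow REST API instead. Triggering DAGs through CloudComposerRunAirflowCLICommandOperator might cause your environment to reach the quota limit, after which Airflow CLI commands are no longer executed.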
    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id="target-project",
        environment_id="target-composer-environment",
        region="us-central1",
        command="dags trigger -- target_dag",
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )
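The command is passed to the operator as one string, so arguments that contain spaces or quotes, such as a JSON payload for --conf, need shell-style quoting. The following sketch builds such a string with the Python standard library. The helper name build_trigger_command and the sample values are hypothetical; --conf and --run-id are options of the Airflow dags trigger command. The sketch assumes that the operator splits the command string shell-style, which you should verify for your provider package version.

```python
import json
import shlex


def build_trigger_command(dag_id, conf=None, run_id=None):
    """Return an Airflow CLI command string that triggers dag_id.

    Hypothetical helper, not part of the Google provider package.
    """
    args = ["dags", "trigger"]
    if run_id is not None:
        args += ["--run-id", run_id]
    if conf is not None:
        # --conf expects the run configuration as a JSON string.
        args += ["--conf", json.dumps(conf, sort_keys=True)]
    # "--" ends option parsing, so the DAG ID is read as a positional argument.
    args += ["--", dag_id]
    # shlex.join quotes each argument so that shlex.split recovers it unchanged.
    return shlex.join(args)


command = build_trigger_command(
    "target_dag", conf={"source": "env-a"}, run_id="manual_1"
)
print(command)
```

The resulting string can then be passed as the command parameter of the operator. Without conf and run_id, the helper returns the same command that the example uses.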
Check if a DAG run is completed
You can use the CloudComposerDAGRunSensor sensor to check whether a DAG run is completed in another Cloud Composer environment.
This sensor can run in deferrable mode. To enable it, set the deferrable parameter to True.
    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id="target-project",
        environment_id="target-composer-environment",
        region="us-central1",
        composer_dag_id="target_dag",
        allowed_states=["success"],
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )
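To make the role of allowed_states more concrete, the following sketch models the kind of check such a sensor performs: it looks only at DAG runs inside a time window and succeeds when those runs are in an allowed state. This is an illustrative model and not the provider's implementation. The function name window_is_complete, the dictionary keys, and the decision to keep waiting when no run falls inside the window are assumptions of the sketch; the behavior of the real sensor may differ between provider versions.

```python
from datetime import datetime, timedelta, timezone


def window_is_complete(dag_runs, window_start, window_end, allowed_states):
    """Illustrative model only.

    Return True when at least one run started inside the window and every
    such run is in an allowed state.
    """
    in_window = [
        run for run in dag_runs
        if window_start <= run["execution_date"] <= window_end
    ]
    if not in_window:
        # Choice made for this sketch: keep waiting until a run appears.
        return False
    return all(run["state"] in allowed_states for run in in_window)


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
runs = [
    {"execution_date": now - timedelta(minutes=2), "state": "running"},
    {"execution_date": now - timedelta(hours=3), "state": "failed"},
]
window_start = now - timedelta(minutes=5)

# The recent run is still in progress, so the check does not pass yet.
first_check = window_is_complete(runs, window_start, now, ["success"])

# Once the recent run succeeds, the check passes; the older failed run
# lies outside the five-minute window and is ignored.
runs[0]["state"] = "success"
second_check = window_is_complete(runs, window_start, now, ["success"])
print(first_check, second_check)
```

A narrow window keeps earlier runs of the target DAG from influencing the result, which matters when the target DAG also runs on its own schedule.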
Full example code
The following is the full code example of a DAG that combines the two previously described tasks.
Note: If the target environment is located in another project, make sure to configure IAM permissions before running this DAG.
    from datetime import datetime, timedelta

    from airflow.models.dag import DAG
    from airflow.providers.google.cloud.operators.cloud_composer import (
        CloudComposerRunAirflowCLICommandOperator,
    )
    from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

    DAG_ID = "trigger_dag_in_another_composer_environment"

    TARGET_PROJECT_ID = "example-target-project"
    TARGET_REGION = "example-target-region"
    TARGET_ENV_ID = "example-target-composer-environment"

    TARGET_DAG = "example_target_dag_id"
    COMMAND = f"dags trigger -- {TARGET_DAG}"

    with DAG(
        DAG_ID,
        schedule="@once",
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=["example", "composer"],
    ) as dag:

        # Trigger the target DAG through the Airflow CLI of the target environment.
        run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
            task_id="run_airflow_cli_cmd",
            project_id=TARGET_PROJECT_ID,
            environment_id=TARGET_ENV_ID,
            region=TARGET_REGION,
            command=COMMAND,
            # You can run this operator in the deferrable mode:
            # deferrable=True
        )

        # Wait until the triggered DAG run reaches an allowed state.
        dag_run_sensor = CloudComposerDAGRunSensor(
            task_id="dag_run_sensor",
            project_id=TARGET_PROJECT_ID,
            environment_id=TARGET_ENV_ID,
            region=TARGET_REGION,
            composer_dag_id=TARGET_DAG,
            allowed_states=["success"],
            execution_range=timedelta(minutes=5),
            # You can run this sensor in the deferrable mode:
            # deferrable=True
        )

        run_airflow_cli_cmd >> dag_run_sensor
Last updated: 2025-09-03 (UTC).