Il 15 settembre 2026, tutti gli ambienti Cloud Composer 1 e Cloud Composer 2 versione 2.0.x raggiungeranno la fine del ciclo di vita pianificata e non potrai più utilizzarli. Ti consigliamo di pianificare la migrazione a Cloud Composer 3.
Questa pagina mostra come implementare un DAG che attiva DAG in altri
progetti e ambienti Cloud Composer utilizzando gli operatori Airflow
per Cloud Composer.
Se l'ambiente di destinazione si trova in un altro progetto, il service account del tuo ambiente deve disporre di ruoli che consentano l'interazione con gli ambienti in quel progetto.
Progetto
Risorsa
Entità
Ruolo
Il progetto in cui si trova l'ambiente di destinazione
Progetto
Il account di servizio dell'ambiente di origine
Ruolo Worker Composer (composer.worker)
Il progetto in cui si trova l'ambiente di destinazione
Progetto
Il account di servizio dell'ambiente di origine
Un ruolo personalizzato con l'autorizzazione
composer.environments.executeAirflowCommand
Attivare un DAG in un altro ambiente
Il DAG di esempio descritto in questa sezione esegue le seguenti operazioni:
Attiva un DAG in un altro ambiente Cloud Composer.
Verifica se un'esecuzione di DAG è stata completata.
Al termine dell'esecuzione del DAG in un altro ambiente, il DAG di esempio viene
contrassegnato come riuscito.
Esegui i comandi dell'interfaccia a riga di comando di Airflow con CloudComposerRunAirflowCLICommandOperator
Puoi utilizzare l'operatore
CloudComposerRunAirflowCLICommandOperator
per eseguire i comandi della CLI di Airflow in un altro ambiente Cloud Composer. Il DAG di esempio esegue il comando dags trigger, che
attiva un DAG.
Questo operatore può essere eseguito in modalità differibile, che
puoi attivare impostando il parametro deferrable su True.
run_airflow_cli_cmd=CloudComposerRunAirflowCLICommandOperator(task_id="run_airflow_cli_cmd",project_id="target-project",environment_id="target-composer-environment",region="us-central1",command="dags trigger -- target_dag",# You can run this operator in the deferrable mode:# deferrable=True)
Controllare se l'esecuzione di un DAG è stata completata
Puoi utilizzare il sensore CloudComposerDAGRunSensor
per verificare se l'esecuzione di un DAG è completata in un altro ambiente Cloud Composer.
Questo sensore può essere eseguito in modalità differibile, che puoi
attivare impostando il parametro deferrable su True.
dag_run_sensor=CloudComposerDAGRunSensor(task_id="dag_run_sensor",project_id="target-project",environment_id="target-composer-environment",region="us-central1",composer_dag_id="target_dag",allowed_states=["success"],# You can run this sensor in the deferrable mode:# deferrable=True)
Codice di esempio completo
Di seguito è riportato l'esempio di codice completo di un DAG che combina le due attività descritte in precedenza.
fromdatetimeimportdatetime,timedeltafromairflow.models.dagimportDAGfromairflow.providers.google.cloud.operators.cloud_composerimport(CloudComposerRunAirflowCLICommandOperator,)fromairflow.providers.google.cloud.sensors.cloud_composerimportCloudComposerDAGRunSensorDAG_ID="trigger_dag_in_another_composer_environment"TARGET_PROJECT_ID="example-target-project"TARGET_REGION="example-target-region"TARGET_ENV_ID="example-target-composer-environment"TARGET_DAG="example_target_dag_id"COMMAND=f"dags trigger -- {TARGET_DAG}"withDAG(DAG_ID,schedule="@once",start_date=datetime(2024,1,1),catchup=False,tags=["example","composer"],)asdag:run_airflow_cli_cmd=CloudComposerRunAirflowCLICommandOperator(task_id="run_airflow_cli_cmd",project_id=TARGET_PROJECT_ID,environment_id=TARGET_ENV_ID,region=TARGET_REGION,command=COMMAND,# You can run this operator in the deferrable mode:# deferrable=True)dag_run_sensor=CloudComposerDAGRunSensor(task_id="dag_run_sensor",project_id=TARGET_PROJECT_ID,environment_id=TARGET_ENV_ID,region=TARGET_REGION,composer_dag_id=TARGET_DAG,allowed_states=["success"],execution_range=timedelta(minutes=5),# You can run this sensor in the deferrable mode:# deferrable=True)run_airflow_cli_cmd >> dag_run_sensor
[[["Facile da capire","easyToUnderstand","thumb-up"],["Il problema è stato risolto","solvedMyProblem","thumb-up"],["Altra","otherUp","thumb-up"]],[["Difficile da capire","hardToUnderstand","thumb-down"],["Informazioni o codice di esempio errati","incorrectInformationOrSampleCode","thumb-down"],["Mancano le informazioni o gli esempi di cui ho bisogno","missingTheInformationSamplesINeed","thumb-down"],["Problema di traduzione","translationIssue","thumb-down"],["Altra","otherDown","thumb-down"]],["Ultimo aggiornamento 2025-08-29 UTC."],[[["\u003cp\u003eThis guide details how to use Airflow operators within Cloud Composer 3 to trigger DAGs in other Cloud Composer environments and projects.\u003c/p\u003e\n"],["\u003cp\u003eTo trigger a DAG in a different project, the source environment's service account must have the \u003cstrong\u003eComposer Worker\u003c/strong\u003e role and a custom role with the \u003ccode\u003ecomposer.environment.executeAirflowCommand\u003c/code\u003e permission in the target project.\u003c/p\u003e\n"],["\u003cp\u003eThe \u003ccode\u003eCloudComposerRunAirflowCLICommandOperator\u003c/code\u003e is used to execute Airflow CLI commands, such as \u003ccode\u003edags trigger\u003c/code\u003e, in a target Cloud Composer environment.\u003c/p\u003e\n"],["\u003cp\u003eThe \u003ccode\u003eCloudComposerDAGRunSensor\u003c/code\u003e is employed to monitor and verify the successful completion of a DAG run in a separate Cloud Composer environment.\u003c/p\u003e\n"],["\u003cp\u003eBoth of these operators mentioned can use the deferrable mode, by setting the \u003ccode\u003edeferrable\u003c/code\u003e parameter to true.\u003c/p\u003e\n"]]],[],null,["\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\n**Cloud Composer 3** \\| [Cloud Composer 2](/composer/docs/composer-2/trigger-dags-in-other-environments \"View this page for Cloud Composer 2\") \\| Cloud Composer 1\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\nThis page demonstrates how to implement a DAG that triggers DAGs in other\nCloud Composer environments and projects by using Airflow operators\nfor Cloud Composer.\n\nIf you want to trigger DAGs in your environment instead, see\n[Schedule and trigger DAGs](/composer/docs/composer-3/schedule-and-trigger-dags).\n\nConfigure IAM permissions\n\n**If the target environment is located in another project**, then the service\naccount of your environment needs roles that allows interacting with\nenvironments in that project.\n| **Note:** To grant a single permission, [create a custom role](/iam/docs/creating-custom-roles#creating) in the target project, add the permission to it, and then [grant this role](/iam/docs/granting-changing-revoking-access#single-role) to a principal.\n\n| Project | Resource | Principal | Role |\n|-------------------------------------------------|----------|---------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|\n| Project where the target environment is located | Project | Environment's service account of the source environment | **Composer Worker** role (`composer.worker`) |\n| Project where the target environment is located | Project | Environment's service account of the source environment | A [custom role](/iam/docs/creating-custom-roles#creating) with the `composer.environments.executeAirflowCommand` permission |\n\nTrigger a DAG in another environment\n\nThe example DAG described in this section does the following:\n\n1. Trigger a DAG in another Cloud Composer environment.\n2. Checks if a DAG run is completed.\n\nAfter the DAG run in another environment is completed, the example DAG is\nmarked as successful.\n| **Note:** Google provides more [Airflow operators](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/cloud_composer/index.html), [sensors](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/cloud_composer/index.html), [triggers](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/triggers/cloud_composer/index.html), and [hooks](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/cloud_composer/index.html) that work with Cloud Composer environments. For example, you can create, update and delete environments with these operators.\n\nRun Airflow CLI commands with CloudComposerRunAirflowCLICommandOperator\n\nYou can use the\n[CloudComposerRunAirflowCLICommandOperator](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/cloud_composer/index.html#airflow.providers.google.cloud.operators.cloud_composer.CloudComposerRunAirflowCLICommandOperator)\noperator to run Airflow CLI commands in another Cloud Composer\nenvironment. The example DAG executes the `dags trigger` command, which\ntriggers a DAG.\n\nThis operator can run in the [deferrable mode](/composer/docs/composer-3/use-deferrable-operators), you\ncan enable it by setting the `deferrable` parameter to `True`.\n**Important:** This operator consumes the `environments.executeAirflowCommand` [quota](/composer/quotas). If you want to **trigger a large number of DAGs** , we recommend to do it [with Airflow REST API](/composer/docs/composer-3/access-airflow-api#call-api). Triggering DAGs through CloudComposerRunAirflowCLICommandOperator might cause your environment to reach the quota limit, and as a result, Airflow CLI commands will no longer be executed. \n\n run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(\n task_id=\"run_airflow_cli_cmd\",\n project_id=\"target-project\",\n environment_id=\"target-composer-environment\",\n region=\"us-central1\",\n command=\"dags trigger -- target_dag\",\n # You can run this operator in the deferrable mode:\n # deferrable=True\n )\n\nCheck if a DAG run is completed\n\nYou can use the [CloudComposerDAGRunSensor](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/cloud_composer/index.html#airflow.providers.google.cloud.sensors.cloud_composer.CloudComposerDAGRunSensor)\nsensor to checks if a DAG run is completed in another Cloud Composer\nenvironment.\n\nThis sensor can run in the [deferrable mode](/composer/docs/composer-3/use-deferrable-operators), you can\nenable it by setting the `deferrable` parameter to `True`. \n\n dag_run_sensor = CloudComposerDAGRunSensor(\n task_id=\"dag_run_sensor\",\n project_id=\"target-project\",\n environment_id=\"target-composer-environment\",\n region=\"us-central1\",\n composer_dag_id=\"target_dag\",\n allowed_states=[\"success\"],\n # You can run this sensor in the deferrable mode:\n # deferrable=True\n )\n\nFull example code\n\nThe following is the full code example of a DAG that combines the two\npreviously described tasks.\n**Note:** If the target environment is located in another project, make sure to [configure IAM permissions](#iam-permissions) before running this DAG. \n\n from datetime import datetime, timedelta\n from airflow.models.dag import DAG\n from airflow.providers.google.cloud.operators.cloud_composer import (\n CloudComposerRunAirflowCLICommandOperator,\n )\n from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor\n\n DAG_ID = \"trigger_dag_in_another_composer_environment\"\n\n TARGET_PROJECT_ID = \"example-target-project\"\n TARGET_REGION = \"example-target-region\"\n TARGET_ENV_ID = \"example-target-composer-environment\"\n\n TARGET_DAG = \"example_target_dag_id\"\n COMMAND = f\"dags trigger -- {TARGET_DAG}\"\n\n with DAG(\n DAG_ID,\n schedule=\"@once\",\n start_date=datetime(2024, 1, 1),\n catchup=False,\n tags=[\"example\", \"composer\"],\n ) as dag:\n\n run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(\n task_id=\"run_airflow_cli_cmd\",\n project_id=TARGET_PROJECT_ID,\n environment_id=TARGET_ENV_ID,\n region=TARGET_REGION,\n command=COMMAND,\n # You can run this operator in the deferrable mode:\n # deferrable=True\n )\n\n dag_run_sensor = CloudComposerDAGRunSensor(\n task_id=\"dag_run_sensor\",\n project_id=TARGET_PROJECT_ID,\n environment_id=TARGET_ENV_ID,\n region=TARGET_REGION,\n composer_dag_id=TARGET_DAG,\n allowed_states=[\"success\"],\n execution_range=timedelta(minutes=5),\n # You can run this sensor in the deferrable mode:\n # deferrable=True\n )\n\n run_airflow_cli_cmd \u003e\u003e dag_run_sensor\n\nWhat's next\n\n- [Schedule and trigger DAGs](/composer/docs/composer-3/schedule-and-trigger-dags)\n- [Access control with IAM](/composer/docs/composer-3/access-control)"]]