Restez organisé à l'aide des collections
Enregistrez et classez les contenus selon vos préférences.
Cette page explique l'orchestration de pipeline avec Cloud Composer et les déclencheurs. Cloud Data Fusion recommande d'utiliser Cloud Composer pour orchestrer les pipelines. Si vous avez besoin d'un moyen plus simple de gérer l'orchestration, utilisez des déclencheurs.
Composer
Orchestration de pipelines avec Cloud Composer
L'orchestration de l'exécution du pipeline dans Cloud Data Fusion avec Cloud Composer présente les avantages suivants:
Gestion centralisée des workflows:gérez de manière uniforme l'exécution de plusieurs pipelines Cloud Data Fusion.
Gestion des dépendances:pour garantir un ordre d'exécution correct, définissez des dépendances entre les pipelines.
Surveillance et alerte:Cloud Composer fournit des fonctionnalités de surveillance et des alertes en cas de défaillance.
Intégration à d'autres services:Cloud Composer vous permet d'orchestrer des workflows couvrant Cloud Data Fusion et d'autres servicesGoogle Cloud .
Pour orchestrer des pipelines Cloud Data Fusion à l'aide de Cloud Composer, procédez comme suit:
Configurez l'environnement Cloud Composer.
Créez un environnement Cloud Composer. Si vous n'en avez pas, provisionnez l'environnement dans votre projet Google Cloud .
Cet environnement est votre espace de travail d'orchestration.
Accordez des autorisations. Assurez-vous que le compte de service Cloud Composer dispose des autorisations nécessaires pour accéder à Cloud Data Fusion (par exemple, l'autorisation de démarrer, d'arrêter et de lister des pipelines).
Définissez des graphes orientés acycliques (DAG) pour l'orchestration.
Créez un DAG:dans Cloud Composer, créez un DAG qui définit le workflow d'orchestration de vos pipelines Cloud Data Fusion.
Opérateurs Cloud Data Fusion:utilisez les opérateurs Cloud Data Fusion de Cloud Composer dans votre DAG. Ces opérateurs vous permettent d'interagir de manière programmatique avec Cloud Data Fusion.
Opérateurs Cloud Data Fusion
L'orchestration des pipelines Cloud Data Fusion dispose des opérateurs suivants:
CloudDataFusionStartPipelineOperator
Déclenche l'exécution d'un pipeline Cloud Data Fusion par son ID. Elle comporte les paramètres suivants:
ID du pipeline
Emplacement (régionGoogle Cloud )
Espace de noms du pipeline
Arguments d'exécution (facultatif)
Attendre la fin (facultatif)
Délai avant expiration (facultatif)
CloudDataFusionStopPipelineOperator
Vous permet d'arrêter un pipeline Cloud Data Fusion en cours d'exécution.
CloudDataFusionDeletePipelineOperator
Supprime un pipeline Cloud Data Fusion.
Créer le workflow DAG
Lorsque vous créez le workflow DAG, tenez compte des points suivants:
Définir des dépendances:utilisez la structure DAG pour définir les dépendances entre les tâches. Par exemple, une tâche peut attendre la fin d'un pipeline dans un espace de noms avant de déclencher un autre pipeline dans un autre espace de noms.
Planification:planifiez l'exécution du DAG à des intervalles spécifiques, par exemple quotidiens ou horaires, ou définissez-le pour qu'il soit déclenché manuellement.
Les déclencheurs Cloud Data Fusion vous permettent d'exécuter automatiquement un pipeline en aval à la fin (succès, échec ou toute condition spécifiée) d'un ou de plusieurs pipelines en amont.
Les déclencheurs sont utiles pour les tâches suivantes:
Nettoyer vos données une seule fois, puis les mettre à disposition de plusieurs pipelines en aval pour utilisation.
Partager des informations, telles que des arguments d'exécution et des configurations de plug-ins, entre les pipelines Cette tâche est appelée configuration de la charge utile.
Disposer d'un ensemble de pipelines dynamiques qui s'exécutent à l'aide des données de l'heure, du jour, de la semaine ou du mois, au lieu d'un pipeline statique qui doit être mis à jour à chaque exécution.
Par exemple, vous disposez d'un ensemble de données contenant toutes les informations sur les expéditions de votre entreprise. Sur la base de ces données, vous souhaitez répondre à plusieurs questions d'ordre commercial. Pour ce faire, vous créez un pipeline qui nettoie les données brutes sur les expéditions, appelé Nettoyage des données d'expédition. Vous créez ensuite un deuxième pipeline, Delayed Shipments USA (Expéditions retardées aux États-Unis), qui lit les données nettoyées et trouve les expéditions aux États-Unis retardées de plus d'un seuil spécifié. Le pipeline Delayed Shipments USA peut être déclenché dès que le pipeline Shipments Data Cleaning en amont est terminé.
De plus, comme le pipeline en aval consomme la sortie du pipeline en amont, vous devez spécifier que lorsque le pipeline en aval s'exécute à l'aide de ce déclencheur, il reçoit également le répertoire d'entrée à lire (qui est le répertoire dans lequel le pipeline en amont a généré sa sortie). Ce processus est appelé transmission de la configuration de la charge utile, que vous définissez avec des arguments d'exécution. Il vous permet de disposer d'un ensemble de pipelines dynamiques qui s'exécutent à l'aide des données de l'heure, du jour, de la semaine ou du mois (et non d'un pipeline statique, qui doit être mis à jour à chaque exécution).
Pour orchestrer des pipelines avec des déclencheurs, procédez comme suit:
Créez des pipelines en amont et en aval.
Dans Cloud Data Fusion Studio, concevez et déployez les pipelines qui constituent votre chaîne d'orchestration.
Déterminez quel pipeline doit être terminé pour activer le pipeline suivant (en aval) de votre workflow.
Facultatif: transmettez des arguments d'exécution pour les pipelines en amont.
Créez un déclencheur entrant sur le pipeline en aval.
Dans Cloud Data Fusion Studio, accédez à la page Liste. Dans l'onglet Déployé, cliquez sur le nom du pipeline en aval. La vue "Déployer" de ce pipeline s'affiche.
Au milieu de la page, cliquez sur Déclencheurs entrants.
La liste des pipelines disponibles s'affiche.
Cliquez sur le pipeline en amont. Sélectionnez un ou plusieurs états d'achèvement du pipeline en amont (Réussite, Échec ou Arrêts) comme condition de le pipeline en aval doit s'exécuter.
Si vous souhaitez que le pipeline en amont partage des informations (appelées configuration de la charge utile) avec le pipeline en aval, cliquez sur Configuration du déclencheur, puis suivez les étapes pour transmettre la configuration de la charge utile en tant qu'arguments d'exécution.
Sinon, cliquez sur Activer le déclencheur.
Testez le déclencheur.
Démarrez l'exécution du pipeline en amont.
Si le déclencheur est correctement configuré, le pipeline en aval s'exécute automatiquement à la fin des pipelines en amont, en fonction de la condition que vous avez configurée.
Transmettre la configuration de la charge utile en tant qu'arguments d'exécution
La configuration de la charge utile permet de partager des informations du pipeline en amont dans le pipeline en aval. Ces informations peuvent être, par exemple, le répertoire de sortie, le format des données ou le jour de l'exécution du pipeline. Ces informations sont ensuite utilisées par le pipeline en aval pour prendre des décisions telles que la détermination de l'ensemble de données approprié à lire.
Pour transmettre des informations du pipeline en amont au pipeline en aval, vous devez définir les arguments d'exécution du pipeline en aval avec les valeurs des arguments d'exécution ou la configuration de n'importe quel plug-in dans le pipeline en amont.
Chaque fois que le pipeline en aval se déclenche et s'exécute, sa configuration de charge utile est définie à l'aide des arguments d'exécution de l'exécution spécifique du pipeline en amont qui a déclenché le pipeline en aval.
Pour transmettre la configuration de la charge utile en tant qu'arguments d'exécution, procédez comme suit:
Reprenez là où vous en étiez dans la section Créer un déclencheur entrant, après avoir cliqué sur Configuration du déclencheur, tous les arguments d'exécution que vous avez définis précédemment pour le pipeline en amont. Choisissez les arguments d'exécution à transmettre du pipeline en amont au pipeline en aval lorsque ce déclencheur est exécuté.
Cliquez sur l'onglet Configuration du plug-in pour afficher la liste des éléments qui seront transmis de votre pipeline en amont à votre pipeline en aval lorsqu'il est déclenché.
Cliquez sur Configurer et activer le déclencheur.
Sauf indication contraire, le contenu de cette page est régi par une licence Creative Commons Attribution 4.0, et les échantillons de code sont régis par une licence Apache 2.0. Pour en savoir plus, consultez les Règles du site Google Developers. Java est une marque déposée d'Oracle et/ou de ses sociétés affiliées.
Dernière mise à jour le 2025/09/04 (UTC).
[[["Facile à comprendre","easyToUnderstand","thumb-up"],["J'ai pu résoudre mon problème","solvedMyProblem","thumb-up"],["Autre","otherUp","thumb-up"]],[["Difficile à comprendre","hardToUnderstand","thumb-down"],["Informations ou exemple de code incorrects","incorrectInformationOrSampleCode","thumb-down"],["Il n'y a pas l'information/les exemples dont j'ai besoin","missingTheInformationSamplesINeed","thumb-down"],["Problème de traduction","translationIssue","thumb-down"],["Autre","otherDown","thumb-down"]],["Dernière mise à jour le 2025/09/04 (UTC)."],[[["\u003cp\u003eCloud Composer can orchestrate multiple Cloud Data Fusion pipelines, offering centralized workflow and dependency management, monitoring, alerting, and integration with other Google Cloud services.\u003c/p\u003e\n"],["\u003cp\u003eCloud Composer uses Directed Acyclic Graphs (DAGs) and Cloud Data Fusion Operators to define and manage pipeline orchestration, including starting, stopping, and deleting pipelines.\u003c/p\u003e\n"],["\u003cp\u003eTriggers in Cloud Data Fusion allow automatic execution of downstream pipelines upon completion of upstream pipelines, based on success, failure, or other conditions.\u003c/p\u003e\n"],["\u003cp\u003eTriggers facilitate dynamic pipelines by enabling the sharing of runtime arguments and plugin configurations (payload configuration) between upstream and downstream pipelines.\u003c/p\u003e\n"],["\u003cp\u003eUsing payload configuration with triggers, the downstream pipeline can receive information, such as output directory and data format, from the upstream pipeline.\u003c/p\u003e\n"]]],[],null,["# Orchestrate pipelines\n\nThis page explains pipeline orchestration with Cloud Composer and\ntriggers. Cloud Data Fusion recommends using Cloud Composer to\norchestrate pipelines. If you require a simpler way to manage orchestration, use\ntriggers. \n\n### Composer\n\nOrchestrate pipelines with Cloud Composer\n-----------------------------------------\n\nOrchestrating pipeline execution in Cloud Data Fusion with\nCloud Composer provides following benefits:\n\n- **Centralized workflow management:** uniformly manage the execution of multiple Cloud Data Fusion pipelines.\n- **Dependency management:** to ensure proper execution order, define dependencies between pipelines.\n- **Monitoring and alerting:** Cloud Composer provides monitoring capabilities and alerts for failures.\n- **Integration with other services:** Cloud Composer lets you orchestrate workflows that span across Cloud Data Fusion and other Google Cloud services.\n\nTo orchestrate Cloud Data Fusion pipelines using\nCloud Composer, follow this process:\n\n1. **Set up the Cloud Composer environment.**\n\n - **Create a Cloud Composer environment.** If you don't have one, provision the environment in your Google Cloud project. This environment is your orchestration workspace.\n - **Give permissions.** Ensure the Cloud Composer service account has the necessary permissions to access Cloud Data Fusion (such as permission to start, stop, and list pipelines).\n2. **Define Directed Acyclic Graphs (DAG) for orchestration.**\n\n - **Create a DAG:** In Cloud Composer, create a DAG that defines the orchestration workflow for your Cloud Data Fusion pipelines.\n - **Cloud Data Fusion Operators:** Use Cloud Composer's Cloud Data Fusion Operators within your DAG. These operators let you interact programmatically with Cloud Data Fusion.\n\n### Cloud Data Fusion operators\n\nCloud Data Fusion pipeline orchestration has the following operators:\n\n`CloudDataFusionStartPipelineOperator`\n\n: Triggers the execution of a Cloud Data Fusion pipeline by its ID. It\n has the following parameters:\n\n - Pipeline ID\n - Location (Google Cloud region)\n - Pipeline namespace\n - Runtime arguments (optional)\n - Wait for completion (optional)\n - Timeout (optional)\n\n`CloudDataFusionStopPipelineOperator`\n\n: Lets you stop a running Cloud Data Fusion pipeline.\n\n`CloudDataFusionDeletePipelineOperator`\n\n: Deletes a Cloud Data Fusion pipeline.\n\n### Build the DAG workflow\n\nWhen you build the DAG workflow, consider the following:\n\n- **Defining dependencies:** Use the DAG structure to define dependencies between tasks. For example, you might have a task that waits for a pipeline in one namespace to complete successfully before triggering another pipeline in a different namespace.\n- **Scheduling:** Schedule the DAG to run at specific intervals, such as daily or hourly, or set it to be triggered manually.\n\nFor more information, see the\n[Cloud Composer overview](/composer/docs/concepts/overview).\n\n### Triggers\n\nOrchestrate pipelines with triggers\n-----------------------------------\n\nCloud Data Fusion triggers let you automatically execute a downstream\npipeline upon the completion (success, failure, or any specified condition)\nof one or more upstream pipelines.\n\nTriggers are useful for the following tasks:\n\n- Cleaning your data once, and then making it available to multiple downstream pipelines for consumption.\n- Sharing information, such as runtime arguments and plugin configurations, between pipelines. This task is called *payload\n configuration*.\n- Having a set of dynamic pipelines that run using the data from the hour, day, week, or month, instead of a static pipeline that must be updated for every run.\n\nFor example, you have a dataset that contains all information about your\ncompany's shipments. Based on this data, you want to answer several business\nquestions. To do this, you create one pipeline that cleanses the raw data\nabout shipments, called *Shipments Data Cleaning* . Then you create a second\npipeline, *Delayed Shipments USA* , which reads the cleansed data and finds\nthe shipments within the USA that were delayed by more than a specified\nthreshold. The *Delayed Shipments USA* pipeline can be triggered as soon as\nthe upstream *Shipments Data Cleaning* pipeline successfully completes.\n\nAdditionally, since the downstream pipeline consumes the output of the\nupstream pipeline, you must specify that when the downstream pipeline runs\nusing this trigger, it also receives the input directory to read from (which\nis the directory where the upstream pipeline generated its output). This\nprocess is called *passing payload configuration*, which you define with\nruntime arguments. It lets you have a set of dynamic pipelines that\nrun using the data of the hour, day, week, or month (not a static pipeline,\nwhich must be updated for every run).\n| **Note:** Don't trigger upgrades with Terraform. For more information, see the [limitations for Cloud Data Fusion upgrades](/data-fusion/docs/how-to/upgrading#limitations).\n\nTo orchestrate pipelines with triggers, follow this process:\n\n1. **Create upstream and downstream pipelines.**\n\n - In the Cloud Data Fusion Studio, design and deploy the pipelines that form your orchestration chain.\n - Consider which pipeline's completion will activate the next pipeline (downstream) in your workflow.\n2. **Optional: pass runtime arguments for upstream pipelines.**\n\n - If you need to [pass payload configuration as runtime arguments](#pass-payload-configs) between pipelines, configure runtime arguments. These arguments can be passed to the downstream pipeline during execution.\n3. **Create an inbound trigger on the downstream pipeline.**\n\n - In the Cloud Data Fusion Studio, go to the **List** page. In the **Deployed** tab, click the name of the downstream pipeline. The Deploy view for that pipeline appears.\n - On the middle left side of the page, click **Inbound triggers**. A list of available pipelines appears.\n - Click the upstream pipeline. Select one or more of the upstream pipeline completion states (**Succeeds** , **Fails** , or **Stops**) as the condition for when the downstream pipeline should run.\n - If you want the upstream pipeline to share information (called *payload configuration* ) with the downstream pipeline, click **Trigger config** and then follow the steps to [pass payload configuration as runtime arguments](#pass-payload-configs). Otherwise, click **Enable trigger**.\n4. **Test the trigger.**\n\n - Initiate a run of the upstream pipeline.\n - If the trigger is configured correctly, the downstream pipeline automatically executes upon completion of the upstream pipelines, based on your configured condition.\n\n### Pass payload configuration as runtime arguments\n\nPayload configuration allows sharing of information from the upstream\npipeline to the downstream pipeline. This information can be, for example,\nthe output directory, the data format, or the day the pipeline was run. This\ninformation is then used by the downstream pipeline for decisions such as\ndetermining the right dataset to read from.\n\nTo pass information from the upstream pipeline to the downstream pipeline,\nyou set the runtime arguments of the downstream pipeline with the values of\neither the runtime arguments or the configuration of any plugin in the\nupstream pipeline.\n\nWhenever the downstream pipeline triggers and runs, its payload\nconfiguration is set using the runtime arguments of the particular run of\nthe upstream pipeline that triggered the downstream pipeline.\n\nTo pass payload configuration as runtime arguments, follow these steps:\n\n1. Picking up where you left off in the [Creating an inbound trigger](/data-fusion/docs/how-to/using-triggers#create_inbound_trigger), after clicking **Trigger config** , any runtime arguments you [previously set](/data-fusion/docs/how-to/using-triggers#before_you_begin) for your upstream pipeline will appear. Choose the runtime arguments to pass from the upstream pipeline to the downstream pipeline when this trigger executes.\n2. Click the **Plugin config** tab to see a list of what will be passed from your upstream pipeline to your downstream pipeline when it is triggered.\n3. Click **Configure and Enable Trigger**."]]