Orchestration des pipelines

Cette page explique l'orchestration de pipeline avec Cloud Composer et les déclencheurs. Cloud Data Fusion recommande d'utiliser Cloud Composer pour orchestrer les pipelines. Si vous avez besoin d'un moyen plus simple de gérer l'orchestration, utilisez des déclencheurs.

Composer

Orchestration de pipelines avec Cloud Composer

L'orchestration de l'exécution du pipeline dans Cloud Data Fusion avec Cloud Composer présente les avantages suivants:

  • Gestion centralisée des workflows:gérez de manière uniforme l'exécution de plusieurs pipelines Cloud Data Fusion.
  • Gestion des dépendances:pour garantir un ordre d'exécution correct, définissez des dépendances entre les pipelines.
  • Surveillance et alerte:Cloud Composer fournit des fonctionnalités de surveillance et des alertes en cas de défaillance.
  • Intégration à d'autres services:Cloud Composer vous permet d'orchestrer des workflows couvrant Cloud Data Fusion et d'autres servicesGoogle Cloud .

Pour orchestrer des pipelines Cloud Data Fusion à l'aide de Cloud Composer, procédez comme suit:

  1. Configurez l'environnement Cloud Composer.

    • Créez un environnement Cloud Composer. Si vous n'en avez pas, provisionnez l'environnement dans votre projet Google Cloud . Cet environnement est votre espace de travail d'orchestration.
    • Accordez des autorisations. Assurez-vous que le compte de service Cloud Composer dispose des autorisations nécessaires pour accéder à Cloud Data Fusion (par exemple, l'autorisation de démarrer, d'arrêter et de lister des pipelines).
  2. Définissez des graphes orientés acycliques (DAG) pour l'orchestration.

    • Créez un DAG:dans Cloud Composer, créez un DAG qui définit le workflow d'orchestration de vos pipelines Cloud Data Fusion.
    • Opérateurs Cloud Data Fusion:utilisez les opérateurs Cloud Data Fusion de Cloud Composer dans votre DAG. Ces opérateurs vous permettent d'interagir de manière programmatique avec Cloud Data Fusion.

Opérateurs Cloud Data Fusion

L'orchestration des pipelines Cloud Data Fusion dispose des opérateurs suivants:

CloudDataFusionStartPipelineOperator

Déclenche l'exécution d'un pipeline Cloud Data Fusion par son ID. Elle comporte les paramètres suivants:

  • ID du pipeline
  • Emplacement (régionGoogle Cloud )
  • Espace de noms du pipeline
  • Arguments d'exécution (facultatif)
  • Attendre la fin (facultatif)
  • Délai avant expiration (facultatif)
CloudDataFusionStopPipelineOperator

Vous permet d'arrêter un pipeline Cloud Data Fusion en cours d'exécution.

CloudDataFusionDeletePipelineOperator

Supprime un pipeline Cloud Data Fusion.

Créer le workflow DAG

Lorsque vous créez le workflow DAG, tenez compte des points suivants:

  • Définir des dépendances:utilisez la structure DAG pour définir les dépendances entre les tâches. Par exemple, une tâche peut attendre la fin d'un pipeline dans un espace de noms avant de déclencher un autre pipeline dans un autre espace de noms.
  • Planification:planifiez l'exécution du DAG à des intervalles spécifiques, par exemple quotidiens ou horaires, ou définissez-le pour qu'il soit déclenché manuellement.

Pour en savoir plus, consultez la présentation de Cloud Composer.

Déclencheurs

Orchestration de pipelines avec des déclencheurs

Les déclencheurs Cloud Data Fusion vous permettent d'exécuter automatiquement un pipeline en aval à la fin (succès, échec ou toute condition spécifiée) d'un ou de plusieurs pipelines en amont.

Les déclencheurs sont utiles pour les tâches suivantes:

  • Nettoyer vos données une seule fois, puis les mettre à disposition de plusieurs pipelines en aval pour utilisation.
  • Partager des informations, telles que des arguments d'exécution et des configurations de plug-ins, entre les pipelines Cette tâche est appelée configuration de la charge utile.
  • Disposer d'un ensemble de pipelines dynamiques qui s'exécutent à l'aide des données de l'heure, du jour, de la semaine ou du mois, au lieu d'un pipeline statique qui doit être mis à jour à chaque exécution.

Par exemple, vous disposez d'un ensemble de données contenant toutes les informations sur les expéditions de votre entreprise. Sur la base de ces données, vous souhaitez répondre à plusieurs questions d'ordre commercial. Pour ce faire, vous créez un pipeline qui nettoie les données brutes sur les expéditions, appelé Nettoyage des données d'expédition. Vous créez ensuite un deuxième pipeline, Delayed Shipments USA (Expéditions retardées aux États-Unis), qui lit les données nettoyées et trouve les expéditions aux États-Unis retardées de plus d'un seuil spécifié. Le pipeline Delayed Shipments USA peut être déclenché dès que le pipeline Shipments Data Cleaning en amont est terminé.

De plus, comme le pipeline en aval consomme la sortie du pipeline en amont, vous devez spécifier que lorsque le pipeline en aval s'exécute à l'aide de ce déclencheur, il reçoit également le répertoire d'entrée à lire (qui est le répertoire dans lequel le pipeline en amont a généré sa sortie). Ce processus est appelé transmission de la configuration de la charge utile, que vous définissez avec des arguments d'exécution. Il vous permet de disposer d'un ensemble de pipelines dynamiques qui s'exécutent à l'aide des données de l'heure, du jour, de la semaine ou du mois (et non d'un pipeline statique, qui doit être mis à jour à chaque exécution).

Pour orchestrer des pipelines avec des déclencheurs, procédez comme suit:

  1. Créez des pipelines en amont et en aval.

    • Dans Cloud Data Fusion Studio, concevez et déployez les pipelines qui constituent votre chaîne d'orchestration.
    • Déterminez quel pipeline doit être terminé pour activer le pipeline suivant (en aval) de votre workflow.
  2. Facultatif: transmettez des arguments d'exécution pour les pipelines en amont.

  3. Créez un déclencheur entrant sur le pipeline en aval.

    • Dans Cloud Data Fusion Studio, accédez à la page Liste. Dans l'onglet Déployé, cliquez sur le nom du pipeline en aval. La vue "Déployer" de ce pipeline s'affiche.
    • Au milieu de la page, cliquez sur Déclencheurs entrants. La liste des pipelines disponibles s'affiche.
    • Cliquez sur le pipeline en amont. Sélectionnez un ou plusieurs états d'achèvement du pipeline en amont (Réussite, Échec ou Arrêts) comme condition de le pipeline en aval doit s'exécuter.
    • Si vous souhaitez que le pipeline en amont partage des informations (appelées configuration de la charge utile) avec le pipeline en aval, cliquez sur Configuration du déclencheur, puis suivez les étapes pour transmettre la configuration de la charge utile en tant qu'arguments d'exécution. Sinon, cliquez sur Activer le déclencheur.
  4. Testez le déclencheur.

    • Démarrez l'exécution du pipeline en amont.
    • Si le déclencheur est correctement configuré, le pipeline en aval s'exécute automatiquement à la fin des pipelines en amont, en fonction de la condition que vous avez configurée.

Transmettre la configuration de la charge utile en tant qu'arguments d'exécution

La configuration de la charge utile permet de partager des informations du pipeline en amont dans le pipeline en aval. Ces informations peuvent être, par exemple, le répertoire de sortie, le format des données ou le jour de l'exécution du pipeline. Ces informations sont ensuite utilisées par le pipeline en aval pour prendre des décisions telles que la détermination de l'ensemble de données approprié à lire.

Pour transmettre des informations du pipeline en amont au pipeline en aval, vous devez définir les arguments d'exécution du pipeline en aval avec les valeurs des arguments d'exécution ou la configuration de n'importe quel plug-in dans le pipeline en amont.

Chaque fois que le pipeline en aval se déclenche et s'exécute, sa configuration de charge utile est définie à l'aide des arguments d'exécution de l'exécution spécifique du pipeline en amont qui a déclenché le pipeline en aval.

Pour transmettre la configuration de la charge utile en tant qu'arguments d'exécution, procédez comme suit:

  1. Reprenez là où vous en étiez dans la section Créer un déclencheur entrant, après avoir cliqué sur Configuration du déclencheur, tous les arguments d'exécution que vous avez définis précédemment pour le pipeline en amont. Choisissez les arguments d'exécution à transmettre du pipeline en amont au pipeline en aval lorsque ce déclencheur est exécuté.
  2. Cliquez sur l'onglet Configuration du plug-in pour afficher la liste des éléments qui seront transmis de votre pipeline en amont à votre pipeline en aval lorsqu'il est déclenché.
  3. Cliquez sur Configurer et activer le déclencheur.