Orchestrer des pipelines

Cette page explique l'orchestration de pipelines avec Cloud Composer et les déclencheurs. Cloud Data Fusion recommande d'utiliser Cloud Composer pour orchestrer les pipelines. Si vous avez besoin d'un moyen plus simple de gérer l'orchestration, utilisez des déclencheurs.

Composer

Orchestrer des pipelines avec Cloud Composer

Orchestrer l'exécution d'un pipeline dans Cloud Data Fusion avec Cloud Composer offre les avantages suivants:

  • Gestion centralisée des workflows:gérez uniformément l'exécution de plusieurs pipelines Cloud Data Fusion.
  • Gestion des dépendances:pour garantir un ordre d'exécution correct, définissez des dépendances entre les pipelines.
  • Surveillance et alertes:Cloud Composer fournit des fonctionnalités de surveillance et des alertes en cas de défaillance.
  • Intégration à d'autres services:Cloud Composer vous permet d'orchestrer des workflows qui s'étendent sur Cloud Data Fusion et d'autres services Google Cloud.

Pour orchestrer des pipelines Cloud Data Fusion à l'aide de Cloud Composer, procédez comme suit:

  1. Configurez l'environnement Cloud Composer.

    • Créez un environnement Cloud Composer. Si vous n'en avez pas, provisionnez l'environnement dans votre projet Google Cloud. Cet environnement est votre espace de travail d'orchestration.
    • Accordez des autorisations. Assurez-vous que le compte de service Cloud Composer dispose des autorisations nécessaires pour accéder à Cloud Data Fusion (telles que l'autorisation de démarrer, d'arrêter et de répertorier les pipelines).
  2. Définir des graphes orientés acycliques (DAG) pour l'orchestration

    • Créez un DAG:dans Cloud Composer, créez un DAG qui définit le workflow d'orchestration pour vos pipelines Cloud Data Fusion.
    • Opérateurs Cloud Data Fusion:utilisez les opérateurs Cloud Data Fusion de Cloud Composer dans votre DAG. Ces opérateurs vous permettent d'interagir de manière programmatique avec Cloud Data Fusion.

Opérateurs Cloud Data Fusion

L'orchestration de pipelines Cloud Data Fusion comporte les opérateurs suivants:

CloudDataFusionStartPipelineOperator

Déclenche l'exécution d'un pipeline Cloud Data Fusion par son ID. Il comporte les paramètres suivants:

  • ID du pipeline
  • Emplacement (région Google Cloud)
  • Espace de noms du pipeline
  • Arguments d'exécution (facultatif)
  • Attendre la fin de l'opération (facultatif)
  • Délai avant expiration (facultatif)
CloudDataFusionStopPipelineOperator

Vous permet d'arrêter un pipeline Cloud Data Fusion en cours d'exécution.

CloudDataFusionDeletePipelineOperator

Supprime un pipeline Cloud Data Fusion.

Créer le workflow du DAG

Lorsque vous créez le workflow DAG, tenez compte des points suivants:

  • Définir des dépendances:utilisez la structure du DAG pour définir des dépendances entre les tâches. Par exemple, une tâche peut attendre qu'un pipeline dans un espace de noms se termine correctement avant de déclencher un autre pipeline dans un espace de noms différent.
  • Planification:planifiez l'exécution du DAG à des intervalles spécifiques (par exemple, tous les jours ou toutes les heures), ou configurez-le pour qu'il se déclenche manuellement.

Pour en savoir plus, consultez la présentation de Cloud Composer.

Déclencheurs

Orchestrer des pipelines avec des déclencheurs

Les déclencheurs Cloud Data Fusion vous permettent d'exécuter automatiquement un pipeline en aval après l'achèvement (réussite, échec ou toute condition spécifiée) d'un ou de plusieurs pipelines en amont.

Les déclencheurs sont utiles pour les tâches suivantes:

  • Nettoyer vos données une seule fois, puis les mettre à la disposition de plusieurs pipelines en aval pour utilisation
  • partager des informations entre les pipelines, telles que les arguments d'exécution et les configurations de plug-in ; Cette tâche est appelée configuration de la charge utile.
  • Disposer d'un ensemble de pipelines dynamiques qui s'exécutent à l'aide des données de l'heure, du jour, de la semaine ou du mois, au lieu d'un pipeline statique qui doit être mis à jour à chaque exécution.

Par exemple, vous disposez d'un ensemble de données contenant toutes les informations sur les livraisons de votre entreprise. Sur la base de ces données, vous souhaitez répondre à plusieurs questions métier. Pour ce faire, vous devez créer un pipeline qui nettoie les données brutes sur les expéditions, appelé Nettoyage des données d'expédition. Vous créez ensuite un second pipeline, Delayed Shipments USA, qui lit les données nettoyées et trouve les expéditions aux États-Unis qui ont été retardées de plus d'un seuil spécifié. Le pipeline Delayed Shipments USA peut être déclenché dès que le pipeline Shipments Data Serving (Nettoyage des données en amont) se termine avec succès.

De plus, étant donné que le pipeline en aval utilise la sortie du pipeline en amont, vous devez spécifier que lorsque le pipeline en aval s'exécute à l'aide de ce déclencheur, il reçoit également le répertoire d'entrée à partir duquel lire les données (qui est le répertoire dans lequel le pipeline en amont a généré sa sortie). Ce processus est appelé transmission de la configuration de la charge utile, que vous définissez avec des arguments d'exécution. Il vous permet d'avoir un ensemble de pipelines dynamiques qui s'exécutent à l'aide des données de l'heure, du jour, de la semaine ou du mois (et non un pipeline statique, qui doit être mis à jour à chaque exécution).

Pour orchestrer des pipelines avec des déclencheurs, procédez comme suit:

  1. Créer des pipelines en amont et en aval

    • Dans Cloud Data Fusion Studio, concevez et déployez les pipelines qui constituent votre chaîne d'orchestration.
    • Réfléchissez à l'achèvement du pipeline qui activera le prochain pipeline (en aval) de votre workflow.
  2. Facultatif: transmettre des arguments d'exécution pour les pipelines en amont

  3. Créez un déclencheur entrant dans le pipeline en aval.

    • Dans Cloud Data Fusion Studio, accédez à la page Liste. Dans l'onglet Déployé, cliquez sur le nom du pipeline en aval. La vue "Déployer" de ce pipeline s'affiche.
    • Au milieu de la page, cliquez sur Déclencheurs entrants. La liste des pipelines disponibles s'affiche.
    • Cliquez sur le pipeline en amont. Sélectionnez un ou plusieurs états d'achèvement du pipeline en amont (Réussite, Échec ou Arrêts) comme condition d'exécution du pipeline en aval.
    • Si vous souhaitez que le pipeline en amont partage des informations (appelées configuration de la charge utile) avec le pipeline en aval, cliquez sur Configuration du déclencheur, puis suivez les étapes pour transmettre la configuration de la charge utile en tant qu'arguments d'exécution. Sinon, cliquez sur Activer le déclencheur.
  4. Testez le déclencheur.

    • Lancez une exécution du pipeline en amont.
    • Si le déclencheur est correctement configuré, le pipeline en aval s'exécute automatiquement à la fin des pipelines en amont, en fonction de la condition que vous avez configurée.

Transmettre la configuration de la charge utile en tant qu'arguments d'exécution

La configuration de la charge utile permet de partager des informations du pipeline en amont vers le pipeline en aval. Ces informations peuvent être, par exemple, le répertoire de sortie, le format de données ou la date à laquelle le pipeline a été exécuté. Ces informations sont ensuite utilisées par le pipeline en aval pour prendre des décisions, par exemple pour déterminer le bon ensemble de données à lire.

Pour transmettre des informations du pipeline en amont au pipeline en aval, vous devez définir les arguments d'exécution du pipeline en aval avec les valeurs de ces arguments ou la configuration de n'importe quel plug-in du pipeline en amont.

Chaque fois que le pipeline en aval se déclenche et s'exécute, sa configuration de charge utile est définie à l'aide des arguments d'exécution de l'exécution particulière du pipeline en amont qui a déclenché le pipeline en aval.

Pour transmettre la configuration de la charge utile en tant qu'arguments d'exécution, procédez comme suit:

  1. Reprenez là où vous en étiez dans la section Créer un déclencheur entrant. Après avoir cliqué sur Configuration du déclencheur, tous les arguments d'exécution que vous avez définis précédemment pour votre pipeline en amont apparaîtront. Choisissez les arguments d'exécution à transmettre du pipeline en amont au pipeline en aval lorsque ce déclencheur s'exécute.
  2. Cliquez sur l'onglet Configuration du plug-in pour afficher la liste des éléments qui seront transmis de votre pipeline en amont à votre pipeline en aval lorsqu'il sera déclenché.
  3. Cliquez sur Configurer et activer le déclencheur.