Mettre à niveau un pipeline de traitement par flux

Cette page fournit des conseils et des recommandations pour mettre à niveau vos pipelines de traitement par flux. Par exemple, vous devrez peut-être passer à une version plus récente du SDK Apache Beam ou mettre à jour le code de votre pipeline. Différentes options sont proposées pour répondre à différents scénarios.

Contrairement aux pipelines de traitement par lot qui s'arrêtent une fois le job terminé, les pipelines de traitement par flux doivent souvent s'exécuter en continu pour assurer un traitement sans interruption. Par conséquent, lorsque vous mettez à niveau des pipelines de traitement par flux, vous devez prendre en compte les éléments suivants :

  • Il peut s'avérer nécessaire de minimiser ou d'éviter les interruptions du pipeline. Dans certains cas, il est possible de tolérer une interruption temporaire du traitement pendant le déploiement d'une nouvelle version du pipeline. Dans d'autres cas, votre application ne pourra tolérer aucune interruption.
  • Les processus de mise à jour de pipeline doivent gérer les modifications de schéma de manière à minimiser les interruptions du traitement des messages et des autres systèmes associés. Par exemple, si le schéma des messages dans un pipeline de traitement des événements change, il peut être nécessaire de reporter également ces modifications de schéma dans les récepteurs de données en aval.

Vous pouvez utiliser l'une des méthodes suivantes pour mettre à jour les pipelines de traitement par flux, en fonction de vos exigences de pipeline et de mise à jour :

Pour plus d'informations sur les problèmes que vous pouvez rencontrer lors d'une mise à jour et sur la façon de les éviter, consultez les sections Valider un job de remplacement et Vérifier la compatibilité des jobs.

Bonnes pratiques

  • Mettez à niveau la version du SDK Apache Beam séparément de toute modification du code du pipeline.
  • Testez votre pipeline après chaque modification avant d'effectuer des mises à jour supplémentaires.
  • Mettez régulièrement à niveau la version du SDK Apache Beam utilisée par votre pipeline.

Effectuer des mises à jour en cours

Vous pouvez mettre à jour certains pipelines de traitement par flux en cours sans arrêter le job. Ce scénario est appelé une mise à jour de job en cours. Les mises à jour de job en cours ne sont disponibles que dans des circonstances limitées :

  • Le job doit utiliser Streaming Engine.
  • Le job doit être à l'état "en cours d'exécution".
  • Vous modifiez uniquement le nombre de nœuds de calcul utilisés par le job.

Pour plus d'informations, consultez la section Définir la plage d'autoscaling de la page "Autoscaling horizontal".

Pour obtenir des instructions sur la façon d'effectuer une mise à jour de job en cours, consultez la page Mettre à jour un pipeline existant.

Lancer un job de remplacement

Si le job mis à jour est compatible avec le job existant, vous pouvez mettre à jour votre pipeline à l'aide de l'option update. Lorsque vous remplacez un job existant, un nouveau job exécute le code de votre pipeline mis à jour. Le service Dataflow conserve le nom du job, mais exécute le job de remplacement avec un ID de job mis à jour. Ce processus peut entraîner des temps d'arrêt pendant l'arrêt de la tâche existante, l'exécution de la vérification de compatibilité et le démarrage de la nouvelle tâche. Pour en savoir plus, consultez la section Effets du remplacement d'un job.

Dataflow effectue une vérification de compatibilité pour s'assurer que le code du pipeline mis à jour peut être déployé en toute sécurité sur le pipeline en cours d'exécution. Certaines modifications de code entraînent l'échec de la vérification de compatibilité, par exemple lorsque des entrées secondaires sont ajoutées ou supprimées d'une étape existante. Lorsque la vérification de compatibilité échoue, vous ne pouvez pas effectuer de mise à jour de job sur place.

Pour obtenir des instructions expliquant comment lancer un job de remplacement, consultez la section Lancer un job de remplacement.

Si la mise à jour du pipeline n'est pas compatible avec le job actuel, vous devez arrêter et remplacer le pipeline. Si votre pipeline ne peut pas tolérer de temps d'arrêt, exécutez des pipelines en parallèle.

Arrêter et remplacer des pipelines

Si vous pouvez interrompre temporairement le traitement, vous pouvez annuler ou drainer le pipeline, puis le remplacer par le pipeline mis à jour. L'annulation d'un pipeline oblige Dataflow à interrompre immédiatement le traitement et à arrêter les ressources le plus rapidement possible, ce qui peut entraîner une perte des données en cours de traitement, appelées données en cours de transfert. Dans la plupart des cas, le drainage est l'action recommandée pour éviter ce problème. Vous pouvez également utiliser des instantanés Dataflow pour enregistrer l'état d'un pipeline de traitement par flux, ce qui vous permet de démarrer une nouvelle version de votre tâche Dataflow sans perdre l'état. Pour en savoir plus, consultez la page Utiliser des instantanés Dataflow.

Le drainage d'un pipeline ferme immédiatement toutes les fenêtres en cours et actionne tous les déclencheurs. Bien que les données en cours de transfert ne soient pas perdues, le drainage peut entraîner des données incomplètes pour les fenêtres. Dans ce cas, les fenêtres en cours de traitement émettent des résultats partiels ou incomplets. Pour plus d'informations, consultez la section Effets liés au drainage d'un job. Une fois le job existant terminé, vous pouvez lancer un nouveau job de traitement par flux contenant le code de votre pipeline mis à jour, ce qui permet de reprendre le traitement.

Avec cette méthode, vous subissez un temps d'arrêt entre le moment où le job de traitement par flux existant s'arrête et le moment où le pipeline de remplacement est prêt à prendre le relais. Cependant, le fait d'annuler ou de drainer un pipeline existant puis de lancer un nouveau job avec le pipeline mis à jour est moins compliqué que d'exécuter des pipelines en parallèle.

Pour obtenir des instructions plus détaillées, consultez la section Drainer un job Dataflow. Après avoir drainé le job actuel, démarrez un nouveau job portant le même nom.

Relancer le traitement des messages avec des fonctionnalités d'instantanés et de recherche Pub/Sub

Dans certains cas, après avoir remplacé ou annulé un pipeline drainé, vous devrez peut-être relancer le traitement des messages Pub/Sub déjà distribués. Par exemple, vous devrez peut-être utiliser une logique métier mise à jour pour relancer le traitement des données. La recherche Pub/Sub est une fonctionnalité qui vous permet de relire les messages d'un instantané Pub/Sub. Vous pouvez utiliser la recherche Pub/Sub avec Dataflow pour relancer le traitement des messages à partir du moment où l'instantané d'abonnement est créé.

Pendant le développement et les tests, vous pouvez également utiliser la fonctionnalité de recherche Pub/Sub pour relire plusieurs fois les messages connus afin de vérifier le résultat de votre pipeline. Lorsque vous utilisez la recherche Pub/Sub, ne recherchez pas d'instantané d'abonnement lorsque l'abonnement est utilisé par un pipeline. Si vous le faites, la recherche pourrait invalider la logique du filigrane de Dataflow et affecter le traitement "exactement une fois" des messages Pub/Sub.

Voici un workflow recommandé de gcloud CLI pour utiliser les fonctionnalités d'instantané et de recherche Pub/Sub avec des pipelines Dataflow dans une fenêtre de terminal :

  1. Pour créer un instantané de l'abonnement, exécutez la commande gcloud pubsub snapshots create :

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. Pour drainer ou annuler le pipeline, utilisez la commande gcloud dataflow jobs drain ou la commande gcloud dataflow jobs cancel :

    gcloud dataflow jobs drain JOB_ID
    

    ou

    gcloud dataflow jobs cancel JOB_ID
    
  3. Pour rechercher l'instantané, exécutez la commande gcloud pubsub subscriptions seek :

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. Déployez un nouveau pipeline qui utilise l'abonnement.

Exécuter des pipelines en parallèle

Pour éviter toute interruption de votre pipeline de traitement par flux lors d'une mise à jour, exécutez des pipelines en parallèle. Créez un nouveau job de traitement par flux contenant le code du pipeline mis à jour, puis exécutez le nouveau pipeline en parallèle avec le pipeline existant.

Lorsque vous créez le pipeline, utilisez la même stratégie de fenêtrage que celle utilisée pour le pipeline existant. Laissez le pipeline existant s'exécuter jusqu'à ce que son filigrane dépasse l'horodatage de la première fenêtre complète traitée par le pipeline mis à jour. Ensuite, drainez ou annulez le pipeline existant. Le pipeline mis à jour continue de s'exécuter pour prendre le relais et s'occuper du traitement de manière autonome.

Le diagramme suivant illustre ce processus.

Le pipeline A chevauche le pipeline B pendant une fenêtre de 5 minutes

Dans le diagramme, le Pipeline B correspond à la tâche mise à jour qui relaye le Pipeline A. La valeur t correspond à l'horodatage de la fenêtre complète la plus ancienne traitée par le pipeline B. La valeur w correspond au filigrane du Pipeline A. Pour des raisons de simplicité, un filigrane est considéré comme étant parfait sans données tardives. Le traitement et la durée d'exécution sont représentés sur l'axe horizontal. Les deux pipelines utilisent des fenêtres à durée fixe (fenêtres bascules) de cinq minutes. Les résultats sont déclenchés une fois que le filigrane dépasse la fin de chaque fenêtre.

Comme des résultats simultanés se produisent pendant la période où les deux pipelines se chevauchent, configurez les deux pipelines pour écrire les résultats dans différentes destinations. Les systèmes en aval peuvent ainsi utiliser un extrait des deux récepteurs de destination, par exemple une vue de base de données, pour interroger les résultats combinés. Ces systèmes peuvent également utiliser l'extrait pour dédupliquer les résultats de la période de chevauchement.

L'exemple suivant montre comment utiliser un pipeline qui lit les données d'entrée à partir de Pub/Sub, effectue un traitement et écrit les résultats dans BigQuery.

  1. Dans l'état initial, le pipeline de streaming existant (Pipeline A) s'exécute et lit des messages à partir d'un sujet Pub/Sub (Topic) à l'aide d'un abonnement (Subscription A). Les résultats sont écrits dans une table BigQuery (Table A). Ils sont utilisés via une vue BigQuery, qui sert de façade pour masquer les modifications de table sous-jacentes. Il s'agit d'une méthode de conception appelée modèle de façade. Le diagramme suivant illustre l'état initial.

    Un pipeline avec un abonnement et écriture dans une seule table BigQuery

  2. Créez un nouvel abonnement (Subscription B) pour le pipeline mis à jour. Déployez le pipeline mis à jour (Pipeline B), qui lit le sujet Pub/Sub (Topic) à l'aide de l'abonnement Subscription B et écrit dans une table BigQuery distincte (Table B). Le diagramme suivant illustre ce flux.

    Deux pipelines, chacun avec un abonnement. Chaque pipeline écrit dans une table BigQuery distincte Une vue de façade lit les deux tables

    À ce stade, les pipelines Pipeline A et Pipeline B s'exécutent en parallèle et écrivent les résultats dans des tables distinctes. Vous enregistrez le temps t en tant qu'horodatage de la fenêtre complète la plus ancienne traitée par le pipeline B.

  3. Lorsque le filigrane du Pipeline A dépasse le temps t, drainez le Pipeline A. Lorsque vous drainez le pipeline, toutes les fenêtres ouvertes se ferment et le traitement des données en cours se termine. Si le pipeline contient des fenêtres et que des fenêtres complètes sont importantes (en supposant qu'elles ne contiennent aucune donnée tardive), laissez les deux pipelines s'exécuter jusqu'au chevauchement des fenêtres complètes avant de drainer le Pipeline A. Arrêtez le job de traitement par flux du Pipeline A une fois que toutes les données en cours ont été traitées et écrites dans la Table A. Le diagramme suivant illustre ce stade.

    Le pipeline A se draine et ne lit plus l'abonnement A. Il n'envoie plus de données à la table A une fois le drainage terminé L'intégralité du traitement est gérée par le second pipeline

  4. À ce stade, seul le pipeline B est en cours d'exécution. Vous pouvez effectuer une requête à partir d'une vue BigQuery (vue de façade), qui sert de façade pour masquer la table A et la table B. Pour les lignes ayant le même horodatage dans les deux tables, configurez la vue de façon à renvoyer les lignes de la Table B ou revenir à la Table A si les lignes n'existent pas dans la Table B. Le diagramme suivant montre la vue (vue de façade) lue à la fois dans la table A et la table B.

    Le pipeline A a été supprimé et seul le pipeline B est exécuté

    À ce stade, vous pouvez supprimer l'abonnement Subscription A.

Lorsque des problèmes sont détectés avec un nouveau déploiement de pipeline, l'utilisation de pipelines en parallèle peut simplifier le rollback. Dans cet exemple, vous souhaiterez peut-être maintenir l'exécution du Pipeline A pendant que vous surveillez le bon fonctionnement du Pipeline B. En cas de problème avec le Pipeline B, vous pouvez effectuer un rollback et restaurer le Pipeline A.

Limites

Cette approche présente les limites suivantes :

  • Exécuter deux pipelines sur la même entrée risque de générer des données en double à la sortie. Le système en aval doit être informé des données en double et pouvoir les tolérer.
  • Lors de la lecture à partir d'une source Pub/Sub, il n'est pas recommandé d'utiliser le même abonnement pour plusieurs pipelines et peut entraîner des problèmes d'exactitude. Toutefois, dans certains cas d'utilisation, comme les pipelines d'extraction, de transformation et de chargement (ETL), utiliser le même abonnement dans deux pipelines peut réduire la duplication. Dans ce scénario, des problèmes d'autoscaling sont susceptibles de se produire, mais ils peuvent être atténués en utilisant la fonctionnalité de mise à jour des jobs en cours. Pour en savoir plus, consultez la section Affiner l'autoscaling pour vos pipelines de traitement par flux Pub/Sub.
  • Lors de la lecture à partir d'une source Pub/Sub, l'utilisation d'un deuxième abonnement génère des doublons, mais ne pose aucun problème d'exactitude et d'autoscaling des données.

Gérer les mutations de schéma

Les systèmes de gestion de données doivent souvent s'adapter aux mutations de schéma au fil du temps, parfois en raison de l'évolution des besoins métier ou pour des raisons techniques. L'application de mises à jour de schéma nécessite généralement une planification et une exécution minutieuses pour éviter de perturber les systèmes d'information de l'entreprise.

Prenons l'exemple d'un pipeline qui lit les messages contenant des charges utiles JSON à partir d'un sujet Pub/Sub. Le pipeline convertit chaque message en instance TableRow, puis écrit les lignes dans une table BigQuery. Le schéma de la table de sortie est semblable aux messages traités par le pipeline. Dans le diagramme suivant, le schéma est nommé Schéma A.

Pipeline qui lit un abonnement et écrit dans une table de sortie BigQuery à l'aide du schéma A

Au fil du temps, le schéma du message peut subir des mutations complexes. Par exemple, des champs sont ajoutés, supprimés ou remplacés. Le Schéma A évolue en un nouveau schéma. Dans la discussion qui suit, le nouveau schéma est nommé Schéma B. Dans ce cas, le pipeline A doit être mis à jour et le schéma de la table de sortie doit être compatible avec le schéma B.

Pour la table de sortie, vous pouvez effectuer certaines mutations de schéma sans temps d'arrêt. Pour donner un exemple, vous pouvez ajouter de nouveaux champs ou assouplir les modes de colonne, par exemple en remplaçant REQUIRED par NULLABLE, sans temps d'arrêt. Ces mutations n'ont généralement pas d'incidence sur les requêtes existantes. Cependant, les mutations de schéma qui modifient ou suppriment des champs de schéma existants interrompent les requêtes ou entraînent d'autres interruptions. L'approche suivante permet d'adapter les modifications sans nécessiter de temps d'arrêt.

Séparez les données écrites par le pipeline dans une table principale et dans une ou plusieurs tables de préproduction. La table principale stocke les données historiques écrites par le pipeline. Les tables de préproduction stockent les derniers résultats du pipeline. Vous pouvez définir une vue de façade BigQuery pour les tables principales et les tables de préproduction. Cela permet aux utilisateurs d'interroger aussi bien des données historiques que des données mises à jour.

Le diagramme suivant montre comment modifier le flux de pipeline précédent pour inclure une table de préproduction (Staging Table A), une table principale et une vue de façade.

Pipeline qui lit un abonnement et écrit dans une table de préproduction BigQuery Une deuxième table (principale) génère des résultats à partir d'une version précédente du schéma Une vue de façade lit à la fois la table de préproduction et la table principale

Dans le flux révisé, le pipeline A traite les messages qui utilisent le schéma A et écrit le résultat dans la table de préproduction A, qui dispose d'un schéma compatible. La table principale contient des données historiques écrites par des versions précédentes du pipeline, ainsi que des résultats fusionnés périodiquement à partir de la table de préproduction. Les utilisateurs peuvent interroger des données à jour et des données historiques en temps réel, à l'aide de la vue de façade.

Lorsque le schéma du message évolue du schéma A au schéma B, vous pouvez mettre à jour le code du pipeline pour qu'il soit compatible avec les messages qui utilisent le schéma B. Le pipeline existant doit être mis à jour avec la nouvelle implémentation. En exécutant des pipelines en parallèle, vous avez l'assurance que le traitement des flux de données se poursuit sans interruption. L'arrêt et le remplacement de pipelines entraînent une interruption du traitement, car aucun pipeline ne s'exécute pendant un certain temps.

Le pipeline mis à jour écrit dans une table de préproduction supplémentaire (table de préproduction B) qui utilise le schéma B. Vous pouvez utiliser un workflow orchestré pour créer la nouvelle table de préproduction avant de mettre à jour le pipeline. Mettez à jour la vue de façade de façon à inclure les résultats de la nouvelle table de préproduction, en utilisant éventuellement une étape de workflow associée.

Le diagramme suivant montre le flux mis à jour, la table de préproduction B avec le schéma B et la façon dont la vue de façade est mise à jour pour inclure le contenu de la table principale et des deux tables de préproduction.

Le pipeline utilise désormais le schéma B et écrit dans la table de préproduction B Une vue de façade est lue à partir de la table principale, de la table de préproduction A et de la table de préproduction B

Dans le cadre d'un processus distinct de la mise à jour du pipeline, vous pouvez fusionner les tables de préproduction dans la table principale, périodiquement ou selon les besoins. Le diagramme suivant montre la fusion de la table de préproduction A dans la table principale.

La table de préproduction A est fusionnée dans la table principale La vue de façade lit à partir de la table de préproduction B et de la table principale

Étapes suivantes