Mettre à jour un pipeline existant

Les SDK Apache Beam permettent de mettre à jour une tâche de streaming en continu sur le service géré Cloud Dataflow avec un nouveau code de pipeline.

Différentes raisons peuvent justifier la mise à jour d'une tâche Cloud Dataflow existante :

  • Vous souhaitez retoucher ou améliorer le code de votre pipeline.
  • Vous voulez corriger des bugs dans le code de votre pipeline.
  • Vous souhaitez mettre à jour votre pipeline pour gérer des modifications au niveau du format des données, ou pour tenir compte d'une nouvelle version ou d'autres modifications au niveau de votre source de données.

Lorsque vous mettez à jour votre tâche, le service Cloud Dataflow effectue une vérification de compatibilité entre votre tâche en cours d'exécution et la tâche potentiellement amenée à la remplacer. La vérification de compatibilité garantit que des éléments tels que les informations d'état intermédiaires et les données en mémoire tampon peuvent être transférés de votre tâche précédente vers la tâche de remplacement.

Le processus de mise à jour et ses effets

Lorsque vous mettez à jour une tâche sur le service Cloud Dataflow, vous remplacez la tâche existante par une nouvelle tâche exécutant le code de votre pipeline mis à jour. Le service Cloud Dataflow conserve le nom de la tâche, mais exécute la tâche de remplacement avec un jobId mis à jour.

La tâche de remplacement conserve toutes les données d'état intermédiaires de la tâche précédente, ainsi que tous les enregistrements de données ou métadonnées en mémoire tampon actuellement en cours de transfert depuis la tâche précédente. Par exemple, certains enregistrements de votre pipeline peuvent être mis en mémoire tampon en attendant la résolution d'une fenêtre.

Données en cours de transfert

Les données en cours de transfert seront toujours traitées par les transformations de votre nouveau pipeline. Toutefois, les transformations supplémentaires que vous intégrez au code de votre pipeline de remplacement peuvent être appliquées ou non, suivant l'endroit où les enregistrements sont mis en mémoire tampon. Par exemple, supposons que votre pipeline existant comporte les transformations suivantes :

Java : SDK 2.x

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Java : SDK 1.x

  p.apply(ReadStrings().named("Read"))
   .apply(FormatStrings().named("Format"));

Vous pouvez remplacer votre tâche par un code de pipeline mis à jour, comme ci-dessous :

Java : SDK 2.x

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Java : SDK 1.x

  p.apply(ReadStrings().named("Read"))
   .apply(RemoveStringsStartingWithA().named("Remove"))
   .apply(FormatStrings().named("Format"));

Même si vous avez ajouté une transformation pour filtrer les chaînes commençant par la lettre "A", la transformation suivante (FormatStrings) peut toujours voir des chaînes en mémoire tampon ou en cours de transfert qui commencent par la lettre "A" et qui proviennent de la tâche précédente.

Modifier le fenêtrage

Vous pouvez modifier les stratégies de fenêtrage et de déclenchement des PCollections dans votre pipeline de remplacement, mais restez prudent. La modification des stratégies de fenêtrage ou de déclenchement n'affecte pas les données déjà mises en mémoire tampon ou en cours de transfert.

Nous vous recommandons de ne tenter que des modifications mineures sur le fenêtrage de votre pipeline, par exemple changer la durée des fenêtres à durée fixe ou flexible. Des changements majeurs sur le fenêtrage ou les déclencheurs (par exemple des modifications de l'algorithme de fenêtrage) peuvent avoir des résultats imprévisibles sur la sortie de votre pipeline.

Lancer votre tâche de remplacement

Pour mettre à jour votre tâche, vous devez lancer une nouvelle tâche remplaçant celle en cours. Lorsque vous lancez votre tâche de remplacement, l'exécution du processus de mise à jour nécessite de définir les options de pipeline suivantes (en plus des options habituelles de la tâche) :

  • Passez l'option --update.
  • Définissez l'option --jobName (dans PipelineOptions) pour qu'elle porte le même nom que la tâche devant être mise à jour.
  • Si les noms de certaines transformations de votre pipeline ont changé, vous devez fournir un mappage des transformations par le biais de l'option --transformNameMapping.

Spécifier le nom de la tâche (jobName)

Lorsque vous lancez votre tâche de remplacement, la valeur que vous transmettez pour l'option --jobName doit correspondre exactement au nom de la tâche que vous souhaitez remplacer. Pour identifier la valeur exacte de ce nom de tâche, sélectionnez votre tâche précédente dans l'interface de surveillance de Cloud Dataflow et recherchez le champ Job Name (Nom de la tâche) dans l'onglet Summary (Résumé) :

Figure 1 : Onglet "Summary" (Résumé) pour une tâche Cloud Dataflow en cours d'exécution, avec le champ "Job Name" (Nom de la tâche) mis en exergue.

Vous pouvez également lancer une requête sur une liste de tâches existantes à l'aide de l'interface de ligne de commande Cloud Dataflow. Saisissez la commande gcloud alpha dataflow jobs list dans votre fenêtre d'interface système ou de terminal pour obtenir la liste des tâches Cloud Dataflow de votre projet Google Cloud Platform. Ensuite, recherchez le champ NAME de la tâche que vous souhaitez remplacer :

ID                                        NAME                                 TYPE       CREATION_TIME        STATUS
2015-07-28_17_02_27-7257409117866690674   windowedwordcount-johndoe-0729000214 Streaming  2015-07-28 17:02:28  Running

Créer le mappage des transformations

Si votre pipeline de remplacement a modifié le nom de certaines transformations par rapport aux noms figurant dans le pipeline précédent, le service Cloud Dataflow nécessite un mappage des transformations. Le mappage des transformations crée une correspondance entre les noms des transformations dans le code de votre pipeline précédent et les noms figurant dans le code de votre pipeline de remplacement. Vous pouvez transmettre ce mappage au moyen de l'option de ligne de commande --transformNameMapping, en utilisant le format général suivant :

  --transformNameMapping={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Dans --transformNameMapping, vous devez uniquement fournir les entrées de mappage pour les noms de transformations qui ont changé entre votre pipeline précédent et votre pipeline de remplacement.

Déterminer les noms des transformations

Le nom de la transformation dans chaque instance du mappage est le nom que vous avez fourni lorsque vous avez appliqué la transformation dans votre pipeline. Exemple :

Java : SDK 2.x

.apply("FormatResults", ParDo
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

Java : SDK 1.x

.apply(ParDo
  .named("FormatResults")
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

Vous pouvez également obtenir les noms des transformations de votre tâche précédente en examinant le graphique d'exécution de la tâche dans l'interface de surveillance de Cloud Dataflow :

Figure  2 : Graphique d'exécution d'un pipeline WordCount tel qu'affiché dans l'interface de surveillance de Cloud Dataflow.

Attributions des noms dans les transformations composites

Les noms des transformations sont hiérarchiques et basés sur la hiérarchie des transformations de votre pipeline. Si votre pipeline comporte une transformation composite, les transformations imbriquées sont nommées en fonction de la transformation qui les contient. Par exemple, supposons que votre pipeline contient une transformation composite nommée CountWidgets, qui contient elle-même une transformation interne nommée Parse. Le nom complet de la transformation interne est CountWidgets/Parse, et vous devez spécifier ce nom complet dans votre mappage des transformations.

Si votre nouveau pipeline mappe une transformation composite vers un nom différent, toutes les transformations imbriquées sont automatiquement renommées. Vous devez spécifier les noms modifiés pour les transformations internes dans votre mappage des transformations.

Refactoriser la hiérarchie des transformations

Si votre pipeline de remplacement utilise une hiérarchie de transformations différente de celle de votre pipeline précédent (par exemple, parce que vous avez refactorisé vos transformations composites ou que votre pipeline dépend d'une transformation composite provenant d'une bibliothèque qui a été modifiée), vous devez déclarer explicitement le mappage.

Par exemple, supposons que votre pipeline précédent ait appliqué une transformation composite, CountWidgets, qui contenait une transformation interne nommée Parse. Maintenant, supposons que votre pipeline de remplacement refactorise CountWidgets et imbrique Parse dans une autre transformation appelée Scan. Pour que votre mise à jour réussisse, vous devez mapper explicitement le nom complet de la transformation dans le pipeline précédent (CountWidgets/Parse) vers le nom de la transformation dans le nouveau pipeline (CountWidgets/Scan/Parse) :

  --transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Si vous supprimez entièrement une transformation dans votre pipeline de remplacement, vous devez fournir un mappage null. Supposons que notre pipeline de remplacement supprime entièrement la transformation CountWidgets/Parse :

  --transformNameMapping={"CountWidgets/Parse":""}

Vérifier la compatibilité des tâches

Lorsque vous lancez votre tâche de remplacement, le service Cloud Dataflow effectue une vérification de compatibilité entre votre tâche de remplacement et la tâche précédente. Si la vérification de compatibilité réussit, la tâche précédente est arrêtée. Votre tâche de remplacement est alors lancée sur le service Cloud Dataflow avec le même nom de tâche. Si la vérification de compatibilité échoue, l'exécution de la tâche précédente continue sur le service Cloud Dataflow et la tâche de remplacement renvoie une erreur.

La vérification de compatibilité garantit que le service Cloud Dataflow peut transférer des données d'état intermédiaires provenant des étapes de votre tâche précédente vers votre tâche de remplacement, en suivant les spécifications du mappage des transformations que vous avez fournies. La vérification de compatibilité garantit également que les PCollections de votre pipeline utilisent les mêmes codeurs. Changer un Coder peut entraîner l'échec de la vérification de compatibilité, car les données en cours de transfert ou les enregistrements en mémoire tampon risquent de ne pas être sérialisés correctement dans le pipeline de remplacement.

Prévenir les ruptures de compatibilité

Certaines différences entre votre pipeline précédent et votre pipeline de remplacement peuvent entraîner l'échec de la vérification de compatibilité, Ces différences incluent les suivantes :

  • Changer le graphique du pipeline sans fournir de mappage. Lorsque vous mettez à jour une tâche, le service Cloud Dataflow tente de faire correspondre les transformations de votre tâche précédente aux transformations de la tâche de remplacement, afin de transférer les données d'état intermédiaires pour chaque étape. Si vous avez renommé ou supprimé des étapes, vous devez fournir un mappage des transformations pour que Cloud Dataflow soit en mesure de faire correspondre les données d'état.
  • Changer les entrées secondaires d'une étape. L'ajout ou la suppression d'entrées secondaires pour une transformation dans votre pipeline de remplacement entraîne l'échec de la vérification de compatibilité.
  • Changer le codeur pour une étape. Lorsque vous mettez à jour une tâche, le service Cloud Dataflow conserve tous les enregistrements de données actuellement en mémoire tampon (par exemple, pendant la résolution du fenêtrage) et les gère dans la tâche de remplacement. Si la tâche de remplacement utilise un encodage de données différent ou incompatible, le service Cloud Dataflow ne peut pas sérialiser ou désérialiser ces enregistrements.
  • Vous avez supprimé de votre pipeline une opération "avec état". La vérification de compatibilité Cloud Dataflow de votre tâche de remplacement peut échouer si vous supprimez certaines opérations avec état de votre pipeline. Le service Cloud Dataflow peut fusionner plusieurs étapes pour plus d'efficacité. Si vous avez supprimé une opération dépendante de l'état d'une étape fusionnée, la vérification échoue. Les opérations avec état comprennent :

    • les transformations qui produisent ou consomment des entrées secondaires ;
    • les lectures d'E/S ;
    • les transformations utilisant des états à clés ;
    • les transformations avec fusion de fenêtres.
  • Vous essayez d'exécuter votre tâche de remplacement dans une zone géographique différente. Vous devez exécuter votre tâche de remplacement dans la même zone que votre tâche précédente.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.