Migrer à partir du SDK Cloud Dataflow 1.x pour Java

Ce document décrit les principales modifications existant entre les versions 1.x et 2.x du SDK Cloud Dataflow pour Java.

Avis d'obsolescence concernant le SDK Cloud Dataflow : La version 2.5.0 du SDK Cloud Dataflow est la dernière version distincte du SDK Apache Beam. Le service Cloud Dataflow est entièrement compatible avec les versions officielles du SDK Apache Beam. Le service Cloud Dataflow est également compatible avec les versions précédentes du SDK Apache Beam depuis la version 2.0.0. Reportez-vous à la page d'aide de Cloud Dataflow pour connaître l'état de compatibilité de divers SDK. La page de téléchargements d'Apache Beam contient les notes de version pour les différentes versions du SDK Apache Beam.

Migrer de la version 1.x à la version 2.x

Pour installer et utiliser le SDK Apache Beam pour Java 2.x, reportez-vous au Guide d'installation du SDK Apache Beam.

Changements majeurs entre les versions 1.x et 2.x

REMARQUE : Tous les utilisateurs doivent être au courant de ces modifications pour pouvoir effectuer une mise à niveau vers les versions 2.x.

Renommage et restructuration des packages

Apache Beam a été étendu pour améliorer son fonctionnement avec des environnements autres que Google Cloud Platform. Dans ce cadre, le code du SDK a été renommé et restructuré.

Remplacement du nom com.google.cloud.dataflow par org.apache.beam

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-78

Le SDK est maintenant déclaré dans le package org.apache.beam au lieu de com.google.cloud.dataflow. Vous devez mettre à jour toutes vos instructions d'importation pour refléter cette modification.

Nouveaux sous-packages : runners.dataflow, runners.direct et io.gcp

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-77

Les fichiers exécuteurs ont été réorganisés dans leurs propres packages. Ainsi, de nombreux éléments de com.google.cloud.dataflow.sdk.runners ont été transférés vers org.apache.beam.runners.direct ou org.apache.beam.runners.dataflow.

Les options de pipeline spécifiques à l'exécution sur le service Cloud Dataflow ont été transférées de com.google.cloud.dataflow.sdk.options vers org.apache.beam.runners.dataflow.options.

La plupart des connecteurs d'E/S vers les services Google Cloud Platform ont été déplacés dans des sous-packages. Par exemple, BigQueryIO a été transféré de com.google.cloud.dataflow.sdk.io vers org.apache.beam.sdk.io.gcp.bigquery.

La plupart des IDE pourront vous aider à identifier les nouveaux emplacements. Pour vérifier le nouvel emplacement de fichiers spécifiques, vous pouvez utiliser t pour rechercher dans le code sur GitHub. Les versions 1.x du SDK Cloud Dataflow pour Java sont compilées à partir du code du dépôt GoogleCloudPlatform/DataflowJavaSDK (branche master-1.x). Les versions 2.x du SDK Cloud Dataflow pour Java correspondent au code du dépôt apache/beam.

Exécuteurs

Suppression de Pipeline dans les noms des exécuteurs

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1185

Les noms de tous les exécuteurs ont été raccourcis en supprimant l'élément Pipeline de chacun des noms. Par exemple, DirectPipelineRunner est désormais intitulé DirectRunner et DataflowPipelineRunner est maintenant intitulé DataflowRunner.

Obligation de paramétrer --tempLocation sur un chemin d'accès à Google Cloud Storage

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-430

Auparavant, Cloud Dataflow vous autorisait à ne spécifier que l'une des valeurs --stagingLocation ou --tempLocation et en déduisait automatiquement l'autre. Désormais, le service Cloud Dataflow exige que --gcpTempLocation soit défini et pointe vers un chemin d'accès Google Cloud Storage, mais celui-ci peut-être déduit du chemin plus général --tempLocation. Cette valeur sera également utilisée pour --stagingLocation, sauf si vous lui transmettez une valeur explicite.

Suppression de InProcessPipelineRunner

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-243

DirectRunner continue de fonctionner sur l'ordinateur local de l'utilisateur, mais prend désormais aussi en charge l'exécution multithread, les PCollections illimitées, et les déclencheurs pour les sorties présumées et retardées. Il s'aligne plus précisément sur le modèle Beam documenté et peut provoquer (à juste titre) davantage d'échecs dans les tests unitaires.

Comme cette fonctionnalité fait maintenant partie de DirectRunner, l'exécuteur InProcessPipelineRunner (SDK Cloud Dataflow 1.6+ pour Java) a été supprimé.

Remplacement de BlockingDataflowPipelineRunner par PipelineResult.waitToFinish()

Utilisateurs concernés : Tous | Impact : Erreur de compilation

BlockingDataflowPipelineRunner est désormais supprimé. Si votre code prévoit d'exécuter un pipeline et d'attendre sa conclusion, il doit alors utiliser DataflowRunner et appeler explicitement pipeline.run().waitToFinish().

Si vous avez utilisé --runner BlockingDataflowPipelineRunner dans la ligne de commande pour indiquer à votre programme principal d'attendre la fin du pipeline, c'est au programme principal de s'en préoccuper. Il devrait donc fournir une option telle que --blockOnRun qui lui indiquera d'appeler waitToFinish().

Remplacement de TemplatingDataflowPipelineRunner par --templateLocation

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-551

La fonctionnalité de TemplatingDataflowPipelineRunner (SDK Cloud Dataflow 1.9+ pour Java) a été remplacée par l'utilisation de --templateLocation avec DataflowRunner.

ParDo et DoFn

DoFn utilise des annotations au lieu des substitutions de méthodes

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-37

Pour améliorer les capacités de flexibilité et de personnalisation, DoFn utilise désormais des annotations de méthodes pour personnaliser le traitement au lieu de demander aux utilisateurs de remplacer des méthodes spécifiques.

Les différences entre la nouvelle et l'ancienne version de DoFn sont illustrées dans l'exemple de code suivant. Auparavant (avec le SDK Cloud Dataflow 1.x pour Java), votre code se présentait ainsi :

new DoFn<Foo, Baz>() {
  @Override
  public void processElement(ProcessContext c) { … }
}

Désormais (avec le SDK Apache Beam 2.x pour Java), votre code ressemblera à ceci :

new DoFn<Foo, Baz>() {
  @ProcessElement   // <-- This is the only difference
  public void processElement(ProcessContext c) { … }
}

Si votre DoFn accède à ProcessContext#window(), une modification supplémentaire intervient. Au lieu de ceci :

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    … (MyWindowType) c.window() …
  }
}

vous écrirez désormais :

public class MyDoFn extends DoFn<Foo, Baz> {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

ou :

return new DoFn<Foo, Baz>() {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

Le moteur d'exécution fournira automatiquement la fenêtre pour votre DoFn.

Réutilisation des DoFn pour plusieurs lots

Utilisateurs concernés : Tous | Impact : Peut provoquer des résultats inattendus en mode silencieux | Référence JIRA : BEAM-38

Afin d'améliorer les performances, le même DoFn peut maintenant être réutilisé pour traiter plusieurs lots d'éléments (bundles) au lieu de garantir une nouvelle instance par lot. Tout DoFn qui conserve l'état local (par exemple, les variables d'instance) au-delà de la fin d'un lot peut subir des changements de comportement : en effet, le lot suivant démarre dans cet état au lieu d'un état vierge.

Pour gérer le cycle de vie, de nouvelles méthodes @Setup et @Teardown ont été introduites. Le cycle de vie complet fonctionne comme indiqué ci-dessous (notez qu'une défaillance peut interrompre ce cycle de vie à tout moment) :

  • @Setup : initialisation du DoFn par instance (par exemple : ouverture de connexions réutilisables).
  • Un nombre quelconque de répétitions de la séquence suivante :
    • @StartBundle : initialisation par lot (par exemple : réinitialisation de l'état du DoFn).
    • @ProcessElement : traitement usuel de l'élément.
    • @FinishBundle : étapes finales par lot (par exemple : élimination des effets secondaires).
  • @Teardown : suppression par instance des ressources détenues par le DoFn (par exemple : fermeture des connexions réutilisables).

Remarque : En pratique, ce changement ne devrait avoir qu'un impact limité. Toutefois, il ne génère pas d'erreur à la compilation, ce qui peut entraîner des résultats inattendus sans avertissement.

Modification de l'ordre des paramètres à la spécification des entrées ou sorties secondaires

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1422

Le DoFn doit désormais systématiquement être spécifié en premier lors de l'application de ParDo. Au lieu d'écrire :

foos.apply(ParDo
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag)
    .of(new MyDoFn()))

vous écrirez désormais :

foos.apply(ParDo
    .of(new MyDoFn())
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag))

PTransforms

Suppression de .named()

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-370

Supprimez les méthodes .named() des PTransforms et de leurs sous-classes. Utilisez désormais PCollection.apply(“name”, PTransform).

Remplacement du nom PTransform.apply() par PTransform.expand()

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-438

PTransform.apply() s'appelle désormais PTransform.expand() pour éviter toute confusion avec PCollection.apply(). Si vous avez réécrit certaines transformations composites, vous devrez remplacer toutes les méthodes apply() que vous avez modifiées par expand(). La façon de construire les pipelines n'est pas affectée.

Modifications majeures supplémentaires

Vous trouverez ci-dessous une liste des modifications majeures supplémentaires et des modifications à venir.

Modifications individuelles de l'API

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-725

Suppression des GcpOptions suivantes : TokenServerUrl, CredentialDir, CredentialId, SecretsFile, ServiceAccountName, ServiceAccountKeyFile.

Utilisez GoogleCredentials.fromStream(InputStream for credential). Le flux peut contenir un fichier de clé de compte de service au format JSON provenant de Google Developers Console ou bien des informations d'identification d'utilisateur enregistrées au format pris en charge par le SDK Cloud.

Remplacement de --enableProfilingAgent par --saveProfilesToGcs

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1122

Déplacement de --update vers DataflowPipelineOptions

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-81

Déplacez --update PipelineOption vers DataflowPipelineOptions de DataflowPipelineDebugOptions.

Suppression de BoundedSource.producesSortedKeys()

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1201

Supprimez producesSortedKeys() de BoundedSource.

Modification de l'API PubsubIO

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-974, BEAM-1415

À partir de la version 2.0.0-beta2, PubsubIO.Read et PubsubIO.Write doivent être instanciés à l'aide de PubsubIO.<T>read() et PubsubIO.<T>write() au lieu des méthodes de fabrique statiques telles que PubsubIO.Read.topic(String).

Les méthodes de configuration de PubsubIO ont été renommées. Par exemple, PubsubIO.read().topic(String) est devenue PubsubIO.read().fromTopic(). De même : subscription() devient fromSubscription(), timestampLabel et idLabel deviennent respectivement withTimestampAttribute et withIdAttribute, et PubsubIO.write().topic() devient PubsubIO.write().to().

Au lieu de spécifier Coder pour l'analyse de la charge de messages, PubsubIO offre des fonctions de lecture et d'écriture de chaînes, de messages Avro et de messages Protobuf, par exemple PubsubIO.readStrings() et PubsubIO.writeAvros(). Pour lire et écrire des types personnalisés, utilisez PubsubIO.read/writeMessages() (et PubsubIO.readMessagesWithAttributes si les attributs de message doivent être inclus) et utilisez ParDo ou MapElements pour convertir votre type personnalisé en PubsubMessage et vice-versa.

Suppression de la compatibilité DatastoreIO pour l'API v1beta2 non compatible

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-354

DatastoreIO repose désormais sur l'API Cloud Datastore v1.

Modification de DisplayData.Builder

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-745

DisplayData.Builder.include(..) possède un nouveau paramètre de chemin requis pour l'enregistrement des données d'affichage des sous-composants. Les API Builder renvoient maintenant un objet DisplayData.ItemSpec<> plutôt que DisplayData.Item.

Obligation de fournir FileBasedSink.getWriterResultCoder()

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-887

FileBasedSink.getWriterResultCoder a été transformé en méthode abstraite, qui doit être fournie.

Remplacement du nom Filter.byPredicate() par Filter.by()

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-342

Suppression de IntraBundleParallelization

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-414

Remplacement du nom RemoveDuplicates par Distinct

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-239

Modification de TextIO pour utiliser une syntaxe différente et fonctionner uniquement sur des chaînes

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1354

TextIO.Read.from() devient TextIO.read().from(), de même que TextIO.Write.to() devient TextIO.write().to().

Désormais, TextIO.Read renvoie toujours une PCollection<String> et ne prend pas .withCoder() pour analyser les chaînes. Au lieu de cela, analysez les chaînes en appliquant ParDo ou MapElements à la collection. De même, TextIO.Write prend désormais toujours en entrée une PCollection<String>. Pour écrire autre chose dans TextIO, convertissez-la en String à l'aide de ParDo ou MapElements.

Modification de AvroIO pour utiliser une syntaxe différente

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1402

Pour lire et écrire des types générés par Avro, AvroIO.Read.from().withSchema(Foo.class) est remplacé par AvroIO.read(Foo.class).from(), et il en va de même pour AvroIO.Write.

Pour lire et écrire des enregistrements génériques Avro à l'aide d'un schéma spécifié, AvroIO.Read.from().withSchema(Schema or String) est remplacé par AvroIO.readGenericRecords().from(), et il en va de même pour AvroIO.Write.

Modification de KafkaIO pour spécifier explicitement les paramètres de type et utiliser les sérialiseurs/désérialiseurs Kafka

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1573, BEAM-2221

Dans KafkaIO, vous devez maintenant spécifier explicitement les paramètres de type clé et valeur : par exemple, KafkaIO.<Foo, Bar>read() et KafkaIO.<Foo, Bar>write().

Au lieu d'utiliser Coder pour interpréter les octets de clé et de valeur, utilisez les classes Kafka standard Serializer et Deserializer. Par exemple : au lieu de KafkaIO.read().withKeyCoder(StringUtf8Coder.of()), utilisez KafkaIO.read().withKeyDeserializer(StringDeserializer.class), et il en va de même pour KafkaIO.write().

Changement de syntaxe pour BigQueryIO

Utilisateurs concernés : Tous | Impact  Erreur de compilation | Référence JIRA : BEAM-1427

Au lieu de BigQueryIO.Read.from() et BigQueryIO.Write.to(), utilisez BigQueryIO.read().from() et BigQueryIO.write().to().

Changement de syntaxe pour KinesisIO.Read

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1428

Au lieu de KinesisIO.Read.from().using(), utilisez KinesisIO.read().from().withClientProvider().

Changement de syntaxe pour TFRecordIO

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1913

Au lieu de TFRecordIO.Read.from() et TFRecordIO.Write.to(), utilisez TFRecordIO.read().from() et TFRecordIO.write().to().

Consolidation de XmlSource et XmlSink sous XmlIO

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1914

Au lieu d'utiliser directement XmlSource et XmlSink, utilisez XmlIO.

Exemple : Au lieu de Read.from(XmlSource.from()), utilisez XmlIO.read().from() ; au lieu de Write.to(XmlSink.writeOf()), utilisez XmlIO.write().to().

Remplacement du nom CountingInput par GenerateSequence et généralisation

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1414

Au lieu de CountingInput.unbounded(), utilisez GenerateSequence.from(0). Au lieu de CountingInput.upTo(n), utilisez GenerateSequence.from(0).to(n).

Modification de Count, Latest, Sample

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1417, BEAM-1421, BEAM-1423

Les classes Count.PerElement, Count.PerKey, Count.Globally sont désormais privées. Vous devez donc utiliser les fonctions de fabrique telles que Count.perElement() (là où vous pouviez précédemment utiliser new Count.PerElement()). De plus, si vous voulez par exemple utiliser .withHotKeyFanout() sur le résultat de la transformation, vous ne pouvez plus le faire directement sur le résultat de fonctions telles que .apply(Count.perElement()). Au lieu de cela, Count expose sa fonction de combinaison en tant que Count.combineFn() et vous devez appliquer Combine.globally(Count.combineFn()) vous-même.

Des modifications similaires s'appliquent aux transformations Latest et Sample.

Modification de l'ordre des paramètres pour MapElements et FlatMapElements

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1418

Lorsque vous utilisez MapElements et FlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor), le descripteur doit maintenant être spécifié en premier : par exemple, FlatMapElements.into(descriptor).via(fn).

Modification de Window pour la configuration de paramètres supplémentaires

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1425

Lorsque vous utilisez Window pour configurer autre chose que WindowFn proprement dit (Window.into()), utilisez Window.configure(). Par exemple : au lieu de Window.triggering(...), utilisez Window.configure().triggering(...).

Remplacement du nom Write.Bound par Write

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1416

La classe Write.Bound est désormais simplement appelée Write. Cela n'a d'importance que si vous extrayiez des applications de Write.to(Sink) vers une variable : son type était précédemment Write.Bound<...>. Il s'agit désormais de Write<...>.

Renommage des classes de transformation Flatten

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1419

Les classes Flatten.FlattenIterables et Flatten.FlattenPCollectionList sont respectivement renommées Flatten.Iterables et Flatten.PCollections.

Séparation de GroupByKey.create(boolean) en deux méthodes

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1420

GroupByKey.create(boolean fewKeys) est désormais simplement séparée en GroupByKey.create() et GroupByKey.createWithFewKeys().

Modification de SortValues

Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1426

Le nom des mutateurs de BufferedExternalSorter.Options passe de setSomeProperty à withSomeProperty.

Dépendance supplémentaire à l'API Google

À partir de la version 2.0.0 du SDK, vous devez également activer l'API Cloud Resource Manager.

Dépendances mises à niveau

La version 2.x met à niveau les versions épinglées de la plupart des dépendances, y compris Avro, Protobuf et gRPC. Certaines de ces dépendances peuvent avoir apporté leurs propres modifications majeures, ce qui peut poser problème si votre code dépend également directement de la dépendance. Vous pouvez obtenir la liste complète des versions utilisées dans la version 2.0.0 dans le fichier pom.xml ou à travers mvn dependency:tree.

Refactorisations internes

Des changements importants ont été apportés à la structure interne du SDK. Tous les utilisateurs qui s'appuyaient sur des éléments allant au-delà de l'API publique (tels que les classes ou méthodes se terminant en Internal ou figurant dans des paquets util) trouveront sans doute que les choses ont changé de manière significative.

Si vous utilisiez StateInternals ou TimerInternals, sachez que ces API internes ont été supprimées. Vous pouvez désormais utiliser les API expérimentales State et Timer pour DoFn.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.