Ce document décrit les principales modifications existant entre les versions 1.x et 2.x du SDK Dataflow pour Java.
Avis d'obsolescence concernant le SDK Dataflow : La version 2.5.0 du SDK Dataflow est la dernière version distincte du SDK Apache Beam. Le service Dataflow est entièrement compatible avec les versions officielles du SDK Apache Beam. Il est également compatible avec les versions précédentes du SDK Apache Beam depuis la version 2.0.0. Reportez-vous à la page d'assistance de Dataflow pour connaître l'état de compatibilité des différentes versions du SDK. La page de téléchargements d'Apache Beam contient les notes de version des différentes versions du SDK Apache Beam.
Migrer de la version 1.x à la version 2.x
Pour installer et utiliser le SDK Apache Beam pour Java 2.x, reportez-vous au Guide d'installation du SDK Apache Beam.
Changements majeurs entre les versions 1.x et 2.x
REMARQUE : Tous les utilisateurs doivent être au courant de ces modifications pour pouvoir effectuer une mise à niveau vers les versions 2.x.
Renommage et restructuration des packages
Apache Beam a été étendu pour améliorer son fonctionnement avec des environnements autres que Google Cloud Platform. Dans ce cadre, le code du SDK a été renommé et restructuré.
Remplacement du nom com.google.cloud.dataflow
par org.apache.beam
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-78
Le SDK est maintenant déclaré dans le package org.apache.beam
au lieu de com.google.cloud.dataflow
. Vous devez mettre à jour toutes vos instructions d'importation pour refléter cette modification.
Nouveaux sous-packages : runners.dataflow
, runners.direct
et io.gcp
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-77
Les fichiers exécuteurs ont été réorganisés dans leurs propres packages. Ainsi, de nombreux éléments de com.google.cloud.dataflow.sdk.runners
ont été transférés vers org.apache.beam.runners.direct
ou org.apache.beam.runners.dataflow
.
Les options de pipeline spécifiques à l'exécution sur le service Dataflow ont été transférées de com.google.cloud.dataflow.sdk.options
vers org.apache.beam.runners.dataflow.options
.
La plupart des connecteurs d'E/S vers les services Google Cloud Platform ont été déplacés dans des sous-packages. Par exemple, BigQueryIO a été transféré de com.google.cloud.dataflow.sdk.io
vers org.apache.beam.sdk.io.gcp.bigquery
.
La plupart des IDE pourront vous aider à identifier les nouveaux emplacements. Pour vérifier le nouvel emplacement de certains fichiers, vous pouvez utiliser t
pour rechercher le code sur GitHub. Les versions 1.x du SDK Dataflow pour Java sont compilées à partir du code du dépôt GoogleCloudPlatform/DataflowJavaSDK (branche master-1.x
). Les versions 2.x du SDK Dataflow pour Java correspondent au code du dépôt apache/beam.
Exécuteurs
Suppression de Pipeline
dans les noms des exécuteurs
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1185
Les noms de tous les exécuteurs ont été raccourcis en supprimant l'élément Pipeline
de chacun des noms. Par exemple, DirectPipelineRunner
est désormais intitulé DirectRunner
et DataflowPipelineRunner
est maintenant intitulé DataflowRunner
.
Obligation de paramétrer --tempLocation
sur un chemin d'accès à Google Cloud Storage
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-430
Auparavant, Dataflow vous autorisait à ne spécifier que l'une des valeurs --stagingLocation
et --tempLocation
et en déduisait automatiquement l'autre. Désormais, le service Dataflow exige que --gcpTempLocation
soit défini et pointe vers un chemin d'accès Google Cloud Storage, mais celui-ci peut-être déduit du chemin plus général --tempLocation
.
Cette valeur sera également utilisée pour --stagingLocation
, sauf si vous lui transmettez une valeur explicite.
Suppression de InProcessPipelineRunner
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-243
DirectRunner
continue de fonctionner sur l'ordinateur local de l'utilisateur, mais prend désormais aussi en charge l'exécution multithread, les PCollections illimitées, et les déclencheurs pour les sorties présumées et retardées. Il s'aligne plus précisément sur le modèle Beam documenté et peut provoquer (à juste titre) davantage d'échecs dans les tests unitaires.
Comme cette fonctionnalité fait maintenant partie de DirectRunner
, l'exécuteur InProcessPipelineRunner
(SDK Dataflow 1.6+ pour Java) a été supprimé.
Texte BlockingDataflowPipelineRunner
remplacé par PipelineResult.waitToFinish()
Utilisateurs concernés : Tous | Impact : Erreur de compilation
BlockingDataflowPipelineRunner
est désormais supprimé. Si votre code prévoit d'exécuter un pipeline et d'attendre sa conclusion, il doit alors utiliser DataflowRunner
et appeler explicitement pipeline.run().waitToFinish()
.
Si vous avez utilisé --runner BlockingDataflowPipelineRunner
dans la ligne de commande pour indiquer à votre programme principal d'attendre la fin du pipeline, c'est au programme principal de s'en préoccuper. Il devrait donc fournir une option telle --blockOnRun
qui lui indiquera d'appeler waitToFinish()
.
Texte TemplatingDataflowPipelineRunner
remplacé par --templateLocation
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-551
La fonctionnalité de TemplatingDataflowPipelineRunner
(SDK Dataflow 1.9+ pour Java) a été remplacée par l'utilisation de --templateLocation
avec DataflowRunner
.
ParDo et DoFn
DoFn
utilise des annotations au lieu des substitutions de méthodes
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-37
Pour plus de flexibilité et de personnalisation, DoFn
utilise désormais des annotations de méthode pour personnaliser le traitement au lieu d'obliger les utilisateurs à substituer des méthodes spécifiques.
Les différences entre la nouvelle et l'ancienne version de DoFn
sont illustrées dans l'exemple de code suivant. Auparavant (avec le Cloud Dataflow 1.x pour Java), votre code se présentait ainsi :
new DoFn<Foo, Baz>() {
@Override
public void processElement(ProcessContext c) { … }
}
Désormais (avec le SDK Apache Beam 2.x pour Java), votre code ressemblera à ceci :
new DoFn<Foo, Baz>() {
@ProcessElement // <-- This is the only difference
public void processElement(ProcessContext c) { … }
}
Si votre DoFn
accède à ProcessContext#window()
, une modification supplémentaire intervient. Au lieu de ceci :
public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
@Override
public void processElement(ProcessContext c) {
… (MyWindowType) c.window() …
}
}
vous écrirez désormais :
public class MyDoFn extends DoFn<Foo, Baz> {
@ProcessElement
public void processElement(ProcessContext c, MyWindowType window) {
… window …
}
}
ou :
return new DoFn<Foo, Baz>() {
@ProcessElement
public void processElement(ProcessContext c, MyWindowType window) {
… window …
}
}
Le moteur d'exécution fournira automatiquement la fenêtre pour votre DoFn
.
Réutilisation des DoFn
pour plusieurs lots
Utilisateurs concernés : Tous | Impact : Peut provoquer des résultats inattendus en mode silencieux | Référence JIRA : BEAM-38
Pour améliorer les performances, le même DoFn
peut désormais être réutilisé pour traiter plusieurs lots d'éléments, au lieu de garantir une nouvelle instance par lot. Tout DoFn
qui conserve un état local (par exemple, des variables d'instance) au-delà de la fin d'un lot peut entraîner des changements de comportement, car le lot suivant débutera dans cet état plutôt qu'avec une nouvelle copie.
Pour gérer le cycle de vie, de nouvelles méthodes @Setup
et @Teardown
ont été introduites.
Le cycle de vie complet fonctionne comme indiqué ci-dessous (notez qu'une défaillance peut interrompre ce cycle de vie à tout moment) :
@Setup
: initialisation duDoFn
par instance (par exemple : ouverture de connexions réutilisables).- Un nombre quelconque de répétitions de la séquence suivante :
@StartBundle
: initialisation par lot (par exemple : réinitialisation de l'état duDoFn
).@ProcessElement
: traitement habituel de l'élément.@FinishBundle
: étapes finales par lot (par exemple : élimination des effets secondaires).
@Teardown
: suppression par instance des ressources détenues par leDoFn
(par exemple : fermeture des connexions réutilisables).
Remarque : En pratique, ce changement ne devrait avoir qu'un impact limité. Toutefois, il ne génère pas d'erreur à la compilation, ce qui peut entraîner des résultats inattendus sans avertissement.
Modification de l'ordre des paramètres à la spécification des entrées ou sorties secondaires
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1422
Le DoFn
doit désormais systématiquement être spécifié en premier lors de l'application de ParDo. Au lieu d'écrire :
foos.apply(ParDo
.withSideInputs(sideA, sideB)
.withOutputTags(mainTag, sideTag)
.of(new MyDoFn()))
vous écrirez désormais :
foos.apply(ParDo
.of(new MyDoFn())
.withSideInputs(sideA, sideB)
.withOutputTags(mainTag, sideTag))
PTransforms
.named()
a été supprimé
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-370
Supprimez les méthodes .named()
des PTransforms et de leurs sous-classes. Utilisez désormais PCollection.apply(“name”, PTransform)
.
Remplacement du nom PTransform.apply()
par PTransform.expand()
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-438
PTransform.apply()
a été renommé PTransform.expand()
pour éviter toute confusion avec PCollection.apply()
. Si vous avez réécrit certaines transformations composites, vous devrez remplacer toutes les méthodes apply()
que vous avez modifiées par expand()
. La façon de construire les pipelines n'est pas affectée.
Modifications majeures supplémentaires
Vous trouverez ci-dessous une liste des modifications majeures supplémentaires et des modifications à venir.
Modifications individuelles de l'API
Suppression de --credentialDir
, --tokenServerUrl
et des options associées
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-725
Suppression des GcpOptions
suivantes : TokenServerUrl
, CredentialDir
, CredentialId
, SecretsFile
, ServiceAccountName
, ServiceAccountKeyFile
.
Utilisez GoogleCredentials.fromStream(InputStream for credential)
. Le flux peut contenir un fichier de clé de compte de service au format JSON provenant de Google Developers Console ou bien des informations d'identification d'utilisateur enregistrées au format pris en charge par le SDK Cloud.
Remplacement de --enableProfilingAgent
par --saveProfilesToGcs
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1122
Déplacement de --update
vers DataflowPipelineOptions
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-81
Déplacez --update PipelineOption
vers DataflowPipelineOptions
de DataflowPipelineDebugOptions
.
BoundedSource.producesSortedKeys()
a été supprimé
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1201
Supprimez producesSortedKeys()
de BoundedSource
.
Modification de l'API PubsubIO
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-974, BEAM-1415
À partir de la version 2.0.0-beta2, PubsubIO.Read
et PubsubIO.Write
doivent être instanciés à l'aide de PubsubIO.<T>read()
et PubsubIO.<T>write()
au lieu des méthodes de fabrique statiques telles que PubsubIO.Read.topic(String)
.
Les méthodes de configuration de PubsubIO
ont été renommées. Par exemple, PubsubIO.read().topic(String)
est devenue PubsubIO.read().fromTopic()
. De même : subscription()
devient fromSubscription()
, timestampLabel
et idLabel
deviennent respectivement withTimestampAttribute
et withIdAttribute
, et PubsubIO.write().topic()
devient PubsubIO.write().to()
.
Au lieu de spécifier Coder
pour l'analyse de la charge de messages, PubsubIO
offre des fonctions de lecture et d'écriture de chaînes, de messages Avro et de messages Protobuf, par exemple PubsubIO.readStrings()
et PubsubIO.writeAvros()
. Pour lire et écrire des types personnalisés, utilisez PubsubIO.read/writeMessages()
(et PubsubIO.readMessagesWithAttributes
si les attributs de message doivent être inclus) et utilisez ParDo
ou MapElements
pour convertir votre type personnalisé en PubsubMessage
et vice-versa.
Suppression de la compatibilité DatastoreIO pour l'API v1beta2 non compatible
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-354
DatastoreIO repose désormais sur l'API Cloud Datastore v1.
Modification de DisplayData.Builder
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-745
DisplayData.Builder.include(..)
possède un nouveau paramètre de chemin requis pour l'enregistrement des données d'affichage des sous-composants. Les API Builder renvoient maintenant un objet DisplayData.ItemSpec<>
plutôt que DisplayData.Item
Obligation de fournir FileBasedSink.getWriterResultCoder()
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-887
FileBasedSink.getWriterResultCoder
a été transformé en méthode abstraite, qui doit être fournie.
Remplacement du nom Filter.byPredicate()
par Filter.by()
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-342
IntraBundleParallelization
a été supprimé
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-414
Remplacement du nom RemoveDuplicates
par Distinct
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-239
Modification de TextIO
pour utiliser une syntaxe différente et ne fonctionner que sur des chaînes
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1354
TextIO.Read.from()
devient TextIO.read().from()
, de même que TextIO.Write.to()
devient TextIO.write().to()
.
Désormais, TextIO.Read
renvoie toujours une PCollection<String>
et ne prend pas .withCoder()
pour analyser les chaînes. Au lieu de cela, analysez les chaînes en appliquant ParDo
ou MapElements
à la collection.
De même, TextIO.Write
prend désormais toujours en entrée une PCollection<String>
. Pour écrire autre chose dans TextIO
, convertissez-la en String
à l'aide de ParDo
ou MapElements
.
Modification de AvroIO
pour utiliser une syntaxe différente
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1402
Pour lire et écrire des types générés par Avro, AvroIO.Read.from().withSchema(Foo.class)
est remplacé par AvroIO.read(Foo.class).from()
, et il en va de même pour AvroIO.Write
.
Pour lire et écrire des enregistrements génériques Avro à l'aide d'un schéma spécifié, AvroIO.Read.from().withSchema(Schema or String)
est remplacé par AvroIO.readGenericRecords().from()
, et il en va de même pour AvroIO.Write
.
Modification de KafkaIO
pour spécifier explicitement les paramètres de type et utiliser les sérialiseurs/désérialiseurs Kafka
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1573, BEAM-2221
Dans KafkaIO, vous devez maintenant spécifier explicitement les paramètres de type clé et valeur. Exemple : KafkaIO.<Foo, Bar>read()
et KafkaIO.<Foo, Bar>write()
.
Au lieu d'utiliser Coder
pour interpréter les octets de clé et de valeur, utilisez les classes Kafka standards Serializer
et Deserializer
. Par exemple : au lieu de KafkaIO.read().withKeyCoder(StringUtf8Coder.of())
, utilisez KafkaIO.read().withKeyDeserializer(StringDeserializer.class)
, et il en va de même pour KafkaIO.write()
.
Changement de syntaxe pour BigQueryIO
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1427
Au lieu de BigQueryIO.Read.from()
et BigQueryIO.Write.to()
, utilisez BigQueryIO.read().from()
et BigQueryIO.write().to()
.
Changement de syntaxe pour KinesisIO.Read
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1428
Au lieu de KinesisIO.Read.from().using()
, utilisez KinesisIO.read().from().withClientProvider()
.
Changement de syntaxe pour TFRecordIO
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1913
Au lieu de TFRecordIO.Read.from()
et TFRecordIO.Write.to()
, utilisez TFRecordIO.read().from()
et TFRecordIO.write().to()
.
Consolidation de XmlSource
et XmlSink
sous XmlIO
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1914
Au lieu d'utiliser directement XmlSource
et XmlSink
, utilisez XmlIO
.
Exemple : au lieu de Read.from(XmlSource.from())
, utilisez XmlIO.read().from()
; au lieu de Write.to(XmlSink.writeOf())
, utilisez XmlIO.write().to()
.
Remplacement du nom CountingInput
par GenerateSequence
et généralisation
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1414
Au lieu de CountingInput.unbounded()
, utilisez GenerateSequence.from(0)
. Au lieu de CountingInput.upTo(n)
, utilisez GenerateSequence.from(0).to(n)
.
Modification de Count
, Latest
et Sample
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1417, BEAM-1421, BEAM-1423
Les classes Count.PerElement
, Count.PerKey
et Count.Globally
sont désormais privées. Vous devez donc utiliser les fonctions de fabrique telles que Count.perElement()
(là où vous pouviez précédemment utiliser new Count.PerElement()
). De plus, si vous voulez par exemple utiliser .withHotKeyFanout()
sur le résultat de la transformation, vous ne pouvez plus le faire directement sur le résultat de fonctions telles que .apply(Count.perElement())
. Au lieu de cela, Count
expose sa fonction de combinaison en tant que Count.combineFn()
et vous devez appliquer Combine.globally(Count.combineFn())
vous-même.
Des modifications similaires s'appliquent aux transformations Latest
et Sample
.
Modification de l'ordre des paramètres pour MapElements
et FlatMapElements
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1418
Lorsque vous utilisez MapElements
et FlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor)
, le descripteur doit maintenant être spécifié en premier. Par exemple, FlatMapElements.into(descriptor).via(fn)
.
Modification de Window
pour la configuration de paramètres supplémentaires
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1425
Lorsque vous utilisez Window
pour configurer autre chose que WindowFn
proprement dit (Window.into()
), utilisez Window.configure()
. Par exemple : au lieu de Window.triggering(...)
, utilisez Window.configure().triggering(...)
.
Remplacement du nom Write.Bound
par Write
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1416
La classe Write.Bound
est désormais simplement appelée Write
. Cela n'a d'importance que si vous extrayiez des applications de Write.to(Sink)
vers une variable : son type était précédemment Write.Bound<...>
, il s'agit maintenant de Write<...>
.
Renommage des classes de transformation Flatten
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1419
Les classes Flatten.FlattenIterables
et Flatten.FlattenPCollectionList
sont renommées respectivement Flatten.Iterables
et Flatten.PCollections
.
Séparation de GroupByKey.create(boolean)
en deux méthodes
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1420
GroupByKey.create(boolean fewKeys)
est désormais simplement séparée en GroupByKey.create()
et GroupByKey.createWithFewKeys()
.
Modification de SortValues
Utilisateurs concernés : Tous | Impact : Erreur de compilation | Référence JIRA : BEAM-1426
Le nom des mutateurs de BufferedExternalSorter.Options
passe de setSomeProperty
à withSomeProperty
.
Dépendance supplémentaire à l'API Google
À partir de la version 2.0.0 du SDK, vous devez également activer l'API Cloud Resource Manager.
Dépendances mises à niveau
La version 2.x met à niveau les versions épinglées de la plupart des dépendances, y compris Avro, protobuf et gRPC. Certaines de ces dépendances peuvent avoir apporté leurs propres modifications destructives, ce qui peut poser problème si votre code dépend également directement de la dépendance. Vous pouvez obtenir la liste complète des versions utilisées dans la version 2.0.0 dans le fichier pom.xml
ou via mvn dependency:tree
.
Refactorisations internes
Des changements importants ont été apportés à la structure interne du SDK. Tous les utilisateurs qui s'appuyaient sur des éléments allant au-delà de l'API publique (tels que les classes ou méthodes se terminant en Internal
ou figurant dans des paquets util
) trouveront sans doute que les choses ont changé de manière significative.
Si vous utilisiez StateInternals
ou TimerInternals
, sachez que ces API internes ont été supprimées. Vous pouvez désormais utiliser les API expérimentales State
et Timer
pour DoFn
.