Migrer de MapReduce App Engine vers Apache Beam et Dataflow

Ce tutoriel est destiné aux utilisateurs de MapReduce App Engine. Il montre comment migrer de MapReduce App Engine vers Apache Beam et Dataflow.

Pourquoi migrer ?

MapReduce App Engine est un modèle de programmation qui permet de traiter de grands volumes de données de façon parallèle et distribuée. Il est utile pour exécuter des tâches importantes de longue durée ne pouvant pas être traitées en une seule requête, comme :

  • l'analyse de journaux d'application ;
  • l'agrégation de données connexes provenant de sources externes ;
  • la conversion de formats de données ;
  • Exportation de données à des fins d'analyse externe

Sachez toutefois que MapReduce App Engine est une bibliothèque Open Source gérée par la communauté. Elle repose sur les services App Engine et n'est plus couverte par Google.

À l'inverse, Dataflow est entièrement couvert par l'assistance Google et propose des fonctionnalités plus étendues que MapReduce App Engine.

Cas de migration

Vous trouverez ci-dessous des exemples de situations dans lesquelles il peut être intéressant de migrer de MapReduce App Engine vers Apache Beam et Dataflow :

  • Stocker vos données d'application de base de données Datastore dans un entrepôt de données BigQuery pour effectuer des analyses à l'aide de SQL
  • Exploiter Dataflow au lieu de MapReduce App Engine pour gérer et/ou mettre à jour votre ensemble de données Datastore
  • Sauvegarder des parties de votre base de données Datastore dans Cloud Storage

Qu'est-ce que Dataflow et Apache Beam ?

Dataflow est un service géré permettant d'exécuter une grande variété de schémas de traitement des données. Apache Beam est un modèle de programmation unifié qui propose des SDK servant à définir des workflows de traitement des données. Vous pouvez donc créer des pipelines complexes de traitement par lot et par flux à l'aide d'Apache Beam, puis les exécuter sur Dataflow.

Premiers pas avec Dataflow et Apache Beam

Pour débuter avec Cloud Dataflow et Apache Beam, suivez le guide de démarrage rapide de votre choix :

Créer et exécuter un pipeline

Lorsque vous utilisez MapReduce App Engine, vous créez des classes de traitement de données et ajoutez la bibliothèque MapReduce. Ensuite, une fois la spécification et les paramètres de la tâche définis, vous créez et démarrez celle-ci en une seule étape en employant la méthode start() statique sur la classe de tâche appropriée.

Pour exécuter des tâches Map, vous devez créer les classes Input et Output, ainsi que la classe Map qui se charge du mappage. Dans le cas des tâches MapReduce App Engine, vous devez créer les classes Input et Output, puis définir les classes Mapper et Reducer pour les transformations de données.

Apache Beam fonctionne toutefois différemment et nécessite la création d'un pipeline. Lors de l'utilisation du service, vous exploitez des connecteurs d'entrée et de sortie pour lire et écrire des données à partir de vos sources et récepteurs de données. Vous mettez ensuite en œuvre vos transformations de données à l'aide de transformations prédéfinies ou personnalisées. Et lorsque votre code est prêt, vous pouvez déployer votre pipeline sur le service Dataflow.

Convertir des tâches MapReduce App Engine en pipelines Apache Beam

Le tableau suivant présente les équivalents Apache Beam des étapes de mappage, de brassage et de réduction du modèle MapReduce App Engine.

Java

MapReduce App Engine Équivalent Apache Beam
Map MapElements<InputT,OutputT>
Shuffle GroupByKey<K,V>
Reduce Combine.GroupedValues<K,InputT,OutputT>

Une pratique courante consiste à utiliser Combine.PerKey<K,InputT,OutputT> plutôt que GroupByKey et CombineValues.

Python

MapReduce App Engine Équivalent Apache Beam
Map beam.Map
Shuffle beam.GroupByKey
Reduce beam.CombineValues

Une pratique courante consiste à utiliser beam.CombinePerKey plutôt que beam.GroupByKey et beam.CombineValues.

Go

MapReduce App Engine Équivalent Apache Beam
Map beam.ParDo
Shuffle beam.GroupByKey
Reduce beam.Combine


L'exemple de code suivant décrit comment mettre en œuvre le modèle MapReduce App Engine dans Apache Beam :

Java

Code issu du fichier MinimalWordCount.java :
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                   @Override
                   public String apply(KV<String, Long> input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));

Python

Code issu du fichier wordcount_minimal.py :
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

Go

Code issu du fichier minimal_wordcount.go :
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()

// Create the Pipeline object and root scope.
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)

// Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
// key/value pairs, where each key represents a unique word in the text.
// The associated value is the occurrence count for that word.
counted := stats.Count(s, words)

// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)

// Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

Avantages supplémentaires d'Apache Beam et de Dataflow

Si vous choisissez de migrer vos tâches MapReduce App Engine vers des pipelines Apache Beam, vous bénéficierez de plusieurs fonctionnalités qu'Apache Beam et Dataflow peuvent vous offrir.

Programmer des tâches Cloud Dataflow

Si vous maîtrisez déjà les files d'attente de tâches d'App Engine, vous pouvez programmer vos tâches récurrentes à l'aide de Cron. Consultez cet exemple pour découvrir comment planifier des pipelines Apache Beam à l'aide du service Cron d'App Engine.

D'autres méthodes vous permettent également de planifier l'exécution de votre pipeline, Vous pouvez :

Surveiller les tâches Cloud Dataflow

Pour surveiller vos tâches MapReduce App Engine, vous dépendez d'une URL hébergée sur appspot.com.

Lorsque vous exécutez vos pipelines à l'aide du service géré Dataflow, vous pouvez les surveiller à l'aide de l'interface utilisateur Web de surveillance de Dataflow. Vous pouvez également obtenir des informations supplémentaires sur vos pipelines à l'aide de Cloud Monitoring.

Lecture et écriture

Dans Apache Beam, les lecteurs et rédacteurs de MapReduce App Engine sont appelés sources et récepteurs de données, ou connecteurs d'E/S.

Apache Beam dispose de nombreux connecteurs d'E/S pour plusieurs services Google Cloud tels que Bigtable, BigQuery, Datastore, Cloud SQL, etc. Des contributeurs Apache Beam ont également créé d'autres connecteurs d'E/S pour des services autres que Google, tels qu'Apache Cassandra et MongoDB.

Étapes suivantes