Von App Engine MapReduce zu Apache Beam und Dataflow migrieren

Diese Anleitung richtet sich an Nutzer von Google App Engine MapReduce. Es wird gezeigt, wie Sie von App Engine MapReduce zu Apache Beam und Dataflow migrieren.

Warum migrieren?

App Engine MapReduce ist ein Programmiermodell für die parallele und verteilte Verarbeitung großer Mengen von Daten. Es ist sehr nützlich für umfangreiche und lange andauernde Aufgaben, die nicht im Rahmen einer einzelnen Anfrage verarbeitet werden können, wie z. B.:

  • Anwendungsprotokolle analysieren
  • Zugehörige Daten von externen Quellen aggregieren
  • Daten von einem Format in ein anderes umwandeln
  • Daten für externe Analyse exportieren

App Engine MapReduce ist eine von der Community gepflegte Open-Source-Bibliothek, die auf App Engine-Diensten basiert und von Google nicht mehr unterstützt wird.

Im Gegensatz dazu wird Dataflow von Google vollständig unterstützt und bietet im Vergleich zu App Engine MapReduce erweiterte Funktionen.

Migrationsfälle

Es folgen einige Beispielfälle, in denen Sie durch die Migration von App Engine MapReduce zu Apache Beam und Dataflow profitieren können:

  • Sie möchten Ihre Datastore-Datenbankanwendungsdaten in einem BigQuery Data Warehouse für die Analyseverarbeitung mit SQL speichern.
  • Verwenden Sie Dataflow als Alternative zu App Engine MapReduce für die Wartung und/oder Aktualisierung Ihres Datastore-Datasets.
  • Sichern Sie Teile Ihrer Datastore-Datenbank in Cloud Storage.

Was ist Dataflow und Apache Beam?

Dataflow ist ein verwalteter Dienst zur Ausführung eines breiten Spektrums an Datenverarbeitungsmustern. Apache Beam ist ein einheitliches Programmiermodell, das SDKs für die Definition von Workflows zur Datenverarbeitung bereitstellt. Verwenden Sie Apache Beam, um komplexe Pipelines für Batches und Streaming zu erstellen und sie in Dataflow auszuführen.

Erste Schritte mit Dataflow und Apache Beam

Richten Sie sich beim Einstieg nach der zutreffenden Schnellstartanleitung:

Pipeline erstellen und ausführen

Wenn Sie App Engine MapReduce verwenden, erstellen Sie Datenverarbeitungsklassen und fügen die MapReduce-Bibliothek hinzu. Nachdem Sie dann die Spezifikation und die Einstellungen des Jobs definiert haben, erstellen und starten Sie den Job in einem Schritt mithilfe der statischen Methode start() in der entsprechenden Jobklasse.

Für Map-Jobs erstellen Sie die Klassen Input und Output und die Klasse Map für die Zuordnung. Für App Engine MapReduce-Jobs erstellen Sie die Klassen Input und Output und definieren die Klassen Mapper und Reducer für Datentransformationen.

In Apache Beam gehen Sie etwas anders vor: Sie erstellen eine Pipeline. Sie verwenden Eingangs- und Ausgangs-Connectors zum Lesen und Schreiben aus Ihren Datenquellen und -senken. Sie verwenden vordefinierte Datentransformationen (oder schreiben eigene) zur Implementierung Ihrer Datentransformationen. Wenn Ihr Code fertig ist, stellen Sie die Pipeline für den Dataflow-Dienst bereit.

App Engine MapReduce-Jobs in Apache Beam-Pipelines konvertieren

Die folgende Tabelle enthält die Apache Beam-Entsprechungen für die Schritte map, shuffle und reduce des App Engine MapReduce-Modells.

Java

App Engine MapReduce Apache Beam-Entsprechung
Map MapElements<InputT,OutputT>
Shuffle GroupByKey<K,V>
Reduce Combine.GroupedValues<K,InputT,OutputT>

Eine gängige Praxis ist die Verwendung von Combine.PerKey<K,InputT,OutputT> anstelle von GroupByKey und CombineValues.

Python

App Engine MapReduce Apache Beam-Entsprechung
Map beam.Map
Shuffle beam.GroupByKey
Reduce beam.CombineValues

Eine gängige Praxis ist die Verwendung von beam.CombinePerKey anstelle von beam.GroupByKey und beam.CombineValues.

Go

App Engine MapReduce Apache Beam-Entsprechung
Map beam.ParDo
Shuffle beam.GroupByKey
Reduce beam.Combine


Der folgende Beispielcode zeigt die Implementierung des App Engine MapReduce-Modells in Apache Beam:

Java

Von MinimalWordCount.java:
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                   @Override
                   public String apply(KV<String, Long> input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));

Python

Von wordcount_minimal.py:
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

Go

Von minimal_wordcount.go:
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()

// Create the Pipeline object and root scope.
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)

// Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
// key/value pairs, where each key represents a unique word in the text.
// The associated value is the occurrence count for that word.
counted := stats.Count(s, words)

// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)

// Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

Zusätzliche Vorteile von Apache Beam und Dataflow

Wenn Sie Ihre App Engine MapReduce-Jobs in Apache Beam-Pipelines migrieren, profitieren Sie von mehreren Features, die Apache Beam und Dataflow bieten.

Cloud Dataflow-Jobs planen

Wenn Sie mit Aufgabenwarteschlangen in App Engine vertraut sind, können Sie wiederkehrende Jobs mit Cron planen. Dieses Beispiel zeigt, wie Sie mithilfe von App Engine Cron Ihre Apache Beam-Pipelines planen können.

Es gibt mehrere weitere Methoden, die Ausführung Ihrer Pipeline zu planen. Sie können:

Cloud Dataflow-Jobs überwachen

Zum Überwachen Ihrer App Engine MapReduce-Jobs benötigen Sie eine appspot.com-gehostete URL.

Wenn Sie Ihre Pipelines mit dem verwalteten Dienst von Dataflow verwenden, können Sie die Pipelines mithilfe der webbasierten Monitoring-Benutzeroberfläche von Dataflow überwachen. Weitere Informationen zu Ihren Pipelines erhalten Sie mit Cloud Monitoring.

Lesen und Schreiben

Die Reader und Writer von App Engine MapReduce werden in Apache Beam als Datenquellen und -senken oder E/A-Connectors bezeichnet.

Apache Beam hat zahlreiche E/A-Connectors für verschiedene Google Cloud-Dienste, z. B. Bigtable, BigQuery, Datastore, Cloud SQL und andere. Es gibt auch E/A-Connectors, die von Apache Beam-Contributors für Nicht-Google-Dienste wie Apache Cassandra und MongoDB verwendet werden.

Weitere Informationen