从 App Engine MapReduce 迁移到 Apache Beam 和 Dataflow

本教程面向 App Engine MapReduce 用户,介绍了如何从 App Engine MapReduce 迁移到 Apache Beam 和 Dataflow。

为什么迁移?

App Engine MapReduce 是一种以并行分布式方式处理大量数据的编程模型。该模型适用于长时间运行且无法在单个请求范围内处理的大型任务,例如:

  • 分析应用日志
  • 汇总来自外部来源的相关数据
  • 将数据从一种格式转换为另一种格式
  • 导出数据以供外部分析

不过,App Engine MapReduce 是由社区维护的开源库,它基于 App Engine 服务而构建,并且不再受 Google 的支持。

另一方面,Dataflow 受到 Google 的全面支持,并且可以提供比 App Engine MapReduce 更多的功能。

迁移场景

从 App Engine MapReduce 迁移到 Apache Beam 和 Dataflow 有许多好处,下面列举了一些场景示例:

  • 将 Datastore 数据库应用数据存储在 BigQuery 数据仓库中,以便使用 SQL 进行分析处理。
  • 将 Dataflow 用作 App Engine MapReduce 的替代产品,以便维护和/或更新您的 Datastore 数据集。
  • 将 Datastore 数据库的某些部分备份到 Cloud Storage。

什么是 Dataflow 和 Apache Beam?

Dataflow 是一种用于执行各种数据处理模式的托管式服务。Apache Beam 是一种统一的编程模型,提供用于定义数据处理工作流的 SDK。您可以使用 Apache Beam 为批处理和流处理创建复杂的流水线,并在 Dataflow 上运行。

Dataflow 和 Apache Beam 使用入门

要开始使用,请选择相应的快速入门,并据此进行操作:

创建和运行流水线

在使用 App Engine MapReduce 时,您需要创建数据处理类并添加 MapReduce 库;此外,在定义作业的规范和设置后,您需要针对相应的作业类使用静态 start() 方法一步创建和启动作业。

对于映射作业,您需要创建 InputOutput 类以及执行映射的 Map 类。对于 App Engine MapReduce 作业,您需要创建 InputOutput 类,并定义用于数据转换的 MapperReducer 类。

使用 Apache Beam 时,需要执行的操作略有不同:您需要创建流水线。使用输入和输出连接器从数据源中读取数据并向接收器中写入数据;使用预定义的数据转换(或自行编写转换)来实现数据转换;最后,在代码准备就绪后,您便可以将流水线部署到 Dataflow 服务。

将 App Engine MapReduce 作业转换为 Apache Beam 流水线

下表显示了与 App Engine MapReduce 模型的 mapshufflereduce 步骤等效的 Apache Beam 方法。

Java

App Engine MapReduce Apache Beam 等效方法
Map MapElements<InputT,OutputT>
Shuffle GroupByKey<K,V>
Reduce Combine.GroupedValues<K,InputT,OutputT>

常见做法是使用 Combine.PerKey<K,InputT,OutputT>,而不是 GroupByKeyCombineValues

Python

App Engine MapReduce Apache Beam 等效方法
Map beam.Map
Shuffle beam.GroupByKey
Reduce beam.CombineValues

常见做法是使用 beam.CombinePerKey,而不是 beam.GroupByKeybeam.CombineValues

Go

App Engine MapReduce Apache Beam 等效方法
Map beam.ParDo
Shuffle beam.GroupByKey
Reduce beam.Combine


以下示例代码演示了如何在 Apache Beam 中实现 App Engine MapReduce 模型:

Java

以下代码取自 MinimalWordCount.java
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn<string, string>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction<string, long>, String>() {
                   @Override
                   public String apply(KV<string, long> input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));
string,>string,>string,>

Python

以下代码取自 wordcount_minimal.py
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

Go

以下代码取自 minimal_wordcount.go
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()

// Create the Pipeline object and root scope.
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)

// Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
// key/value pairs, where each key represents a unique word in the text.
// The associated value is the occurrence count for that word.
counted := stats.Count(s, words)

// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)

// Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

其他 Apache Beam 和 Dataflow 优势

如果您选择将 App Engine MapReduce 作业迁移到 Apache Beam 流水线,就可以从 Apache Beam 和 Dataflow 提供的多项功能中受益。

安排 Cloud Dataflow 作业

如果您熟悉 App Engine 任务队列,则可以使用 Cron 安排周期性作业。此示例演示了如何使用 App Engine Cron 安排 Apache Beam 流水线。

安排流水线的执行还有其他几种方式。您可以:

监控 Cloud Dataflow 作业

如需监控 App Engine MapReduce 作业,您需要依赖于 appspot.com 托管的网址。

使用 Dataflow 托管式服务执行流水线时,您可以使用便捷的 Dataflow 网页监控界面来监控流水线。您还可以使用 Cloud Monitoring 来获取有关流水线的更多信息。

读取和写入

App Engine MapReduce 的 Reader 和 Writer 在 Apache Beam 中称为数据源和接收器,或者 I/O 连接器

Apache Beam 具有多个 I/O 连接器,适用于多种 Google Cloud 服务,例如 Bigtable、BigQuery、Datastore 和 Cloud SQL 等。Apache Beam 贡献者还创建了适用于非 Google 服务(如 Apache Cassandra 和 MongoDB)的 I/O 连接器。

后续步骤