从 App Engine MapReduce 迁移到 Apache Beam 和 Dataflow

本教程面向 App Engine MapReduce 用户,介绍了如何从 App Engine MapReduce 迁移到 Apache Beam 和 Dataflow。

为什么迁移?

App Engine MapReduce 是一种以并行分布式方式处理大量数据的编程模型。该模型适用于长时间运行且无法在单个请求范围内处理的大型任务,例如:

  • 分析应用日志
  • 汇总来自外部来源的相关数据
  • 将数据从一种格式转换为另一种格式
  • 导出数据以供外部分析

不过,App Engine MapReduce 是由社区维护的开源库,它基于 App Engine 服务而构建,并且不再受 Google 的支持。

另一方面,Dataflow 受到 Google 的全面支持,并且可以提供比 App Engine MapReduce 更多的功能。

迁移场景

从 App Engine MapReduce 迁移到 Apache Beam 和 Dataflow 有许多好处,下面列举了一些场景示例:

  • 将 Datastore 数据库应用数据存储在 BigQuery 数据仓库中,以便使用 SQL 进行分析处理。
  • 将 Dataflow 用作 App Engine MapReduce 的替代产品,以便维护和/或更新您的 Datastore 数据集。
  • 将 Datastore 数据库的某些部分备份到 Cloud Storage。

什么是 Dataflow 和 Apache Beam?

Dataflow 是一种用于执行各种数据处理模式的托管式服务。Apache Beam 是一种统一的编程模型,提供用于定义数据处理工作流的 SDK。您可以使用 Apache Beam 为批处理和流处理创建复杂的流水线,并在 Dataflow 上运行。

Dataflow 和 Apache Beam 使用入门

要开始使用,请选择相应的快速入门,并据此进行操作:

创建和运行流水线

在使用 App Engine MapReduce 时,您需要创建数据处理类并添加 MapReduce 库;此外,在定义作业的规范和设置后,您需要针对相应的作业类使用静态 start() 方法一步创建和启动作业。

对于映射作业,您需要创建 InputOutput 类以及执行映射的 Map 类。对于 App Engine MapReduce 作业,您需要创建 InputOutput 类,并定义用于数据转换的 MapperReducer 类。

使用 Apache Beam 时,需要执行的操作略有不同:您需要创建流水线。使用输入和输出连接器从数据源中读取数据并向接收器中写入数据;使用预定义的数据转换(或自行编写转换)来实现数据转换;最后,在代码准备就绪后,您便可以将流水线部署到 Dataflow 服务。

将 App Engine MapReduce 作业转换为 Apache Beam 流水线

下表显示了与 App Engine MapReduce 模型的 mapshufflereduce 步骤等效的 Apache Beam 方法。

Java

App Engine MapReduce Apache Beam 等效方法
Map MapElements<InputT,OutputT>
Shuffle GroupByKey<K,V>
Reduce Combine.GroupedValues<K,InputT,OutputT>

常见做法是使用 Combine.PerKey<K,InputT,OutputT>,而不是 GroupByKeyCombineValues

Python

App Engine MapReduce Apache Beam 等效方法
Map beam.Map
Shuffle beam.GroupByKey
Reduce beam.CombineValues

常见做法是使用 beam.CombinePerKey,而不是 beam.GroupByKeybeam.CombineValues


以下示例代码演示了如何在 Apache Beam 中实现 App Engine MapReduce 模型:

Java

以下代码取自 MinimalWordCount.java
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction, String>() {
                   @Override
                   public String apply(KV input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));

Python

以下代码取自 wordcount_minimal.py
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

其他 Apache Beam 和 Dataflow 优势

如果您选择将 App Engine MapReduce 作业迁移到 Apache Beam 流水线,就可以从 Apache Beam 和 Dataflow 提供的多项功能中受益。

安排 Cloud Dataflow 作业

如果您熟悉 App Engine 任务队列,则可以使用 Cron 安排周期性作业。此示例演示了如何使用 App Engine Cron 安排 Apache Beam 流水线。

安排流水线的执行还有其他几种方式。您可以:

监控 Cloud Dataflow 作业

如需监控 App Engine MapReduce 作业,您需要依赖于 appspot.com 托管的网址。

使用 Dataflow 托管式服务执行流水线时,您可以使用便捷的 Dataflow 网页监控界面来监控流水线。您还可以使用 Cloud Monitoring 来获取有关流水线的更多信息。

读取和写入

App Engine MapReduce 的 Reader 和 Writer 在 Apache Beam 中称为数据源和接收器,或者 I/O 连接器

Apache Beam 具有多个 I/O 连接器,适用于多种 Google Cloud 服务,例如 Bigtable、BigQuery、Datastore 和 Cloud SQL 等。Apache Beam 贡献者还创建了适用于非 Google 服务(如 Apache Cassandra 和 MongoDB)的 I/O 连接器。

后续步骤