本教程面向 App Engine MapReduce 用户,介绍了如何从 App Engine MapReduce 迁移到 Apache Beam 和 Dataflow。
为什么迁移?
App Engine MapReduce 是一种以并行分布式方式处理大量数据的编程模型。该模型适用于长时间运行且无法在单个请求范围内处理的大型任务,例如:
- 分析应用日志
- 汇总来自外部来源的相关数据
- 将数据从一种格式转换为另一种格式
- 导出数据以供外部分析
不过,App Engine MapReduce 是由社区维护的开源库,它基于 App Engine 服务而构建,并且不再受 Google 的支持。
另一方面,Dataflow 受到 Google 的全面支持,并且可以提供比 App Engine MapReduce 更多的功能。
迁移场景
从 App Engine MapReduce 迁移到 Apache Beam 和 Dataflow 有许多好处,下面列举了一些场景示例:
- 将 Datastore 数据库应用数据存储在 BigQuery 数据仓库中,以便使用 SQL 进行分析处理。
- 将 Dataflow 用作 App Engine MapReduce 的替代产品,以便维护和/或更新您的 Datastore 数据集。
- 将 Datastore 数据库的某些部分备份到 Cloud Storage。
什么是 Dataflow 和 Apache Beam?
Dataflow 是一种用于执行各种数据处理模式的托管式服务。Apache Beam 是一种统一的编程模型,提供用于定义数据处理工作流的 SDK。您可以使用 Apache Beam 为批处理和流处理创建复杂的流水线,并在 Dataflow 上运行。
Dataflow 和 Apache Beam 使用入门
要开始使用,请选择相应的快速入门,并据此进行操作:
创建和运行流水线
在使用 App Engine MapReduce 时,您需要创建数据处理类并添加 MapReduce 库;此外,在定义作业的规范和设置后,您需要针对相应的作业类使用静态 start()
方法一步创建和启动作业。
对于映射作业,您需要创建 Input
和 Output
类以及执行映射的 Map
类。对于 App Engine MapReduce 作业,您需要创建 Input
和 Output
类,并定义用于数据转换的 Mapper
和 Reducer
类。
使用 Apache Beam 时,需要执行的操作略有不同:您需要创建流水线。使用输入和输出连接器从数据源中读取数据并向接收器中写入数据;使用预定义的数据转换(或自行编写转换)来实现数据转换;最后,在代码准备就绪后,您便可以将流水线部署到 Dataflow 服务。
将 App Engine MapReduce 作业转换为 Apache Beam 流水线
下表显示了与 App Engine MapReduce 模型的 map、shuffle 和 reduce 步骤等效的 Apache Beam 方法。
Java
App Engine MapReduce | Apache Beam 等效方法 |
---|---|
Map | MapElements<InputT,OutputT> |
Shuffle | GroupByKey<K,V> |
Reduce | Combine.GroupedValues<K,InputT,OutputT> |
常见做法是使用 Combine.PerKey<K,InputT,OutputT>
,而不是 GroupByKey
和 CombineValues
。
Python
App Engine MapReduce | Apache Beam 等效方法 |
---|---|
Map | beam.Map |
Shuffle | beam.GroupByKey |
Reduce | beam.CombineValues |
常见做法是使用 beam.CombinePerKey
,而不是 beam.GroupByKey
和 beam.CombineValues
。
Go
App Engine MapReduce | Apache Beam 等效方法 |
---|---|
Map | beam.ParDo |
Shuffle | beam.GroupByKey |
Reduce | beam.Combine |
以下示例代码演示了如何在 Apache Beam 中实现 App Engine MapReduce 模型:
Java
以下代码取自 MinimalWordCount.java:p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*")) // Apply a ParDo that returns a PCollection, where each element is an // individual word in Shakespeare's texts. .apply("ExtractWords", ParDo.of(new DoFn<string, string>() { @ProcessElement public void processElement(ProcessContext c) { for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) { if (!word.isEmpty()) { c.output(word); } } } })) // Apply the Count transform that returns a new PCollection of key/value pairs, // where each key represents a unique word in the text. .apply(Count.perElement()) // Apply a MapElements transform that formats our PCollection of word counts // into a printable string, suitable for writing to an output file. .apply("FormatResults", MapElements.via(new SimpleFunction string,><string, long>, String>() { @Override public String apply(KV<string, long> input) { return input.getKey() + ": " + input.getValue(); } })) // Apply a write transform that writes the contents of the PCollection to a // series of text files. .apply(TextIO.write().to("wordcounts")); string,>string,>
Python
以下代码取自 wordcount_minimal.py:# Read the text file[pattern] into a PCollection. lines = p | ReadFromText(known_args.input) # Count the occurrences of each word. counts = ( lines | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum)) # Format the counts into a PCollection of strings. output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c)) # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned output | WriteToText(known_args.output)
Go
以下代码取自 minimal_wordcount.go:// beam.Init() is an initialization hook that must be called on startup. beam.Init() // Create the Pipeline object and root scope. p := beam.NewPipeline() s := p.Root() lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt") // Invoke a ParDo transform on our PCollection of text lines. // This ParDo invokes a DoFn (defined in-line) on each element that // tokenizes the text line into individual words. The ParDo returns a // PCollection of type string, where each element is an individual word in // Shakespeare's collected texts. words := beam.ParDo(s, func(line string, emit func(string)) { for _, word := range wordRE.FindAllString(line, -1) { emit(word) } }, lines) // Invoke the stats.Count transform on our PCollection of // individual words. The Count transform returns a new PCollection of // key/value pairs, where each key represents a unique word in the text. // The associated value is the occurrence count for that word. counted := stats.Count(s, words) // Use a ParDo to format our PCollection of word counts into a printable // string, suitable for writing to an output file. When each element // produces exactly one element, the DoFn can simply return it. formatted := beam.ParDo(s, func(w string, c int) string { return fmt.Sprintf("%s: %v", w, c) }, counted) // Invoke textio.Write at the end of the pipeline to write // the contents of a PCollection (in this case, our PCollection of // formatted strings) to a text file. textio.Write(s, "wordcounts.txt", formatted)
其他 Apache Beam 和 Dataflow 优势
如果您选择将 App Engine MapReduce 作业迁移到 Apache Beam 流水线,就可以从 Apache Beam 和 Dataflow 提供的多项功能中受益。
安排 Cloud Dataflow 作业
如果您熟悉 App Engine 任务队列,则可以使用 Cron 安排周期性作业。此示例演示了如何使用 App Engine Cron 安排 Apache Beam 流水线。
安排流水线的执行还有其他几种方式。您可以:
- 使用 Dataflow 模板将流水线暂存在 Cloud Storage 上,然后从各种环境中执行这些流水线。
- 使用 App Engine Cron 服务或 Cloud Functions。
- 使用 Apache Airflow。
监控 Cloud Dataflow 作业
如需监控 App Engine MapReduce 作业,您需要依赖于 appspot.com
托管的网址。
使用 Dataflow 托管式服务执行流水线时,您可以使用便捷的 Dataflow 网页监控界面来监控流水线。您还可以使用 Cloud Monitoring 来获取有关流水线的更多信息。
读取和写入
App Engine MapReduce 的 Reader 和 Writer 在 Apache Beam 中称为数据源和接收器,或者 I/O 连接器。
Apache Beam 具有多个 I/O 连接器,适用于多种 Google Cloud 服务,例如 Bigtable、BigQuery、Datastore 和 Cloud SQL 等。Apache Beam 贡献者还创建了适用于非 Google 服务(如 Apache Cassandra 和 MongoDB)的 I/O 连接器。