從 App Engine MapReduce 遷移至 Apache Beam 和 Dataflow

本教學課程以 App Engine MapReduce 使用者為目標對象,說明如何從使用 App Engine MapReduce 遷移為使用 Apache Beam 和 Dataflow。

為何要遷移?

App Engine MapReduce 是透過平行且分散的方式,處理大量資料的程式設計模型,適用於無法在單一要求範圍內處理完成且長時間執行的大量工作,例如:

  • 分析應用程式記錄
  • 匯總外部來源的相關資料
  • 轉換資料格式
  • 匯出資料進行外部分析

不過,App Engine MapReduce 是由社群維護的開放原始碼程式庫,以 App Engine 服務為建構基礎,不再受 Google 支援。

另一方面,Dataflow 享有 Google 的全面支援,而且相較於 App Engine MapReduce,可提供更多延伸功能。

遷移案例

以下是您將可因為從 App Engine MapReduce 遷移為 Apache Beam 和 Dataflow 而受惠的部分範例情境:

  • 將 Datastore 資料庫應用程式資料儲存於 BigQuery 資料倉儲,以使用 SQL 進行分析處理。
  • 使用 Dataflow 取代 App Engine MapReduce 來維護及/或更新 Datastore 資料集。
  • 將 Datastore 資料庫備份到 Cloud Storage。

什麼是 Dataflow 和 Apache Beam?

Dataflow 是可用於執行各種資料處理模式的代管服務。Apache Beam 則是整合式的程式設計模型,可提供用於定義資料處理工作流程的 SDK。請使用 Apache Beam 建立批次和串流的複合管道,並在 Dataflow 上執行這些管道。

開始使用 Dataflow 和 Apache Beam

如要開始使用,請按照您選擇的快速入門導覽課程進行操作:

建立並執行管道

使用 App Engine MapReduce 時,您可以建立資料處理類別、新增 MapReduce 程式庫,並在工作的規格與設定定義完成後,在適當的工作類別上使用靜態的 start() 方法,透過單一步驟建立並啟動工作。

針對「對應」工作,您必須建立 InputOutput 類別以及執行對應的 Map 類別。針對 App Engine MapReduce 工作,您必須建立 InputOutput 類別,並且定義 MapperReducer 類別進行資料轉換。

Apache Beam 的做法則稍有不同:您必須建立管道。您將使用輸入與輸出連接器,從資料來源與接收器進行讀取與寫入。您會使用預先定義的資料轉換作業 (或撰寫您的自訂作業) 實作資料轉換。接著,在程式碼可供使用時,將管道部署到 Dataflow 服務。

將 App Engine MapReduce 工作轉換為 Apache Beam 管道

下表顯示的是 Apache Beam 對應於 App Engine MapReduce 模型的對應重組縮減步驟。

Java

App Engine MapReduce Apache Beam 對等項目
對應 MapElements<InputT,OutputT>
重組 GroupByKey<K,V>
減少 Combine.GroupedValues<K,InputT,OutputT>

常見的做法是使用 Combine.PerKey<K,InputT,OutputT> 取代 GroupByKeyCombineValues

Python

App Engine MapReduce Apache Beam 對等項目
地圖 beam.Map
重組 beam.GroupByKey
減少 beam.CombineValues

常見的做法是使用 beam.CombinePerKey 取代 beam.GroupByKeybeam.CombineValues

Go

App Engine MapReduce Apache Beam 對等項目
地圖 beam.ParDo
重組 beam.GroupByKey
減少 beam.Combine


以下範例程式碼顯示如何在 Apache Beam 中實作 App Engine MapReduce 模型:

Java

MinimalWordCount.java 擷取:
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                   @Override
                   public String apply(KV<String, Long> input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));

Python

wordcount_minimal.py 擷取:
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

Go

minimal_wordcount.go 擷取:
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()

// Create the Pipeline object and root scope.
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)

// Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
// key/value pairs, where each key represents a unique word in the text.
// The associated value is the occurrence count for that word.
counted := stats.Count(s, words)

// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)

// Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

Apache Beam 和 Dataflow 的其他優勢

如果您選擇將 App Engine MapReduce 工作遷移為 Apache Beam 管道,您將享有 Apache Beam 和 Dataflow 提供的多種功能。

Cloud Dataflow 工作排程

如果熟悉 App Engine 工作佇列,您可以使用 Cron 排定週期性工作。 這個範例說明如何使用 App Engine Cron 進行 Apache Beam 管道排程。

管道執行作業有許多其他排定方式。您可以:

監控 Cloud Dataflow 工作

如要監控 App Engine MapReduce 工作,您必須使用 appspot.com 託管的網址。

使用 Dataflow 代管服務執行管道時,您可以使用便利的 Dataflow 網路型監控使用者介面來監控管道。 您也可以使用 Cloud Monitoring 以取得管道的額外資訊。

讀取和寫入

在 Apache Beam 中,App Engine MapReduce 的讀取器和寫入器稱為資料來源和接收器,或 I/O 連接器

Apache Beam 針對多種 Google Cloud 服務提供許多 I/O 連接器,例如 Bigtable、BigQuery、Datastore、Cloud SQL 等等。另外還有 Apache Beam 協作者針對非 Google 服務建立的 I/O 連接器,例如 Apache Cassandra 和 MongoDB。

相關資源