本教學課程以 App Engine MapReduce 使用者為目標對象,說明如何從使用 App Engine MapReduce 遷移為使用 Apache Beam 和 Dataflow。
為何要遷移?
App Engine MapReduce 是透過平行且分散的方式,處理大量資料的程式設計模型,適用於無法在單一要求範圍內處理完成且長時間執行的大量工作,例如:
- 分析應用程式記錄
- 匯總外部來源的相關資料
- 轉換資料格式
- 匯出資料進行外部分析
不過,App Engine MapReduce 是由社群維護的開放原始碼程式庫,以 App Engine 服務為建構基礎,不再受 Google 支援。
另一方面,Dataflow 享有 Google 的全面支援,而且相較於 App Engine MapReduce,可提供更多延伸功能。
遷移案例
以下是您將可因為從 App Engine MapReduce 遷移為 Apache Beam 和 Dataflow 而受惠的部分範例情境:
- 將 Datastore 資料庫應用程式資料儲存於 BigQuery 資料倉儲,以使用 SQL 進行分析處理。
- 使用 Dataflow 取代 App Engine MapReduce 來維護及/或更新 Datastore 資料集。
- 將 Datastore 資料庫備份到 Cloud Storage。
什麼是 Dataflow 和 Apache Beam?
Dataflow 是可用於執行各種資料處理模式的代管服務。Apache Beam 則是整合式的程式設計模型,可提供用於定義資料處理工作流程的 SDK。請使用 Apache Beam 建立批次和串流的複合管道,並在 Dataflow 上執行這些管道。
開始使用 Dataflow 和 Apache Beam
如要開始使用,請按照您選擇的快速入門導覽課程進行操作:
建立並執行管道
使用 App Engine MapReduce 時,您可以建立資料處理類別、新增 MapReduce 程式庫,並在工作的規格與設定定義完成後,在適當的工作類別上使用靜態的 start()
方法,透過單一步驟建立並啟動工作。
針對「對應」工作,您必須建立 Input
和 Output
類別以及執行對應的 Map
類別。針對 App Engine MapReduce 工作,您必須建立 Input
和 Output
類別,並且定義 Mapper
和 Reducer
類別進行資料轉換。
Apache Beam 的做法則稍有不同:您必須建立管道。您將使用輸入與輸出連接器,從資料來源與接收器進行讀取與寫入。您會使用預先定義的資料轉換作業 (或撰寫您的自訂作業) 實作資料轉換。接著,在程式碼可供使用時,將管道部署到 Dataflow 服務。
將 App Engine MapReduce 工作轉換為 Apache Beam 管道
下表顯示的是 Apache Beam 對應於 App Engine MapReduce 模型的對應、重組及縮減步驟。
Java
App Engine MapReduce | Apache Beam 對等項目 |
---|---|
對應 | MapElements<InputT,OutputT> |
重組 | GroupByKey<K,V> |
減少 | Combine.GroupedValues<K,InputT,OutputT> |
常見的做法是使用 Combine.PerKey<K,InputT,OutputT>
取代 GroupByKey
和 CombineValues
。
Python
App Engine MapReduce | Apache Beam 對等項目 |
---|---|
地圖 | beam.Map |
重組 | beam.GroupByKey |
減少 | beam.CombineValues |
常見的做法是使用 beam.CombinePerKey
取代 beam.GroupByKey
和 beam.CombineValues
。
Go
App Engine MapReduce | Apache Beam 對等項目 |
---|---|
地圖 | beam.ParDo |
重組 | beam.GroupByKey |
減少 | beam.Combine |
以下範例程式碼顯示如何在 Apache Beam 中實作 App Engine MapReduce 模型:
Java
從 MinimalWordCount.java 擷取:p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*")) // Apply a ParDo that returns a PCollection, where each element is an // individual word in Shakespeare's texts. .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) { if (!word.isEmpty()) { c.output(word); } } } })) // Apply the Count transform that returns a new PCollection of key/value pairs, // where each key represents a unique word in the text. .apply(Count.perElement()) // Apply a MapElements transform that formats our PCollection of word counts // into a printable string, suitable for writing to an output file. .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> input) { return input.getKey() + ": " + input.getValue(); } })) // Apply a write transform that writes the contents of the PCollection to a // series of text files. .apply(TextIO.write().to("wordcounts"));
Python
從 wordcount_minimal.py 擷取:# Read the text file[pattern] into a PCollection. lines = p | ReadFromText(known_args.input) # Count the occurrences of each word. counts = ( lines | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum)) # Format the counts into a PCollection of strings. output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c)) # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned output | WriteToText(known_args.output)
Go
從 minimal_wordcount.go 擷取:// beam.Init() is an initialization hook that must be called on startup. beam.Init() // Create the Pipeline object and root scope. p := beam.NewPipeline() s := p.Root() lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt") // Invoke a ParDo transform on our PCollection of text lines. // This ParDo invokes a DoFn (defined in-line) on each element that // tokenizes the text line into individual words. The ParDo returns a // PCollection of type string, where each element is an individual word in // Shakespeare's collected texts. words := beam.ParDo(s, func(line string, emit func(string)) { for _, word := range wordRE.FindAllString(line, -1) { emit(word) } }, lines) // Invoke the stats.Count transform on our PCollection of // individual words. The Count transform returns a new PCollection of // key/value pairs, where each key represents a unique word in the text. // The associated value is the occurrence count for that word. counted := stats.Count(s, words) // Use a ParDo to format our PCollection of word counts into a printable // string, suitable for writing to an output file. When each element // produces exactly one element, the DoFn can simply return it. formatted := beam.ParDo(s, func(w string, c int) string { return fmt.Sprintf("%s: %v", w, c) }, counted) // Invoke textio.Write at the end of the pipeline to write // the contents of a PCollection (in this case, our PCollection of // formatted strings) to a text file. textio.Write(s, "wordcounts.txt", formatted)
Apache Beam 和 Dataflow 的其他優勢
如果您選擇將 App Engine MapReduce 工作遷移為 Apache Beam 管道,您將享有 Apache Beam 和 Dataflow 提供的多種功能。
Cloud Dataflow 工作排程
如果熟悉 App Engine 工作佇列,您可以使用 Cron 排定週期性工作。 這個範例說明如何使用 App Engine Cron 進行 Apache Beam 管道排程。
管道執行作業有許多其他排定方式。您可以:
- 使用 Dataflow 範本將管道暫存於 Cloud Storage,並從各種環境執行這些管道。
- 使用 App Engine Cron 服務或 Cloud Run 函式。
- 使用 Apache Airflow。
監控 Cloud Dataflow 工作
如要監控 App Engine MapReduce 工作,您必須使用 appspot.com
託管的網址。
使用 Dataflow 代管服務執行管道時,您可以使用便利的 Dataflow 網路型監控使用者介面來監控管道。 您也可以使用 Cloud Monitoring 以取得管道的額外資訊。
讀取和寫入
在 Apache Beam 中,App Engine MapReduce 的讀取器和寫入器稱為資料來源和接收器,或 I/O 連接器。
Apache Beam 針對多種 Google Cloud 服務提供許多 I/O 連接器,例如 Bigtable、BigQuery、Datastore、Cloud SQL 等等。另外還有 Apache Beam 協作者針對非 Google 服務建立的 I/O 連接器,例如 Apache Cassandra 和 MongoDB。