このチュートリアルは、App Engine MapReduce ユーザーを対象としています。App Engine MapReduce を使用する環境を Apache Beam と Dataflow を使用する環境に移行する方法を示します。
App Engine MapReduce は、大量のデータを分散させて並列処理するためのプログラミング モデルです。1 つのリクエストの範囲内で処理できない、次のような大規模で時間がかかるタスクに適しています。
- アプリケーション ログの分析
- 外部ソースにある関連データの集計
- データ形式の変換
- 外部分析するためのデータのエクスポート
ただし、App Engine MapReduce は、App Engine サービスの上に構築された、コミュニティで維持されるオープンソース ライブラリであり、Google によるサポートはありません。
一方、Dataflow は Google が完全にサポートしており、App Engine MapReduce と比較して拡張機能が提供されます。
App Engine MapReduce から Apache Beam と Dataflow に移行することで利点が得られるいくつかの事例を次に示します。
- SQL を使用した分析処理のために、Datastore データベース アプリケーション データを BigQuery データ ウェアハウスに保存します。
- Datastore データセットのメンテナンスや更新のために、App Engine MapReduce の代わりに Dataflow を使用します。
- Datastore データベースの一部を Cloud Storage にバックアップします。
Dataflow と Apache Beam とは
Dataflow は、さまざまなデータ処理パターンの実行に対応したマネージド サービスです。Apache Beam は、データ処理ワークフローを定義するための SDK を提供する統合プログラミング モデルです。Apache Beam を使用すると、バッチとストリーミングの両方に対応する複雑なパイプラインを作成して Dataflow で実行できます。
Dataflow と Apache Beam を使ってみる
App Engine MapReduce を使用する場合、データ処理クラスを作成し、MapReduce ライブラリを追加します。ジョブの仕様と設定が定義されたら、適切なジョブクラスで静的 start()
メソッドを使用して、1 ステップでジョブを作成して開始します。
Map ジョブでは、Input
クラスと Output
クラス、およびマッピングを行う Map
クラスを作成します。App Engine MapReduce ジョブでは、Input
クラスと Output
クラスを作成し、データ変換用の Mapper
クラスと Reducer
Apache Beam では少し異なり、パイプラインを作成します。入力および出力コネクタを使用して、データソースおよびシンクから読み取りと書き込みを行います。事前定義されたデータ変換を使用(または独自に作成)して、データ変換を実装します。次に、コードが準備できたら、パイプラインを Dataflow サービスにデプロイします。
App Engine MapReduce ジョブの Apache Beam パイプラインへの変換
次の表に、App Engine MapReduce モデルのマップ、シャッフル、リデュース ステップに相当する Apache Beam のステップを示します。
App Engine MapReduce | Apache Beam での同等のステップ |
マップ | MapElements<InputT,OutputT> |
シャッフル | GroupByKey<K,V> |
リデュース | Combine.GroupedValues<K,InputT,OutputT> |
を GroupByKey
と CombineValues
App Engine MapReduce | Apache Beam での同等のステップ |
マップ | beam.Map |
シャッフル | beam.GroupByKey |
リデュース | beam.CombineValues |
を beam.GroupByKey
と beam.CombineValues
App Engine MapReduce | Apache Beam での同等のステップ |
マップ | beam.ParDo |
シャッフル | beam.GroupByKey |
リデュース | beam.Combine |
次のサンプルコードは、App Engine MapReduce モデルを Apache Beam に実装する方法を示しています。
MinimalWordCount.java より引用:p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*")) // Apply a ParDo that returns a PCollection, where each element is an // individual word in Shakespeare's texts. .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) { if (!word.isEmpty()) { c.output(word); } } } })) // Apply the Count transform that returns a new PCollection of key/value pairs, // where each key represents a unique word in the text. .apply(Count.perElement()) // Apply a MapElements transform that formats our PCollection of word counts // into a printable string, suitable for writing to an output file. .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> input) { return input.getKey() + ": " + input.getValue(); } })) // Apply a write transform that writes the contents of the PCollection to a // series of text files. .apply(TextIO.write().to("wordcounts"));
wordcount_minimal.py より引用:# Read the text file[pattern] into a PCollection. lines = p | ReadFromText(known_args.input) # Count the occurrences of each word. counts = ( lines | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum)) # Format the counts into a PCollection of strings. output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c)) # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned output | WriteToText(known_args.output)
minimal_wordcount.go より引用:// beam.Init() is an initialization hook that must be called on startup. beam.Init() // Create the Pipeline object and root scope. p := beam.NewPipeline() s := p.Root() lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt") // Invoke a ParDo transform on our PCollection of text lines. // This ParDo invokes a DoFn (defined in-line) on each element that // tokenizes the text line into individual words. The ParDo returns a // PCollection of type string, where each element is an individual word in // Shakespeare's collected texts. words := beam.ParDo(s, func(line string, emit func(string)) { for _, word := range wordRE.FindAllString(line, -1) { emit(word) } }, lines) // Invoke the stats.Count transform on our PCollection of // individual words. The Count transform returns a new PCollection of // key/value pairs, where each key represents a unique word in the text. // The associated value is the occurrence count for that word. counted := stats.Count(s, words) // Use a ParDo to format our PCollection of word counts into a printable // string, suitable for writing to an output file. When each element // produces exactly one element, the DoFn can simply return it. formatted := beam.ParDo(s, func(w string, c int) string { return fmt.Sprintf("%s: %v", w, c) }, counted) // Invoke textio.Write at the end of the pipeline to write // the contents of a PCollection (in this case, our PCollection of // formatted strings) to a text file. textio.Write(s, "wordcounts.txt", formatted)
Apache Beam と Dataflow で得られるその他の利点
App Engine MapReduce ジョブを Apache Beam パイプラインに移行すると、Apache Beam と Dataflow で提供される機能を利用できます。
Cloud Dataflow ジョブのスケジュール設定
App Engine タスクキューを十分に理解していれば、cron を使用して繰り返しジョブをスケジュールできます。この例は、App Engine cron を使用して Apache Beam パイプラインをスケジュールする方法を示しています。
- Dataflow テンプレートを使用して、Cloud Storage 上のパイプラインをステージングし、さまざまな環境からそれらを実行する。
- App Engine cron サービスまたは Cloud Run 関数を使用する。
- Apache Airflow を使用します。
Cloud Dataflow ジョブのモニタリング
App Engine MapReduce ジョブをモニタリングするには、appspot.com
でホストされる URL を使用します。
Dataflow マネージド サービスを使用してパイプラインを実行する場合、Dataflow のウェブベースのモニタリング ユーザー インターフェースを使用してパイプラインをモニタリングできます。Cloud Monitoring を使用して、パイプラインに関する追加情報を取得することもできます。
App Engine MapReduce のリーダーとライターは、Apache Beam ではデータソースとシンク(I/O コネクタ)と呼ばれます。
Apache Beam には、Bigtable、BigQuery、Datastore、Cloud SQL などの Google Cloud サービス用の I/O コネクタが多数あります。また、Google 以外のサービス(Apache Cassandra や MongoDB など)用に Apache Beam の貢献者によって作成された I/O コネクタもあります。