このチュートリアルは、App Engine MapReduce ユーザーを対象としています。App Engine MapReduce を使用する環境を Apache Beam と Dataflow を使用する環境に移行する方法を示します。
移行の理由
App Engine MapReduce は、大量のデータを分散させて並列処理するためのプログラミング モデルです。1 つのリクエストの範囲内で処理できない、次のような大規模で時間がかかるタスクに適しています。
- アプリケーション ログの分析
- 外部ソースにある関連データの集計
- データ形式の変換
- 外部分析するためのデータのエクスポート
ただし、App Engine MapReduce は、App Engine サービスの上に構築された、コミュニティで維持されるオープンソース ライブラリであり、Google によるサポートはありません。
一方、Dataflow は Google が完全にサポートしており、App Engine MapReduce と比較して拡張機能が提供されます。
移行事例
App Engine MapReduce から Apache Beam と Dataflow に移行することで利点が得られるいくつかの事例を次に示します。
- SQL を使用した分析処理のために、Datastore データベース アプリケーション データを BigQuery データ ウェアハウスに保存します。
- Datastore データセットのメンテナンスや更新のために、App Engine MapReduce の代わりに Dataflow を使用します。
- Datastore データベースの一部を Cloud Storage にバックアップします。
Dataflow と Apache Beam とは
Dataflow は、さまざまなデータ処理パターンの実行に対応したマネージド サービスです。Apache Beam は、データ処理ワークフローを定義するための SDK を提供する統合プログラミング モデルです。Apache Beam を使用すると、バッチとストリーミングの両方に対応する複雑なパイプラインを作成して Dataflow で実行できます。
Dataflow と Apache Beam を使ってみる
開始するには、選択したクイックスタートに従います。
パイプラインの作成と実行
App Engine MapReduce を使用する場合、データ処理クラスを作成し、MapReduce ライブラリを追加します。ジョブの仕様と設定が定義されたら、適切なジョブクラスで静的 start()
メソッドを使用して、1 ステップでジョブを作成して開始します。
Map ジョブでは、Input
クラスと Output
クラス、およびマッピングを行う Map
クラスを作成します。App Engine MapReduce ジョブでは、Input
クラスと Output
クラスを作成し、データ変換用の Mapper
クラスと Reducer
クラスを定義します。
Apache Beam では少し異なり、パイプラインを作成します。入力および出力コネクタを使用して、データソースおよびシンクから読み取りと書き込みを行います。事前定義されたデータ変換を使用(または独自に作成)して、データ変換を実装します。次に、コードが準備できたら、パイプラインを Dataflow サービスにデプロイします。
App Engine MapReduce ジョブの Apache Beam パイプラインへの変換
次の表に、App Engine MapReduce モデルのマップ、シャッフル、リデュース ステップに相当する Apache Beam のステップを示します。
Java
App Engine MapReduce | Apache Beam での同等のステップ |
---|---|
マップ | MapElements<InputT,OutputT> |
シャッフル | GroupByKey<K,V> |
リデュース | Combine.GroupedValues<K,InputT,OutputT> |
一般には、Combine.PerKey<K,InputT,OutputT>
を GroupByKey
と CombineValues
の代わりに使用します。
Python
App Engine MapReduce | Apache Beam での同等のステップ |
---|---|
マップ | beam.Map |
シャッフル | beam.GroupByKey |
リデュース | beam.CombineValues |
一般には、beam.CombinePerKey
を beam.GroupByKey
と beam.CombineValues
の代わりに使用します。
Go
App Engine MapReduce | Apache Beam での同等のステップ |
---|---|
マップ | beam.ParDo |
シャッフル | beam.GroupByKey |
リデュース | beam.Combine |
次のサンプルコードは、App Engine MapReduce モデルを Apache Beam に実装する方法を示しています。
Java
MinimalWordCount.java より引用:p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*")) // Apply a ParDo that returns a PCollection, where each element is an // individual word in Shakespeare's texts. .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) { if (!word.isEmpty()) { c.output(word); } } } })) // Apply the Count transform that returns a new PCollection of key/value pairs, // where each key represents a unique word in the text. .apply(Count.perElement()) // Apply a MapElements transform that formats our PCollection of word counts // into a printable string, suitable for writing to an output file. .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> input) { return input.getKey() + ": " + input.getValue(); } })) // Apply a write transform that writes the contents of the PCollection to a // series of text files. .apply(TextIO.write().to("wordcounts"));
Python
wordcount_minimal.py より引用:# Read the text file[pattern] into a PCollection. lines = p | ReadFromText(known_args.input) # Count the occurrences of each word. counts = ( lines | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum)) # Format the counts into a PCollection of strings. output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c)) # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned output | WriteToText(known_args.output)
Go
minimal_wordcount.go より引用:// beam.Init() is an initialization hook that must be called on startup. beam.Init() // Create the Pipeline object and root scope. p := beam.NewPipeline() s := p.Root() lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt") // Invoke a ParDo transform on our PCollection of text lines. // This ParDo invokes a DoFn (defined in-line) on each element that // tokenizes the text line into individual words. The ParDo returns a // PCollection of type string, where each element is an individual word in // Shakespeare's collected texts. words := beam.ParDo(s, func(line string, emit func(string)) { for _, word := range wordRE.FindAllString(line, -1) { emit(word) } }, lines) // Invoke the stats.Count transform on our PCollection of // individual words. The Count transform returns a new PCollection of // key/value pairs, where each key represents a unique word in the text. // The associated value is the occurrence count for that word. counted := stats.Count(s, words) // Use a ParDo to format our PCollection of word counts into a printable // string, suitable for writing to an output file. When each element // produces exactly one element, the DoFn can simply return it. formatted := beam.ParDo(s, func(w string, c int) string { return fmt.Sprintf("%s: %v", w, c) }, counted) // Invoke textio.Write at the end of the pipeline to write // the contents of a PCollection (in this case, our PCollection of // formatted strings) to a text file. textio.Write(s, "wordcounts.txt", formatted)
Apache Beam と Dataflow で得られるその他の利点
App Engine MapReduce ジョブを Apache Beam パイプラインに移行すると、Apache Beam と Dataflow で提供される機能を利用できます。
Cloud Dataflow ジョブのスケジュール設定
App Engine タスクキューを十分に理解していれば、cron を使用して繰り返しジョブをスケジュールできます。この例は、App Engine cron を使用して Apache Beam パイプラインをスケジュールする方法を示しています。
パイプラインの実行をスケジュールするその他の方法がいくつかあります。例を挙げてみます。
- Dataflow テンプレートを使用して、Cloud Storage 上のパイプラインをステージングし、さまざまな環境からそれらを実行する。
- App Engine cron サービスまたは Cloud Run 関数を使用する。
- Apache Airflow を使用します。
Cloud Dataflow ジョブのモニタリング
App Engine MapReduce ジョブをモニタリングするには、appspot.com
でホストされる URL を使用します。
Dataflow マネージド サービスを使用してパイプラインを実行する場合、Dataflow のウェブベースのモニタリング ユーザー インターフェースを使用してパイプラインをモニタリングできます。Cloud Monitoring を使用して、パイプラインに関する追加情報を取得することもできます。
読み取りと書き込み
App Engine MapReduce のリーダーとライターは、Apache Beam ではデータソースとシンク(I/O コネクタ)と呼ばれます。
Apache Beam には、Bigtable、BigQuery、Datastore、Cloud SQL などの Google Cloud サービス用の I/O コネクタが多数あります。また、Google 以外のサービス(Apache Cassandra や MongoDB など)用に Apache Beam の貢献者によって作成された I/O コネクタもあります。