App Engine MapReduce から Apache Beam と Dataflow に移行する

このチュートリアルは、App Engine MapReduce ユーザーを対象としています。App Engine MapReduce を使用する環境を Apache Beam と Dataflow を使用する環境に移行する方法を示します。

移行の理由

App Engine MapReduce は、大量のデータを分散させて並列処理するためのプログラミング モデルです。1 つのリクエストの範囲内で処理できない、次のような大規模で時間がかかるタスクに適しています。

  • アプリケーション ログの分析
  • 外部ソースにある関連データの集計
  • データ形式の変換
  • 外部分析するためのデータのエクスポート

ただし、App Engine MapReduce は、App Engine サービスの上に構築された、コミュニティで維持されるオープンソース ライブラリであり、Google によるサポートはありません。

一方、Dataflow は Google が完全にサポートしており、App Engine MapReduce と比較して拡張機能が提供されます。

移行事例

App Engine MapReduce から Apache Beam と Dataflow に移行することで利点が得られるいくつかの事例を次に示します。

  • SQL を使用した分析処理のために、Datastore データベース アプリケーション データを BigQuery データ ウェアハウスに保存します。
  • Datastore データセットのメンテナンスや更新のために、App Engine MapReduce の代わりに Dataflow を使用します。
  • Datastore データベースの一部を Cloud Storage にバックアップします。

Dataflow と Apache Beam とは

Dataflow は、さまざまなデータ処理パターンの実行に対応したマネージド サービスです。Apache Beam は、データ処理ワークフローを定義するための SDK を提供する統合プログラミング モデルです。Apache Beam を使用すると、バッチとストリーミングの両方に対応する複雑なパイプラインを作成して Dataflow で実行できます。

Dataflow と Apache Beam を使ってみる

開始するには、選択したクイックスタートに従います。

パイプラインの作成と実行

App Engine MapReduce を使用する場合、データ処理クラスを作成し、MapReduce ライブラリを追加します。ジョブの仕様と設定が定義されたら、適切なジョブクラスで静的 start() メソッドを使用して、1 ステップでジョブを作成して開始します。

Map ジョブでは、Input クラスと Output クラス、およびマッピングを行う Map クラスを作成します。App Engine MapReduce ジョブでは、Input クラスと Output クラスを作成し、データ変換用の Mapper クラスと Reducer クラスを定義します。

Apache Beam では少し異なり、パイプラインを作成します。入力および出力コネクタを使用して、データソースおよびシンクから読み取りと書き込みを行います。事前定義されたデータ変換を使用(または独自に作成)して、データ変換を実装します。次に、コードが準備できたら、パイプラインを Dataflow サービスにデプロイします。

App Engine MapReduce ジョブの Apache Beam パイプラインへの変換

次の表に、App Engine MapReduce モデルのマップシャッフルリデュース ステップに相当する Apache Beam のステップを示します。

Java

App Engine MapReduce Apache Beam での同等のステップ
マップ MapElements<InputT,OutputT>
シャッフル GroupByKey<K,V>
リデュース Combine.GroupedValues<K,InputT,OutputT>

一般には、Combine.PerKey<K,InputT,OutputT>GroupByKeyCombineValues の代わりに使用します。

Python

App Engine MapReduce Apache Beam での同等のステップ
マップ beam.Map
シャッフル beam.GroupByKey
リデュース beam.CombineValues

一般には、beam.CombinePerKeybeam.GroupByKeybeam.CombineValues の代わりに使用します。

Go

App Engine MapReduce Apache Beam での同等のステップ
マップ beam.ParDo
シャッフル beam.GroupByKey
リデュース beam.Combine


次のサンプルコードは、App Engine MapReduce モデルを Apache Beam に実装する方法を示しています。

Java

MinimalWordCount.java より引用:
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                   @Override
                   public String apply(KV<String, Long> input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));

Python

wordcount_minimal.py より引用:
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

Go

minimal_wordcount.go より引用:
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()

// Create the Pipeline object and root scope.
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)

// Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
// key/value pairs, where each key represents a unique word in the text.
// The associated value is the occurrence count for that word.
counted := stats.Count(s, words)

// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)

// Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

Apache Beam と Dataflow で得られるその他の利点

App Engine MapReduce ジョブを Apache Beam パイプラインに移行すると、Apache Beam と Dataflow で提供される機能を利用できます。

Cloud Dataflow ジョブのスケジュール設定

App Engine タスクキューを十分に理解していれば、cron を使用して繰り返しジョブをスケジュールできます。このは、App Engine cron を使用して Apache Beam パイプラインをスケジュールする方法を示しています。

パイプラインの実行をスケジュールするその他の方法がいくつかあります。例を挙げてみます。

Cloud Dataflow ジョブのモニタリング

App Engine MapReduce ジョブをモニタリングするには、appspot.com でホストされる URL を使用します。

Dataflow マネージド サービスを使用してパイプラインを実行する場合、Dataflow のウェブベースのモニタリング ユーザー インターフェースを使用してパイプラインをモニタリングできます。Cloud Monitoring を使用して、パイプラインに関する追加情報を取得することもできます。

読み取りと書き込み

App Engine MapReduce のリーダーとライターは、Apache Beam ではデータソースとシンク(I/O コネクタ)と呼ばれます。

Apache Beam には、Bigtable、BigQuery、Datastore、Cloud SQL などの Google Cloud サービス用の I/O コネクタが多数あります。また、Google 以外のサービス(Apache Cassandra や MongoDB など)用に Apache Beam の貢献者によって作成された I/O コネクタもあります。

次のステップ