App Engine MapReduce から Apache Beam と Cloud Dataflow への移行

このチュートリアルは、App Engine MapReduce ユーザーを対象としています。App Engine MapReduce を使用する環境を Apache Beam と Cloud Dataflow を使用する環境に移行する方法を示します。

移行の理由

App Engine MapReduce は、大量のデータを分散させて並列処理するためのプログラミング モデルです。1 つのリクエストの範囲内で処理できない、次のような大規模で時間がかかるタスクに適しています。

  • アプリケーション ログの分析
  • 外部ソースにある関連データの集計
  • データ形式の変換
  • 外部分析するためのデータのエクスポート

ただし、App Engine MapReduce は、App Engine サービスの上に構築された、コミュニティで維持されるオープンソース ライブラリであり、Google によるサポートはありません。

一方、Cloud Dataflow は Google が完全にサポートしており、App Engine MapReduce と比較して拡張機能が提供されます。

移行事例

App Engine MapReduce から Apache Beam と Cloud Dataflow に移行することで利点が得られるいくつかの事例を次に示します。

  • SQL を使用した分析処理のために、Cloud Datastore データベース アプリケーション データを BigQuery データ ウェアハウスに保存します。
  • Cloud Datastore データセットのメンテナンスや更新のために、App Engine MapReduce の代わりに Cloud Dataflow を使用します。
  • Cloud Datastore データベースの一部を Cloud Storage にバックアップします。

Cloud Dataflow と Apache Beam とは

Cloud Dataflow は、さまざまなデータ処理パターンの実行に対応したマネージド サービスです。Apache Beam は、データ処理ワークフローを定義するための SDK を提供する統合プログラミング モデルです。Apache Beam を使用すると、バッチとストリーミングの両方に対応する複雑なパイプラインを作成して Cloud Dataflow で実行できます。

Cloud Dataflow と Apache Beam を使ってみる

開始するには、選択したクイックスタートに従います。

パイプラインの作成と実行

App Engine MapReduce を使用する場合、データ処理クラスを作成し、MapReduce ライブラリを追加します。ジョブの仕様と設定が定義されたら、適切なジョブクラスで静的 start() メソッドを使用して、1 ステップでジョブを作成して開始します。

Map ジョブでは、Input クラスと Output クラス、およびマッピングを行う Map クラスを作成します。App Engine MapReduce ジョブでは、Input クラスと Output クラスを作成し、データ変換用の Mapper クラスと Reducer クラスを定義します。

Apache Beam では少し異なり、パイプラインを作成します。入力および出力コネクタを使用して、データソースおよびシンクから読み取りと書き込みを行います。事前定義されたデータ変換を使用(または独自に作成)して、データ変換を実装します。次に、コードが準備できたら、パイプラインを Cloud Dataflow サービスにデプロイします。

App Engine MapReduce ジョブの Apache Beam パイプラインへの変換

次の表に、App Engine MapReduce モデルのマップシャッフルリデュース ステップに相当する Apache Beam のステップを示します。

Java: SDK 2.x

App Engine MapReduce Apache Beam での同等のステップ
マップ MapElements<InputT,OutputT>
シャッフル GroupByKey<K,V>
リデュース Combine.GroupedValues<K,InputT,OutputT>

一般には、Combine.PerKey<K,InputT,OutputT>GroupByKeyCombineValues の代わりに使用します。

Python

App Engine MapReduce Apache Beam での同等のステップ
マップ beam.Map
シャッフル beam.GroupByKey
リデュース beam.CombineValues

一般には、beam.CombinePerKeybeam.GroupByKeybeam.CombineValues の代わりに使用します。


次のサンプルコードは、App Engine MapReduce モデルを Apache Beam に実装する方法を示しています。

Java: SDK 2.x

MinimalWordCount.java より引用:
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction, String>() {
                   @Override
                   public String apply(KV input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));

Python

wordcount_minimal.py より引用:
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

Apache Beam と Cloud Dataflow で得られるその他の利点

App Engine MapReduce ジョブを Apache Beam パイプラインに移行すると、Apache Beam と Cloud Dataflow で提供される機能を利用できます。

Cloud Dataflow ジョブのスケジュール設定

App Engine タスクキューを十分に理解していれば、cron を使用して繰り返しジョブをスケジュールできます。このは、App Engine cron を使用して Apache Beam パイプラインをスケジュールする方法を示しています。

パイプラインの実行をスケジュールするその他の方法がいくつかあります。次のことが可能です。

Cloud Dataflow ジョブのモニタリング

App Engine MapReduce ジョブをモニタリングするには、appspot.com でホストされる URL を使用します。

Cloud Dataflow マネージド サービスを使用してパイプラインを実行する場合、Cloud Dataflow の便利なウェブベース モニタリング ユーザー インターフェースを使用してパイプラインをモニタリングできます。Stackdriver Monitoring を使用して、パイプラインに関する追加情報を取得することもできます。

読み取りと書き込み

App Engine MapReduce のリーダーとライターは、Apache Beam ではデータソースとシンク(I/O コネクタ)と呼ばれます。

Apache Beam には、Cloud Bigtable、BigQuery、Cloud Datastore、Cloud SQL などの Google Cloud サービス用の I/O コネクタが多数あります。また、Google 以外のサービス(Apache Cassandra や MongoDB など)用に Apache Beam の貢献者によって作成された I/O コネクタもあります。

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。