完全な例

GitHub の Complete Examples ディレクトリ(master-1.x ブランチ)に、サンプル パイプラインが数多く用意されています。これらのパイプラインは、サンプル シナリオを使用して一般的なエンドツーエンドのパイプラインのパターンを示しています。各サンプル シナリオは、トラフィック パターン データの解析、Twitter のハッシュタグの解析、Wikipedia の編集データの解析など、現実的なデータ処理ドメインをヒントに作成されています。このドキュメントでは、それぞれの例について簡単に説明し、ソースコードへのリンクを示します。

初めてのユーザーは、WordCount サンプルのチュートリアルから始めることをおすすめします。このサンプルは、入力されたテキスト ファイルに対して実行され、入力テキスト中に現れる各単語の個数を計算します。 WordCount では、非常に単純な例を通して Dataflow の主要な概念を理解できます。しかし、ここではより現実的なパイプラインを示します。

重要なキーワード

このドキュメントでは、以下の用語を使用します。

  • パイプライン - パイプラインは、データ処理ジョブを表すために作成するコードです。Dataflow はパイプラインのコードを取得し、それを使用してジョブを構築します。
  • PCollection - PCollection は、Dataflow SDK によって提供される特殊クラスであり、入力されたデータセットを表します。
  • 変換 - Dataflow パイプラインで、変換はステップ(データを変換する処理オペレーション)を表します。
  • 制限付き PCollection と制限なし PCollection - PCollection のサイズは、制限付きまたは制限なしのいずれかです。PCollection は、固定されたデータセットを表す場合は制限付きであり、継続的に更新されるデータセットを表す場合は制限なしです。
  • バッチモード - パイプラインが「バッチモードで実行」される場合、その入力は有限です。
  • ストリーミング モード - パイプラインが「ストリーミング モードで実行」される場合、その入力は無限です。
  • ウィンドウ処理 - ウィンドウ処理は、Dataflow SDK で PCollection を個々の要素のタイムスタンプに従って細分化するために使用されるコンセプトです。

AutoComplete

Java

AutoComplete パイプラインは、入力中の単語のすべての接頭辞に対する最も一般的なハッシュタグを計算します。AutoComplete パイプラインの結果を使用してオートコンプリートを実現できます。 AutoComplete は、入力として単語を受け取り、入力中の各単語の各接頭辞に対して最も一般的な候補を計算します。

AutoComplete は、(バッチモードで)有限データまたは(ストリーミング モードで)無限データに対して実行することができます。有限データを使用する場合、入力はテキスト ファイル内の単語のリストです。ストリーミング モードでは、Google Cloud Pub/Sub トピックから継続的に単語が流れ込みます。

AutoComplete パイプラインは、入力文字列に一連の変換を適用して先頭にハッシュタグが付いた単語を抽出し、単語の接頭辞を判定して、各接頭辞に対する最上位の候補を計算します。パイプラインの出力は、最上位の候補データです。

StreamingWordExtract

Java

StreamingWordExtract パイプラインは、ストリーミング データの操作方法を示す小規模なパイプラインの例です。このパイプラインは、Cloud Pub/Sub からテキスト ストリーミングの行を読み取り、各行を個々の単語にトークン化し、単語を大文字にします。その後、大文字にした単語を BigQuery テーブルの行としてフォーマットし、BigQuery テーブルに対するストリーミング書き込みを行います。

TfIdf

Java

TfIdf は、用語頻度 - 逆文書頻度(Term frequency - Inverse document frequency)の略であり、1 つまたは複数のドキュメントにおける単語の重要度を算出します。

Tf-idf パイプラインは、ディレクトリまたは Google Cloud Storage から一連のドキュメントを読み取り、各単語の tf-idf ランクのコンポーネントを算出する一連の変換を適用します。tf-idf 計算のコンポーネントの 1 つは、単語が出現するドキュメントの数をドキュメントの総数で割った idf パート(用語が出現するドキュメントの頻度)です。ドキュメントの総数は変更されない値であるため、副入力としてパイプラインに注入されます。副入力は、パイプラインが入力の PCollection 内の要素を処理するたびにアクセスできる追加の入力です。

Tf-idf は、すべての単語からそのランクへのマッピングを、各単語が出現するドキュメントに出力します。

TopWikipediaSessions

Java

TopWikipediaSessions は、Google Cloud Storage から読み取った Wikipedia の編集データを処理するバッチ パイプラインです。1 つのセッションにおける Wikipedia の編集シーケンスが最も長かったユーザーを見つけます。セッションは、1 時間未満の間隔で分割された編集のシーケンスとして定義されます。

たとえば、ユーザー A が Wikipedia の編集を 30 分間隔で 5 回行ったとします。翌日、ユーザー A は再度 Wikipedia を編集しました。このユーザーの最長セッションは、(6 回ではなく)5 回の編集で構成されます。ユーザー B は Wikipedia を 20 回編集しましたが、2 時間ごとに編集したため、このユーザーの最長セッションは 1 回の編集で構成されます。したがって、ユーザー A が 1 つのセッションにおける編集シーケンスが最も長かったユーザーになります。

この例では、ウィンドウ処理を使用して時間ベースのデータ集計を実行しています。1 時間のウィンドウ期間を使用してセッションを定義し、各ユーザー セッションの編集回数を計算します。

このパイプラインは、その出力をフォーマットされた文字列として Google Cloud Storage 内のテキスト ファイルに書き込みます。

TrafficMaxLaneFlow

Java

TrafficMaxLaneFlow パイプラインは、トラフィック センサーからのデータを解析します。このパイプラインは、バッチモードとストリーミング モードのどちらでも実行できます。バッチモードでは、入力ファイルからトラフィック センサーのデータを読み取ります。ストリーミング モードでは、Cloud Pub/Sub トピックから継続的にデータが流れ込みます。

TrafficMaxLaneFlow は、ウィンドウ処理(具体的にはスライディング タイム ウィンドウ)を使用して入力データ ストリームを解析します。スライディング タイム ウィンドウでは、データ ストリーム内の時間間隔を使用して、ウィンドウが重複するデータのバンドルを定義します。

TrafficMaxLaneFlow は、カスタムの Combine 変換を使用してレーン情報を抽出し、ウィンドウごとに特定のステーションに関して検出される最大のレーンフローを算出します。この結合は単純な最大結合ではなく、フローの値とともに追加情報を保持する必要があるため、カスタムの Combine 変換が必要になります。

このパイプラインは、最大値を補足情報とともにフォーマットし、BigQuery テーブルに書き込みます。

TrafficRoutes

Java

TrafficRoutes は、トラフィック センサーからのデータを解析し、事前に定義された「ルート」の小規模なセットの平均速度を算出し、それらのルートの「減速」を検出します。

TrafficRoutes パイプラインは、(バッチモードで)有限データまたは(ストリーミング モードで)無限データに対して実行されます。バッチモードでは、テキスト ファイルからトラフィック センサーのデータを読み取ります。ストリーミング モードでは、Cloud Pub/Sub トピックからデータを読み取ります。

このパイプラインは、ウィンドウ処理(具体的にはスライディング タイム ウィンドウ)を使用してデータ ストリームを解析します。スライディング タイム ウィンドウでは、データ ストリーム内の時間間隔を使用して、ウィンドウが重複するデータのバンドルを定義します。TrafficRoutes のデフォルトのウィンドウ期間は 3 分、デフォルトのウィンドウ間隔は 1 分です。したがって、各ウィンドウは前のウィンドウが開始してから 1 分後に開始し、3 分間のデータサンプルを含んでいます。各ウィンドウでは、事前に定義された「ルート」のセットの平均速度を算出し、「減速」を検出します。「減速」は、スライディング ウィンドウ内のほとんどの速度が前のウィンドウの読み取り速度より低い場合に発生します。

このパイプラインは、結果をフォーマットして BigQuery に書き込みます。

estimate_pi

Java

このサンプル パイプラインは、Dataflow SDK for Java では利用できません。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。