Java タスクパターン

e コマース サンプル アプリケーションは、Dataflow を使用してストリーミング データの分析とリアルタイム AI を実装するためのベスト プラクティスを示しています。この例には、Java プログラミング タスクの最適な実行方法を示すタスクパターンが含まれています。一般に、これらのタスクは e コマース アプリケーションを作成する際に必要になります。

このアプリケーションには、次の Java タスクパターンが含まれます。

Apache Beam スキーマを使用して構造化データを処理する

Apache Beam スキーマを使用すると、構造化データの処理が容易になります。

オブジェクトをに変換すると、非常にクリーンな Java コードが生成されます。これにより、有向非巡回グラフ(DAG)の作成を構築できます。メソッドを呼び出すことなく、作成したアナリティクス ステートメント内のフィールドとしてオブジェクト プロパティを参照することもできます。

CountViewsPerProduct.java

JsonToRow を使用して JSON データを変換する

一般に、Dataflow では JSON 文字列の処理が必要になります。たとえば、JSON 文字列はウェブ アプリケーションからキャプチャされたクリックストリーム情報をストリーミングするときに処理されます。JSON 文字列は、パイプラインの処理中にまたは Plain Old Java Object(POJO)に変換する必要があります。

Apache Beam の組み込み変換 JsonToRow を使用して、JSON 文字列を行に変換できます。失敗したメッセージを処理するキューが必要な場合は、個別にビルドする必要があります。詳しくは、処理不能データをキューイングして分析を強化するをご覧ください。

AutoValue を使用して JSON 文字列を POJO に変換する必要がある場合は、@DefaultSchema(AutoValueSchema.class) アノテーションを使用して型のスキーマを登録し、Convert ユーティリティ クラスを使用します。結果のコードは以下のようになります。

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

スキーマを推測可能な Java 型の詳細については、スキーマの作成をご覧ください。

JsonToRow がデータを処理できない場合、Gson が代替になります。Gson のデフォルトのデータ処理はかなりシンプルなため、データ変換プロセスの検証がさらに必要になる場合があります。

AutoValue コード ジェネレーターを使用して POJO を生成する

Apache Beam スキーマは、構造化データを操作することができるため、多くの場合、パイプライン内のオブジェクトを表す最適な方法です。ただし、Key-Value オブジェクトの処理やオブジェクトの状態の処理など、Plain Old Java Object(POJO)が必要になる場合もあります。POJO を手動で作成するには、equals() メソッドと hashcode() メソッドに対してオーバーライドをコーディングする必要があります。この作業には時間がかかり、エラーが発生しやすくなります。オーバーライドが適切でないと、アプリケーションの動作の不整合や、データ損失につながる可能性があります。

POJO を生成するには、AutoValue クラスビルダーを使用します。このオプションにより、必要なオーバーライドが使用され、潜在的なエラーを回避できます。 AutoValue は Apache Beam のコードベースで頻繁に使用されるため、このクラスビルダーに慣れておくと、Java を使用して Dataflow で Apache Beam パイプラインを開発する場合に便利です。

@DefaultSchema(AutoValueSchema.class) アノテーションを追加すると、Apache Beam スキーマでも AutoValue が可能です。詳細については、スキーマの作成をご覧ください。

AutoValue の詳細については、Why AutoValue?AutoValue のドキュメントをご覧ください。

Clickstream.java

詳しく分析するために処理不能なデータをキューに入れる

本番環境システムでは、問題のあるデータの処理が重要になります。可能であれば、インストリーム データの検証と修正を行います。修正が不可能な場合は、後で分析するために、値を未処理のメッセージ キュー(デッドレター キュー)に記録します。問題がよく発生するのはデータを別の形式に変換する場合です(たとえば、JSON 文字列をに変換する場合など)。

この問題に対処するため、マルチ出力変換を使用して、未処理のデータが含まれる要素を別の PCollection に送り、さらに分析を行います。この処理は、パイプライン内の多くの場所で使用する可能性がある一般的なオペレーションです。複数の場所で使用できるように、変換を汎用化してみましょう。まず、元のオブジェクトを含む共通のプロパティをラップするエラー オブジェクトを作成します。次に、エクスポート先に複数のオプションを持つシンク変換を作成します。

データ検証変換を順次適用する

外部システムから収集したデータは、多くの場合クリーニングを必要とします。問題のあるインストリーム データを可能な限り修正できるように、パイプラインを構造化します。必要に応じて、データを詳細な分析のためにキューに送信します。

1 つのメッセージに対して、修正が必要な複数の問題が生じる可能性があるため、必要な有向非巡回グラフ(DAG)について計画を立てます。要素に複数の問題があるデータが含まれている場合は、要素が適切な変換を通過するようにする必要があります。

たとえば、次の値を持つ要素を考えます。どちらも null にすることはできません。

{"itemA": null,"itemB": null}

次のどちらの潜在的な問題にも対処する変換を介して要素が流れるようにする必要があります。

badElements.apply(fixItemA).apply(fixItemB)

パイプラインに連続したステップがさらに存在する可能性もあります。しかし、fusion により、その処理オーバーヘッドは最小限に抑えられます。

ValidateAndCorrectCSEvt.java

DoFn.StartBundle を使用して、外部サービスへのマイクロバッチ呼び出しを行う

パイプラインの一部として外部 API の呼び出しが必要になることがあります。パイプラインは多くのコンピューティング リソースに処理を分散するため、システムを通過する要素ごとに、1 回の呼び出しで外部サービス エンドポイントに過剰な負荷がかかる可能性があります。この問題は、削減関数を適用していない場合に特によく発生します。

この問題を回避するには、外部システムに対してバッチ呼び出しを行います。

GroupByKey 変換または Apache Beam タイマー API を使用して、呼び出しをバッチ処理できます。ただし、どちらの方法でもシャッフルが必要で、処理のオーバーヘッドが発生するとともに、キー空間を特定するためにマジック ナンバーを使用する必要が生じます。

代わりに、StartBundleFinishBundle ライフサイクル要素を使用してデータをバッチ処理します。これらのオプションではシャッフルは不要です。

このオプションの軽微な問題点としては、バンドルのサイズが、パイプラインとそのワーカー内で現在発生している処理に基づいてランナーの実装によって動的に決定されるということがあります。ストリーム モードでは、バンドルのサイズが小さくなることがよくあります。Dataflow バンドルは、シャーディングの使用量、特定のキーで使用できるデータ、パイプラインのスループットなどのバックエンド要因の影響を受けます。

EventItemCorrectionService.java

データ拡充に適切な副入力パターンを使用する

ストリーミング分析アプリケーションでは、さらなる分析に役立つ可能性のある追加情報によってデータが拡充されることがよくあります。たとえば、トランザクションのストア ID がある場合は、店舗の場所に関する情報を追加したいと考えるかもしれません。多くの場合、この要素の追加要素を取得するには、ルックアップ テーブルから情報を取得します。

ゆっくり変化し、小さいサイズのルックアップ テーブルでは、そのテーブルを Map<K,V> インターフェースを実装するシングルトン クラスとしてパイプラインに取り込むとうまく機能します。これにより、各要素がルックアップの API 呼び出しを行うことを回避できます。パイプラインにテーブルのコピーを含めた場合は、それを定期的に更新して最新の状態に保つ必要があります。

更新の遅い副入力を処理するには、Apache Beam 副入力パターンを使用します。

キャッシュ

副入力はメモリに読み込まれるため、自動的にキャッシュに保存されます。

キャッシュのサイズを設定するには、--setWorkerCacheMb オプションを使用します。

キャッシュを DoFn インスタンス間で共有し、外部トリガーを使用してキャッシュを更新できます。

SlowMovingStoreLocationDimension.java