ストリーミング データを拡充する

Apache Beam には、パイプラインに追加してすぐに使える拡充変換が用意されています。これにより、データ拡充のワークフローを簡素化できます。このページでは、Apache Beam 拡充変換を使用してストリーミング データを拡充する方法について説明します。

データを拡充すると、別のソースから関連データを追加して元データを拡張できます。追加データは、BigtableBigQuery など、さまざまなソースから取得できます。Apache Beam の拡充変換では、Key-Value ルックアップを使用して追加データを元データに接続します。

データ拡充は次のようなケースで役立ちます。

  • ウェブサイトまたはアプリからユーザーのアクティビティをキャプチャし、カスタマイズされた最適化案を提示する e コマース パイプラインを作成する場合。この変換により、アクティビティがパイプライン データに組み込まれ、カスタマイズされた推奨事項を提供できるようになります。
  • ユーザーデータを地理データと結合して、地域ベースの分析を行う場合。
  • テレメトリー イベントを送信する IoT(モノのインターネット)デバイスからデータを収集するパイプラインを作成する場合。

利点

拡充変換には次の利点があります。

  • 複雑なコードの記述や、基盤となるライブラリの管理を必要とせずにデータを変換します。
  • 組み込みのソースハンドラを提供します。
  • クライアント側のスロットリングを使用して、リクエストのレート制限を管理します。リクエストは、デフォルトの再試行戦略によって指数関数的にバックアップされます。ユースケースに合わせてレート制限を構成できます。

サポートと制限事項

拡充変換には次の要件があります。

  • バッチ パイプラインとストリーミング パイプラインで使用できます。
  • BigTableEnrichmentHandler ハンドラは、Apache Beam Python SDK バージョン 2.54.0 以降で使用できます。
  • VertexAIFeatureStoreEnrichmentHandler ハンドラは、Apache Beam Python SDK バージョン 2.55.0 以降で使用できます。
  • Apache Beam Python SDK バージョン 2.55.0 以降を使用する場合は、Redis 用の Python クライアントもインストールする必要があります。
  • Dataflow ジョブで Runner v2 を使用する必要があります。

拡充変換を使用する

拡充変換を使用するには、次のコードをパイプラインに追加します。

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

拡充変換はデフォルトでクロス結合を実行するため、入力データを拡充するようにカスタム結合を設計します。この設計により、指定したフィールドのみが結合に含まれるようになります。

次の例では、left は拡充変換の入力要素で、right はその入力要素について外部サービスから取得されたデータです。

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

パラメータ

拡充変換を使用するには、EnrichmentHandler パラメータが必要です。

構成パラメータを使用して、結合関数、タイムアウト、スロットラー、リピーター(再試行方法)の lambda 関数を指定することもできます。次の構成パラメータを使用できます。

  • join_fn: 入力として辞書を受け取り、拡充された行(Callable[[Dict[str, Any], Dict[str, Any]], beam.Row])を返す lambda 関数。拡充された行には、API から取得したデータの結合方法を指定します。デフォルトはクロス結合です。
  • timeout: API がリクエストの完了を待機する秒数。この時間が経過するとタイムアウトになります。デフォルトは 30 秒です。
  • throttler: スロットリング メカニズムを指定します。サポートされているオプションは、デフォルトのクライアントサイドのアダプティブ スロットリングのみです。
  • repeater: TooManyRequestsTimeoutException などのエラーが発生した場合の再試行方法を指定します。デフォルトは ExponentialBackOffRepeater です。

次のステップ