Apache Beam には、パイプラインに追加してすぐに使える拡充変換が用意されています。これにより、データ拡充のワークフローを簡素化できます。このページでは、Apache Beam 拡充変換を使用してストリーミング データを拡充する方法について説明します。
データを拡充すると、別のソースから関連データを追加して元データを拡張できます。追加データは、Bigtable や BigQuery など、さまざまなソースから取得できます。Apache Beam の拡充変換では、Key-Value ルックアップを使用して追加データを元データに接続します。
データ拡充は次のようなケースで役立ちます。
- ウェブサイトまたはアプリからユーザーのアクティビティをキャプチャし、カスタマイズされた最適化案を提示する e コマース パイプラインを作成する場合。この変換により、アクティビティがパイプライン データに組み込まれ、カスタマイズされた推奨事項を提供できるようになります。
- ユーザーデータを地理データと結合して、地域ベースの分析を行う場合。
- テレメトリー イベントを送信する IoT(モノのインターネット)デバイスからデータを収集するパイプラインを作成する場合。
利点
拡充変換には次の利点があります。
- 複雑なコードの記述や、基盤となるライブラリの管理を必要とせずにデータを変換します。
- 組み込みのソースハンドラを提供します。
BigTableEnrichmentHandler
ハンドラを使用すると、構成の詳細を渡さずに Bigtable ソースを使用してデータを拡充できます。- Vertex AI Feature Store と Bigtable オンライン サービングで
VertexAIFeatureStoreEnrichmentHandler
ハンドラを使用します。
- クライアント側のスロットリングを使用して、リクエストのレート制限を管理します。リクエストは、デフォルトの再試行戦略によって指数関数的にバックアップされます。ユースケースに合わせてレート制限を構成できます。
サポートと制限事項
拡充変換には次の要件があります。
- バッチ パイプラインとストリーミング パイプラインで使用できます。
BigTableEnrichmentHandler
ハンドラは、Apache Beam Python SDK バージョン 2.54.0 以降で使用できます。VertexAIFeatureStoreEnrichmentHandler
ハンドラは、Apache Beam Python SDK バージョン 2.55.0 以降で使用できます。- Apache Beam Python SDK バージョン 2.55.0 以降を使用する場合は、Redis 用の Python クライアントもインストールする必要があります。
- Dataflow ジョブで Runner v2 を使用する必要があります。
拡充変換を使用する
拡充変換を使用するには、次のコードをパイプラインに追加します。
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
拡充変換はデフォルトでクロス結合を実行するため、入力データを拡充するようにカスタム結合を設計します。この設計により、指定したフィールドのみが結合に含まれるようになります。
次の例では、left
は拡充変換の入力要素で、right
はその入力要素について外部サービスから取得されたデータです。
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
パラメータ
拡充変換を使用するには、EnrichmentHandler
パラメータが必要です。
構成パラメータを使用して、結合関数、タイムアウト、スロットラー、リピーター(再試行方法)の lambda
関数を指定することもできます。次の構成パラメータを使用できます。
join_fn
: 入力として辞書を受け取り、拡充された行(Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
)を返すlambda
関数。拡充された行には、API から取得したデータの結合方法を指定します。デフォルトはクロス結合です。timeout
: API がリクエストの完了を待機する秒数。この時間が経過するとタイムアウトになります。デフォルトは 30 秒です。throttler
: スロットリング メカニズムを指定します。サポートされているオプションは、デフォルトのクライアントサイドのアダプティブ スロットリングのみです。repeater
:TooManyRequests
やTimeoutException
などのエラーが発生した場合の再試行方法を指定します。デフォルトはExponentialBackOffRepeater
です。
次のステップ
- Apache Beam 変換カタログの拡充変換で、その他の例を確認する。
- Apache Beam と Bigtable を使用してデータを拡充する。
- Apache Beam と Vertex AI Feature Store を使用してデータを拡充する。