Apache Beam 通过提供可添加到流水线的一站式丰富转换来简化数据丰富化工作流。本页面介绍如何使用 Apache Beam 丰富化转换来丰富流式数据。
丰富数据时,您可以通过添加第二个来源的相关数据来扩充一个来源的原始数据。其他数据可能来自各种来源,例如 Bigtable 或 BigQuery。Apache Beam 丰富转换使用键值对查找将其他数据连接到原始数据。
以下示例提供了数据丰富功能有用的一些场景:
- 您想要创建一个电子商务流水线,用于捕获网站或应用中的用户活动,并提供定制化建议。转换会将这些活动整合到流水线数据中,以便您提供定制建议。
- 您的用户数据要与地理数据联接以进行基于地理位置的分析。
- 您想要创建一个流水线,用于从发出遥测事件的物联网 (IOT) 设备收集数据。
优势
丰富转换具有以下优势:
- 无需编写复杂代码或管理底层库即可转换数据。
- 提供内置源代码处理程序。
- 通过
BigTableEnrichmentHandler
处理程序,使用 Bigtable 来源来丰富数据,而无需传递配置详细信息。 - 将
VertexAIFeatureStoreEnrichmentHandler
处理程序与 Vertex AI Feature Store 和 Bigtable 在线传送搭配使用。
- 通过
- 使用客户端节流来管理对请求的速率限制。请求使用默认重试策略以指数方式退避。您可以根据自己的用例配置速率限制。
支持和限制
丰富转换具有以下要求:
- 适用于批处理和流式处理流水线。
- Apache Beam Python SDK 2.54.0 版及更高版本提供了
BigTableEnrichmentHandler
处理程序。 - Apache Beam Python SDK 2.55.0 版及更高版本提供了
VertexAIFeatureStoreEnrichmentHandler
处理程序。 - 使用 Apache Beam Python SDK 2.55.0 及更高版本时,您还需要安装适用于 Redis 的 Python 客户端。
- Dataflow 作业必须使用 Runner v2。
使用丰富转换
要使用丰富转换,请在流水线中添加以下代码:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
由于丰富转换默认执行交叉联接,因此请设计自定义联接以丰富输入数据。这种设计可确保联接仅包含指定的字段。
在以下示例中,left
是丰富转换的输入元素,right
是从外部服务为该输入元素提取的数据。
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
参数
要使用丰富转换,必须提供 EnrichmentHandler
参数。
您还可以使用配置参数为联接函数、超时、节流器或重复器(重试策略)指定 lambda
函数。以下配置参数可用:
join_fn
:一个lambda
函数,该函数将字典作为输入并返回经过丰富的行 (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
)。经过丰富的行指定了如何联接从 API 中提取的数据。默认为交叉联接。timeout
:API 在超时前等待请求完成的秒数。默认为 30 秒。throttler
:指定节流机制。唯一支持的选项是默认客户端自适应节流。repeater
:指定发生TooManyRequests
和TimeoutException
等错误时的重试策略。默认值为ExponentialBackOffRepeater
。
后续步骤
- 如需查看更多示例,请参阅 Apache Beam 转换目录中的丰富转换。
- 使用 Apache Beam 和 Bigtable 来丰富数据。
- 使用 Apache Beam 和 Vertex AI Feature Store 来丰富数据。