Apache Beam 通过提供可添加到流水线的一站式丰富转换,来简化数据丰富工作流。本页面介绍如何使用 Apache Beam 丰富转换来丰富流式数据。
丰富数据时,您可以通过添加来自第二个来源的相关数据来增强来自一个源的原始数据。额外数据可以来自各种源,例如 Bigtable 或 BigQuery。Apache Beam 丰富转换使用键值对查找将额外数据与原始数据相关联。
以下示例提供了数据丰富有用的一些情况:
- 您想要创建电子商务流水线,用于捕获网站或应用中的用户活动并提供个性化推荐。转换会将活动整合到流水线数据中,以便您可以提供个性化推荐。
- 您有用户数据,想要将其与地理位置数据联接,以便进行基于地理位置的分析。
- 您想要创建流水线,用于从发出遥测事件的物联网 (IOT) 设备收集数据。
优势
丰富转换具有以下优势:
- 无需编写复杂代码或管理底层库即可转换数据。
- 提供内置源代码处理程序。
- 通过
BigTableEnrichmentHandler
处理程序,使用 Bigtable 来源来丰富数据,而无需传递配置详细信息。 - 通过
BigQueryEnrichmentHandler
处理程序,使用 BigQuery 来源来丰富数据,而无需传递配置详细信息。 - 将
VertexAIFeatureStoreEnrichmentHandler
处理程序与 Vertex AI Feature Store 和 Bigtable 在线传送搭配使用。
- 通过
- 使用客户端节流来管理对请求的速率限制。系统会使用默认重试策略以指数退避算法来延迟请求。您可以根据使用场景配置速率限制。
支持和限制
丰富转换具有以下要求:
- 适用于批处理和流式处理流水线。
- Apache Beam Python SDK 2.54.0 版及更高版本提供了
BigTableEnrichmentHandler
处理程序。 - Apache Beam Python SDK 2.57.0 版及更高版本提供了
BigQueryEnrichmentHandler
处理程序。 - Apache Beam Python SDK 2.55.0 版及更高版本提供了
VertexAIFeatureStoreEnrichmentHandler
处理程序。 - 使用 Apache Beam Python SDK 2.55.0 及更高版本时,您还需要安装适用于 Redis 的 Python 客户端。
- Dataflow 作业必须使用 Runner v2。
使用丰富转换
如需使用丰富转换,请在流水线中添加以下代码:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
由于丰富转换默认执行交叉联接,因此请设计自定义联接以丰富输入数据。这种设计可确保联接仅包含指定的字段。
在以下示例中,left
是丰富转换的输入元素,right
是针对该输入元素从外部服务提取的数据。
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
参数
要使用丰富转换,必须提供 EnrichmentHandler
参数。
您还可以使用配置参数为联接函数、超时、节流器或重复器(重试策略)指定 lambda
函数。以下配置参数可用:
join_fn
:一个lambda
函数,该函数将字典作为输入并返回经过丰富的行 (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
)。经过丰富的行指定了如何联接从 API 中提取的数据。默认为交叉联接。timeout
:在超时之前等待 API 完成请求所需的秒数。默认为 30 秒。throttler
:指定节流机制。唯一支持的选项是默认客户端自适应节流。repeater
:指定发生TooManyRequests
和TimeoutException
等错误时的重试策略。默认值为ExponentialBackOffRepeater
。
后续步骤
- 如需查看更多示例,请参阅 Apache Beam 转换目录中的丰富转换。
- 使用 Apache Beam 和 Bigtable 来丰富数据。
- 使用 Apache Beam 和 BigQuery 来丰富数据。
- 使用 Apache Beam 和 Vertex AI Feature Store 来丰富数据。