丰富流式数据

Apache Beam 通过提供可添加到流水线的一站式丰富转换来简化数据丰富化工作流。本页面介绍如何使用 Apache Beam 丰富化转换来丰富流式数据。

丰富数据时,您可以通过添加第二个来源的相关数据来扩充一个来源的原始数据。其他数据可能来自各种来源,例如 BigtableBigQuery。Apache Beam 丰富转换使用键值对查找将其他数据连接到原始数据。

以下示例提供了数据丰富功能有用的一些场景:

  • 您想要创建一个电子商务流水线,用于捕获网站或应用中的用户活动,并提供定制化建议。转换会将这些活动整合到流水线数据中,以便您提供定制建议。
  • 您的用户数据要与地理数据联接以进行基于地理位置的分析。
  • 您想要创建一个流水线,用于从发出遥测事件的物联网 (IOT) 设备收集数据。

优势

丰富转换具有以下优势:

支持和限制

丰富转换具有以下要求:

  • 适用于批处理和流式处理流水线。
  • Apache Beam Python SDK 2.54.0 版及更高版本提供了 BigTableEnrichmentHandler 处理程序。
  • Apache Beam Python SDK 2.55.0 版及更高版本提供了 VertexAIFeatureStoreEnrichmentHandler 处理程序。
  • 使用 Apache Beam Python SDK 2.55.0 及更高版本时,您还需要安装适用于 Redis 的 Python 客户端
  • Dataflow 作业必须使用 Runner v2

使用丰富转换

要使用丰富转换,请在流水线中添加以下代码:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

由于丰富转换默认执行交叉联接,因此请设计自定义联接以丰富输入数据。这种设计可确保联接仅包含指定的字段。

在以下示例中,left 是丰富转换的输入元素,right 是从外部服务为该输入元素提取的数据。

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

参数

要使用丰富转换,必须提供 EnrichmentHandler 参数。

您还可以使用配置参数为联接函数、超时、节流器或重复器(重试策略)指定 lambda 函数。以下配置参数可用:

  • join_fn:一个 lambda 函数,该函数将字典作为输入并返回经过丰富的行 (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row])。经过丰富的行指定了如何联接从 API 中提取的数据。默认为交叉联接。
  • timeout:API 在超时前等待请求完成的秒数。默认为 30 秒。
  • throttler:指定节流机制。唯一支持的选项是默认客户端自适应节流。
  • repeater:指定发生 TooManyRequestsTimeoutException 等错误时的重试策略。默认值为 ExponentialBackOffRepeater

后续步骤