丰富流式数据

Apache Beam 通过提供可添加到流水线的一站式丰富转换,来简化数据丰富工作流。本页面介绍如何使用 Apache Beam 丰富转换来丰富流式数据。

丰富数据时,您可以通过添加来自第二个来源的相关数据来增强来自一个源的原始数据。额外数据可以来自各种源,例如 BigtableBigQuery。Apache Beam 丰富转换使用键值对查找将额外数据与原始数据相关联。

以下示例提供了数据丰富有用的一些情况:

  • 您想要创建电子商务流水线,用于捕获网站或应用中的用户活动并提供个性化推荐。转换会将活动整合到流水线数据中,以便您可以提供个性化推荐。
  • 您有用户数据,想要将其与地理位置数据联接,以便进行基于地理位置的分析。
  • 您想要创建流水线,用于从发出遥测事件的物联网 (IOT) 设备收集数据。

优势

丰富转换具有以下优势:

  • 无需编写复杂代码或管理底层库即可转换数据。
  • 提供内置源代码处理程序。
  • 使用客户端节流来管理对请求的速率限制。系统会使用默认的重试策略以指数方式对这些请求进行退避。您可以根据使用场景配置速率限制。

支持和限制

丰富转换具有以下要求:

  • 适用于批处理和流式处理流水线。
  • Apache Beam Python SDK 2.54.0 版及更高版本提供了 BigTableEnrichmentHandler 处理程序。
  • Apache Beam Python SDK 2.55.0 版及更高版本提供了 VertexAIFeatureStoreEnrichmentHandler 处理程序。
  • 使用 Apache Beam Python SDK 2.55.0 及更高版本时,您还需要安装适用于 Redis 的 Python 客户端
  • Dataflow 作业必须使用 Runner v2

使用丰富转换

如需使用丰富转换,请在流水线中添加以下代码:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

由于丰富转换默认执行交叉联接,因此请设计自定义联接来丰富输入数据。这种设计可确保联接仅包含指定的字段。

在以下示例中,left 是丰富转换的输入元素,而 right 是从外部服务提取的该输入元素的数据。

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

参数

要使用丰富转换,必须提供 EnrichmentHandler 参数。

您还可以使用配置参数为联接函数、超时、节流器或重复器(重试策略)指定 lambda 函数。以下配置参数可用:

  • join_fn:一个 lambda 函数,该函数将字典作为输入并返回经过丰富的行 (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row])。经过丰富的行指定了如何联接从 API 中提取的数据。默认为交叉联接。
  • timeout:在超时之前等待 API 完成请求所需的秒数。默认为 30 秒。
  • throttler:指定节流机制。唯一支持的选项是默认客户端自适应节流。
  • repeater:指定发生 TooManyRequestsTimeoutException 等错误时的重试策略。默认值为 ExponentialBackOffRepeater

后续步骤