电子商务示例应用演示了使用 Dataflow 实现流式数据分析和实时 AI 的最佳实践。该示例包含任务模式,这些模式展示了完成 Java 编程任务的最佳方式。创建电子商务应用通常需要这些任务。
该应用包含以下 Java 任务模式:
- 使用 Apache Beam 架构处理结构化数据
- 使用 JsonToRow 转换 JSON 数据
- 使用
AutoValue
代码生成器生成普通的旧 Java 对象 (POJO) - 将无法处理的数据加入队列以供进一步分析
- 依次应用数据验证转换
- 使用
DoFn.StartBundle
对外部服务进行微批量调用 - 使用适当的旁路输入模式
使用 Apache Beam 架构处理结构化数据
您可以使用 Apache Beam 架构更轻松地处理结构化数据。
通过将对象转换为行,您可以生成非常干净的 Java 代码,这使得有向无环图 (DAG) 的构建练习更容易。您还可以在创建的分析语句中将对象属性引用为字段,而不必调用方法。
示例
使用 JsonToRow 转换 JSON 数据
在 Dataflow 中处理 JSON 字符串是一种常见需求。例如,在流式传输从 Web 应用捕获的点击流信息时,系统会处理 JSON 字符串。要处理 JSON 字符串,您需要在流水线处理期间将其转换为行或普通旧 Java 对象 (POJO)。
您可以使用 Apache Beam 内置转换 JsonToRow 将 JSON 字符串转换为行。但是,如果您需要队列来处理失败消息,则需要单独进行构建,请参阅将无法处理的数据加入队列以供进一步分析。
如果您需要使用 AutoValue 将 JSON 字符串转换为 POJO,请使用 @DefaultSchema(AutoValueSchema.class)
注解注册该类型的架构,然后使用 Convert 实用程序类。生成的代码类似于以下内容:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
如需了解详情(包括可用于推断架构的不同 Java 类型),请参阅创建架构。
如果 JsonToRow 不适用于您的数据,可以使用 Gson 作为替代方法。Gson 对数据的默认处理是相当宽松的,您可能需要在数据转换过程中构建更多验证。
示例
使用 AutoValue
代码生成器生成 POJO
由于 Apache Beam 架构支持的结构化数据处理方式,它通常是表示流水线中对象的最佳方式。不过,有时需要普通旧 Java 对象 (POJO),例如在处理键值对象或处理对象状态时。
构建 POJO 时,您需要对 equals()
和 hashcode()
方法进行重写,而此操作既耗时又容易出错。不正确的重写可能会导致应用行为不一致或数据丢失。
如需生成 POJO,请使用 AutoValue
类构建器。此选项可确保使用必要的重写值,并避免潜在的错误。
AutoValue
在 Apache Beam 代码库中大量使用,因此熟悉此类构建器有助于在 Dataflow 上使用 Java 开发 Apache Beam 流水线。
如果您添加 @DefaultSchema(AutoValueSchema.class)
注解,则还可以将 AutoValue
与 Apache Beam 架构搭配使用。如需了解详情,请参阅创建架构。
如需详细了解 AutoValue
,请参阅 为什么选择 AutoValue?
和 AutoValue
文档。
示例
将无法处理的数据加入队列以供进一步分析
在生产系统中,处理有问题的数据非常重要。如果可能,您可以在数据流中验证并更正数据。如果无法更正,请将值记录到未处理的消息队列中(有时称为死信队列),以供稍后分析。将数据从一种格式转换为另一种格式时(例如,将 JSON 字符串转换为行时)最容易发生问题。
要解决此问题,请使用多输出转换,将包含未处理数据的元素传送到另一个 PCollection 中以进一步分析。这种处理是您可能需要在流水线的许多位置使用的常见操作。请尽量使转换足够通用,以便在多个位置使用。首先,创建一个错误对象以封装通用属性,包括原始数据。接下来,创建一个具有多个目标位置选项的接收器转换。
示例
依次应用数据验证转换
从外部系统收集的数据通常需要清理。设计流水线结构,使其尽可能在流中更正有问题的数据。 如果需要,将数据发送到队列以进一步分析。
由于单个消息可能会遇到多个需要更正的问题,因此请仔细规划所需的有向无环图 (DAG)。如果某个元素包含存在多个缺陷的数据,您必须确保该元素经历适当的转换。
例如,假设某一元素具有以下值,且两个值都不得为 null:
{"itemA": null,"itemB": null}
确保元素经历可纠正两个潜在问题的转换:
badElements.apply(fixItemA).apply(fixItemB)
您的流水线可能具有更多的串行步骤,但融合有助于最大限度地减少产生的处理开销。
示例
使用 DoFn.StartBundle
对外部服务进行微批量调用
您可能需要在流水线中调用外部 API。由于流水线在许多计算资源之间分配工作,因此对流经系统的每个元素进行一次调用可能会导致外部服务端点过载。如果您没有应用任何归纳函数,则此问题尤为常见。
为避免此问题,请批量调用外部系统。
您可以使用 GroupByKey
转换或使用 Apache Beam Timer API 来进行批量调用。但是,这些方法都需要重排,这会产生一些处理开销,并且需要神奇数字来确定键空间。
请改用 StartBundle
和 FinishBundle
生命周期元素来批处理数据。使用这些选项时无需进行 shuffle。
此选项的一个小缺点是,捆绑包大小是由运行程序的实现动态确定的,具体取决于流水线及其工作器中当前发生的情况。在流模式下,捆绑包通常较小。Dataflow 捆绑受后端因素的影响,例如分片使用情况、特定键可用的数据数量以及流水线的吞吐量。
示例
EventItemCorrectionService.java
使用适当的旁路输入模式来丰富数据
在流式分析应用程序中,通常会使用额外的信息来丰富数据,以帮助进行进一步分析。例如,如果您有交易的商店 ID,则建议添加有关商店位置的信息。如需添加这些额外的信息,通常可以获取一个元素并从对照表中引入信息。
对于变化较慢且比较小的对照表,最好将该表作为实现 Map<K,V>
接口的单例类引入流水线。此选项使您可以避免让每个元素都进行 API 调用来进行查找。在流水线中添加表的副本后,您需要定期更新它,使其保持最新。
如需处理缓慢更新的旁路输入,请使用 Apache Beam 旁路输入模式。
缓存
旁路输入在内存中加载,因此会自动缓存。
您可以使用 --setWorkerCacheMb
选项设置缓存的大小。
您可以在 DoFn
实例之间共享缓存,并使用外部触发器刷新缓存。
示例
SlowMovingStoreLocationDimension.java