从 Java 版 Dataflow SDK 1.x 迁移

本文档重点介绍了 Java 版 Dataflow SDK 1.x 版本与 2.x 版本之间的重大更改。

Dataflow SDK 弃用通知:Dataflow SDK 2.5.0 是最后一个独立于 Apache Beam SDK 版本发布的 Dataflow SDK 版本。Dataflow 服务完全支持 Apache Beam SDK 正式版本。Dataflow 服务还支持以前发布的 Apache Beam SDK 2.0.0 及更高版本。如需了解各种 SDK 的支持状态,请参阅 Dataflow 支持页面Apache Beam 下载页面包含 Apache Beam SDK 各版本的版本说明。

从 1.x 迁移到 2.x

要安装和使用 Java 版 Apache Beam SDK 2.x,请参阅 Apache Beam SDK 安装指南

从 1.x 到 2.x 的重大更改

注意:所有用户都需要注意这些更改,以便升级到 2.x 版本。

软件包重命名和重组

在对 Apache Beam 进行泛化以与不局限于 Google Cloud Platform 的环境高效协作的过程中,SDK 代码已进行了重命名和重组。

com.google.cloud.dataflow 已重命名为 org.apache.beam

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-78

现在,该 SDK 在软件包 org.apache.beam(而非 com.google.cloud.dataflow)中声明。此更改后,您需要更新所有导入语句。

新的子软件包:runners.dataflowrunners.directio.gcp

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-77

运行程序已被重组到各自的软件包中,因此 com.google.cloud.dataflow.sdk.runners 中的很多内容已迁移至 org.apache.beam.runners.directorg.apache.beam.runners.dataflow

特定于在 Dataflow 服务上运行的流水线选项已从 com.google.cloud.dataflow.sdk.options 迁移至 org.apache.beam.runners.dataflow.options

Google Cloud Platform 服务的大多数 I/O 连接器已迁移至子软件包。例如,BigQueryIO 已从 com.google.cloud.dataflow.sdk.io 迁移至 org.apache.beam.sdk.io.gcp.bigquery

大多数 IDE 都能够帮助识别新位置。如需验证特定文件的新位置,您可以使用 t 在 GitHub 上搜索代码。Java 版 Cloud Dataflow SDK 1.x 基于 GoogleCloudPlatform/DataflowJavaSDK 代码库(master-1.x 分支)构建。Java 版 Dataflow SDK 2.x 与 apache/beam 代码库中的代码对应。

运行程序

从运行程序名称中移除了 Pipeline

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1185

所有运行程序的名称中均移除了 Pipeline,因此名称变短了。例如,DirectPipelineRunner 现在是 DirectRunnerDataflowPipelineRunner 现在是 DataflowRunner

需要将 --tempLocation 设置为 Google Cloud Storage 路径

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-430

不再支持如下方式:仅指定 --stagingLocation--tempLocation 中的一个,然后由 Dataflow 推断出另一个;现在,Dataflow 服务要求将 --gcpTempLocation 设置为 Google Cloud Storage 路径,不过也可以借助较通用的 --tempLocation 将其推断出来。除非进行替换,否则该路径还将用于 --stagingLocation

已移除 InProcessPipelineRunner

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-243

DirectRunner 仍然在用户的本地计算机上运行,不过它现在还另外支持多线程执行、无界限 PCollection 以及针对推测输出和延迟输出的触发器。它与记录成文的 Beam 模型之间更加一致,且可能(正确地)导致其他单元测试的失败。

此功能目前位于 DirectRunner 中,因此 InProcessPipelineRunner(Java 版 Dataflow SDK 1.6+)已移除。

BlockingDataflowPipelineRunner 已替换为 PipelineResult.waitToFinish()

受影响的用户:所有 | 影响:编译错误

BlockingDataflowPipelineRunner 现已移除。如果您的代码需要以编程方式运行某个流水线并一直等到流水线终止,则它应使用 DataflowRunner 并明确调用 pipeline.run().waitToFinish()

如果您以前会在命令行中使用 --runner BlockingDataflowPipelineRunner,目的是以交互方式引导主程序暂停,直到该流水线终止,那么在新版本中,这个问题应该交由主程序处理;主程序应该提供可引导其调用 waitToFinish() 的选项(例如 --blockOnRun)。

TemplatingDataflowPipelineRunner 已替换为 --templateLocation

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-551

TemplatingDataflowPipelineRunner(Java 版 Cloud Dataflow SDK 1.9+)中的功能已被替换,将 --templateLocationDataflowRunner 结合使用即可实现同样的功能。

ParDo 和 DoFn

DoFn 使用注释,而非方法替换项

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-37

为了支持更高的灵活性和自定义级别,DoFn 现在使用方法注释来自定义处理过程,不再要求用户替换特定方法。

以下代码示例演示了新旧 DoFn 之间的区别。以前,在 Java 版 Dataflow SDK 1.x 中,您的代码与以下内容类似:

new DoFn<Foo, Baz>() {
  @Override
  public void processElement(ProcessContext c) { … }
}

现在,在 Java 版 Apache Beam SDK 2.x 中,您的代码与以下内容类似:

new DoFn<Foo, Baz>() {
  @ProcessElement   // <-- This is the only difference
  public void processElement(ProcessContext c) { … }
}

如果 DoFn 访问了 ProcessContext#window(),则还会进行进一步的更改。您不再使用如下代码:

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    … (MyWindowType) c.window() …
  }
}

而是要编写如下代码:

public class MyDoFn extends DoFn<Foo, Baz> {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

或者:

return new DoFn<Foo, Baz>() {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

运行时将自动为您的 DoFn 提供窗口。

DoFn 在多个软件包之间重复使用

受影响的用户:所有 | 影响:可能会以静默方式导致意外结果 | JIRA 问题BEAM-38

为了支持性能提升,同一 DoFn 现在可以重复用于处理由多个元素组成的多个软件包,而不是保证为每个软件包提供一个全新实例。在软件包末尾保留本地状态(例如实例变量)的所有 DoFn 都可能会遇到行为更改,因为下一个软件包将基于该状态(而非全新副本)开始。

为管理生命周期而新增了 @Setup@Teardown 方法。完整的生命周期如下(不过,故障可能会在任何时候截断生命周期):

  • @Setup:按实例对 DoFn 执行初始化,例如打开可重复使用的连接。
  • 任意数量的序列:
    • @StartBundle:按软件包执行初始化,例如重置 DoFn 的状态。
    • @ProcessElement:正常的元素处理过程。
    • @FinishBundle:按软件包结束的步骤,例如消除副作用。
  • @Teardown:按实例拆除由 DoFn 持有的资源,如关闭可重复使用的连接。

注意:此更改在实践中的影响应该有限。不过,它不会生成编译时错误,且可能会以静默方式导致意外结果。

更改了指定辅助输入或输出时的参数顺序

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1422

现在,应用 ParDo 时应始终先指定 DoFn。您不再使用如下代码:

foos.apply(ParDo
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag)
    .of(new MyDoFn()))

而是要编写如下代码:

foos.apply(ParDo
    .of(new MyDoFn())
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag))

PTransform

已移除 .named()

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-370

从 PTransform 和子类中移除 .named() 方法。请改用 PCollection.apply(“name”, PTransform)

PTransform.apply() 已重命名为 PTransform.expand()

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-438

PTransform.apply() 已重命名为 PTransform.expand(),以免与 PCollection.apply().混淆。用户编写的所有复合转换都需要将替换的 apply() 方法重命名为 expand()。流水线的构建方式没有变化。

其他重大更改

下面列出了其他一些重大更改及即将生效的更改。

个别 API 更改

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-725

移除了以下 GcpOptionsTokenServerUrlCredentialDirCredentialIdSecretsFileServiceAccountNameServiceAccountKeyFile

使用 GoogleCredentials.fromStream(InputStream for credential)。该流可能包含来自 Google Developers Console 的 JSON 格式的服务帐号密钥文件,或采用 Cloud SDK 支持的格式存储的用户凭据。

--enableProfilingAgent 已更改为 --saveProfilesToGcs

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1122

--update 已移至 DataflowPipelineOptions

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-81

--update PipelineOptionDataflowPipelineDebugOptions 移至 DataflowPipelineOptions

已移除 BoundedSource.producesSortedKeys()

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1201

BoundedSource 中移除 producesSortedKeys()

更改了 PubsubIO API

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-974BEAM-1415

从 2.0.0-beta2 开始,必须使用 PubsubIO.<T>read()PubsubIO.<T>write()(而非 PubsubIO.Read.topic(String) 等静态出厂方法)对 PubsubIO.ReadPubsubIO.Write 进行实例化。

用于配置 PubsubIO 的方法已重命名,例如 PubsubIO.read().topic(String) 已重命名为 PubsubIO.read().fromTopic()。同样,subscription() 已重命名为 fromSubscription()timestampLabelidLabel 已分别重命名为 withTimestampAttributewithIdAttributePubsubIO.write().topic() 已重命名为 PubsubIO.write().to()

PubsubIO 会公开用于读写字符串、Avro 消息和 Protobuf 消息的函数(例如 PubsubIO.readStrings()PubsubIO.writeAvros()),不再指定用于解析消息载荷的 Coder。如需读写自定义类型,请使用 PubsubIO.read/writeMessages()(如果应包含消息特性,则使用 PubsubIO.readMessagesWithAttributes),还可以使用 ParDoMapElements 在自定义类型和 PubsubMessage 之间进行转换。

移除了对不受支持的 v1beta2 API 的 DatastoreIO 支持

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-354

DatastoreIO 现在基于 Cloud Datastore API v1。

更改了 DisplayData.Builder

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-745

DisplayData.Builder.include(..) 包含一个新的必需路径参数,用于注册子组件显示数据。Builder API 现在会返回 DisplayData.ItemSpec<>,而不是 DisplayData.Item

FileBasedSink.getWriterResultCoder() 为必需项

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-887

FileBasedSink.getWriterResultCoder 已转换为一个抽象方法,您必须提供其具体实现。

Filter.byPredicate() 已重命名为 Filter.by()

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-342

已移除 IntraBundleParallelization

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-414

RemoveDuplicates 已重命名为 Distinct

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-239

更改了 TextIO,以使用其他语法且仅针对字符串进行操作

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1354

TextIO.Read.from() 更改为 TextIO.read().from(),同样,TextIO.Write.to() 更改为 TextIO.write().to()

TextIO.Read 现在始终返回 PCollection<String>,且不通过 .withCoder() 来解析字符串。取而代之的是,它会通过对集合应用 ParDoMapElements 来解析字符串。 同样,TextIO.Write 现在始终接受 PCollection<String>;如需将其他内容写入 TextIO,请通过 ParDoMapElements 将其转换为 String

AvroIO 更改为使用其他语法

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1402

为了读写 Avro 生成的类型,AvroIO.Read.from().withSchema(Foo.class) 已更改为 AvroIO.read(Foo.class).from()AvroIO.Write 也是如此。

为了读写使用指定架构的 Avro 常规记录,AvroIO.Read.from().withSchema(Schema or String) 已更改为 AvroIO.readGenericRecords().from()AvroIO.Write 也是如此。

更改了 KafkaIO,以明确指定类型参数并使用 Kafka 序列化器/反序列化器

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1573BEAM-2221

在 KafkaIO 中,您现在必须明确指定键和值的类型参数,例如 KafkaIO.<Foo, Bar>read()KafkaIO.<Foo, Bar>write()

请使用标准 Kafka SerializerDeserializer 类(而非 Coder)来解释键和值字节。例如,使用 KafkaIO.read().withKeyDeserializer(StringDeserializer.class),而不要使用 KafkaIO.read().withKeyCoder(StringUtf8Coder.of())KafkaIO.write() 也是如此。

更改了 BigQueryIO 的语法

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1427

使用 BigQueryIO.read().from()BigQueryIO.write().to(),而不要使用 BigQueryIO.Read.from()BigQueryIO.Write.to()

更改了 KinesisIO.Read 的语法

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1428

使用 KinesisIO.read().from().withClientProvider(),而不要使用 KinesisIO.Read.from().using()

更改了 TFRecordIO 的语法

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1913

使用 TFRecordIO.read().from()TFRecordIO.write().to(),而不要使用 TFRecordIO.Read.from()TFRecordIO.Write.to()

已合并 XmlIO 下的 XmlSourceXmlSink

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1914

使用 XmlIO,而不要直接使用 XmlSourceXmlSink

例如:使用 XmlIO.read().from(),而不要使用 Read.from(XmlSource.from())。使用 XmlIO.write().to(),而不要使用 Write.to(XmlSink.writeOf())

CountingInput 已重命名为 GenerateSequence,并进行了泛化

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1414

使用 GenerateSequence.from(0),而不要使用 CountingInput.unbounded()。使用 GenerateSequence.from(0).to(n),而不要使用 CountingInput.upTo(n)

更改了 CountLatestSample

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1417BEAM-1421BEAM-1423

Count.PerElement Count.PerKey Count.Globally 现已为专用类,因此您必须使用出厂函数,例如 Count.perElement()(而在以前,您可以使用 new Count.PerElement())。此外,举例来说,如果您想要对转换结果使用 .withHotKeyFanout(),则不应再对 .apply(Count.perElement()) 的结果直接使用它;取而代之的是,Count 会将其组合函数公开为 Count.combineFn(),而您应自行应用 Combine.globally(Count.combineFn())

类似更改还适用于 LatestSample 转换。

更改了 MapElementsFlatMapElements 的参数顺序

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1418

现在,您在将 MapElementsFlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor) 结合使用时必须先指定描述符,例如 FlatMapElements.into(descriptor).via(fn)

更改了配置其他参数时所用的 Window

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1425

使用 Window 配置除 WindowFn 本身 (Window.into()) 之外的参数时,请使用 Window.configure()。例如,使用 Window.configure().triggering(...),而不要使用 Window.triggering(...)

Write.Bound 已重命名为 Write

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1416

Write.Bound 类现已简化为 Write。只有当您将 Write.to(Sink) 的应用提取到某个变量中时才会产生影响,其类型先前为 Write.Bound<...>,现在是 Write<...>

Flatten 转换类已重命名

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1419

Flatten.FlattenIterablesFlatten.FlattenPCollectionList 类已分别重命名为 Flatten.IterablesFlatten.PCollections

GroupByKey.create(boolean) 已拆分为两种方法

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1420

GroupByKey.create(boolean fewKeys) 现已简化为 GroupByKey.create()GroupByKey.createWithFewKeys()

更改了 SortValues

受影响的用户:所有 | 影响:编译错误 | JIRA 问题BEAM-1426

BufferedExternalSorter.Options setter 方法已从 setSomeProperty 重命名为 withSomeProperty

其他 Google API 依赖项

从 SDK 2.0.0 版开始,您还必须启用 Cloud Resource Manager API

依赖项升级

2.x 版本对大多数依赖项的固定版本进行了升级,包括 Avro、protobuf 和 gRPC。其中一些依赖项可能自身就进行了重大更改,如果您的代码仍然直接使用原有的依赖项,则可能会出现问题。2.0.0 中使用的版本可在 pom.xml 中或通过 mvn dependency:tree 找到。

内部重构

SDK 的内部结构发生了显著变化。以前依赖于公共 API 以外的内容(例如以 Internal 为结果或包含在 util 软件包中的类或方法)的任何用户都会发现它们发生了显著变化。

如果您使用的是 StateInternalsTimerInternals:这些内部 API 已移除。现在,您可以使用实验性 StateTimer API 来处理 DoFn 了。

已将聚合器替换为指标

您不应使用聚合器来监控,而应使用指标来深入了解流水线的当前状态。如需了解详情,请参阅如何在 Cloud Monitoring 中使用指标