Migrating from Cloud Dataflow SDK 1.x for Java

This document highlights the major changes between the Cloud Dataflow SDK for Java 1.x releases and 2.x releases.

Cloud Dataflow SDK Deprecation Notice: The Cloud Dataflow SDK 2.5.0 is the last Cloud Dataflow SDK release that is separate from the Apache Beam SDK releases. The Cloud Dataflow service fully supports official Apache Beam SDK releases. The Cloud Dataflow service also supports previously released Apache Beam SDKs, version 2.0.0 and later. See the Cloud Dataflow support page for the support status of various SDKs. The Apache Beam downloads page contains release notes for the Apache Beam SDK releases.

Migrating from 1.x to 2.x

To install and use the Apache Beam SDK for Java 2.x, see the Apache Beam SDK installation guide.

Major changes from 1.x to 2.x

NOTE: All users need to be aware of these changes in order to upgrade to 2.x versions.

Package renaming and restructuring

As part of generalizing Apache Beam to work well with environments beyond Google Cloud Platform, the SDK code has been renamed and restructured.

Renamed com.google.cloud.dataflow to org.apache.beam

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-78

The SDK is now declared in the package org.apache.beam instead of com.google.cloud.dataflow. You need to update all of your import statements with this change.
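For example, an import of the core Pipeline class changes as follows:

// Cloud Dataflow SDK 1.x
import com.google.cloud.dataflow.sdk.Pipeline;

// Apache Beam SDK 2.x
import org.apache.beam.sdk.Pipeline;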

New subpackages: runners.dataflow, runners.direct, and io.gcp

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-77

Runners have been reorganized into their own packages, so many things from com.google.cloud.dataflow.sdk.runners have moved into either org.apache.beam.runners.direct or org.apache.beam.runners.dataflow.

Pipeline options specific to running on the Cloud Dataflow service have moved from com.google.cloud.dataflow.sdk.options to org.apache.beam.runners.dataflow.options.

Most I/O connectors to Google Cloud Platform services have been moved into subpackages. For example, BigQueryIO has moved from com.google.cloud.dataflow.sdk.io to org.apache.beam.sdk.io.gcp.bigquery.
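For example, imports of the Dataflow runner and BigQueryIO change along these lines:

// Cloud Dataflow SDK 1.x
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;

// Apache Beam SDK 2.x
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;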

Most IDEs will be able to help identify the new locations. To verify the new location for specific files, you can press t to search the code on GitHub. The Cloud Dataflow SDK 1.x for Java releases are built from the GoogleCloudPlatform/DataflowJavaSDK repository (master-1.x branch). The Cloud Dataflow SDK 2.x for Java releases correspond to code from the apache/beam repository.

Runners

Removed Pipeline from Runner names

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1185

The names of all Runners have been shortened by removing Pipeline from the names. For example, DirectPipelineRunner is now DirectRunner, and DataflowPipelineRunner is now DataflowRunner.
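For example, setting the runner programmatically changes as follows (a minimal sketch; assumes options is a pipeline options instance):

// Cloud Dataflow SDK 1.x
options.setRunner(DataflowPipelineRunner.class);

// Apache Beam SDK 2.x
options.setRunner(DataflowRunner.class);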

Require setting --tempLocation to a Google Cloud Storage path

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-430

Instead of allowing you to specify only one of --stagingLocation or --tempLocation and inferring the other, the Cloud Dataflow service now requires --gcpTempLocation to be set to a Google Cloud Storage path. This value can be inferred from the more general --tempLocation and, unless overridden, is also used for --stagingLocation.
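For example, a minimal sketch of setting the temporary location programmatically (the bucket name is a placeholder):

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setTempLocation("gs://my-bucket/temp");  // placeholder bucket; must be a Cloud Storage path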

Removed InProcessPipelineRunner

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-243

The DirectRunner continues to run on a user's local machine, but now additionally supports multithreaded execution, unbounded PCollections, and triggers for speculative and late outputs. It more closely aligns to the documented Beam model, and may (correctly) cause additional unit test failures.

As this functionality is now in the DirectRunner, the InProcessPipelineRunner (Cloud Dataflow SDK 1.6+ for Java) has been removed.

Replaced BlockingDataflowPipelineRunner with PipelineResult.waitUntilFinish()

Users affected: All | Impact: Compile error

The BlockingDataflowPipelineRunner is now removed. If your code programmatically expects to run a pipeline and wait until it has terminated, then it should use the DataflowRunner and explicitly call pipeline.run().waitUntilFinish().

If you used --runner BlockingDataflowPipelineRunner on the command line to interactively induce your main program to block until the pipeline has terminated, then this is a concern of the main program; it should provide an option such as --blockOnRun that will induce it to call waitUntilFinish().
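For example, a sketch of a main program that optionally blocks (the --blockOnRun option shown here is hypothetical; define it on your own PipelineOptions interface under whatever name you prefer):

PipelineResult result = pipeline.run();
if (options.getBlockOnRun()) {   // hypothetical option, not part of the SDK
  result.waitUntilFinish();
}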

Replaced TemplatingDataflowPipelineRunner with --templateLocation

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-551

The functionality in TemplatingDataflowPipelineRunner (Cloud Dataflow SDK 1.9+ for Java) has been replaced by using the --templateLocation with DataflowRunner.
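For example, a minimal sketch of creating a template with the DataflowRunner (the template path is a placeholder):

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setTemplateLocation("gs://my-bucket/templates/my-template");  // placeholder path
pipeline.run();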

ParDo and DoFn

DoFns use annotations instead of method overrides

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-37

In order to allow for more flexibility and customization, DoFn now uses method annotations to customize processing instead of requiring users to override specific methods.

The differences between the new DoFn and the old are demonstrated in the following code sample. Previously (with Cloud Dataflow SDK 1.x for Java), your code would look like this:

new DoFn<Foo, Baz>() {
  @Override
  public void processElement(ProcessContext c) { … }
}

Now (with Apache Beam SDK 2.x for Java), your code will look like this:

new DoFn<Foo, Baz>() {
  @ProcessElement   // <-- This is the only difference
  public void processElement(ProcessContext c) { … }
}

If your DoFn accessed ProcessContext#window(), then there is a further change. Instead of this:

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    … (MyWindowType) c.window() …
  }
}

you will write this:

public class MyDoFn extends DoFn<Foo, Baz> {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

or:

return new DoFn<Foo, Baz>() {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

The runtime will automatically provide the window to your DoFn.

DoFns are reused across multiple bundles

Users affected: All | Impact: May silently cause unexpected results | JIRA Issue: BEAM-38

In order to allow for performance improvements, the same DoFn may now be reused to process multiple bundles of elements, instead of guaranteeing a fresh instance per bundle. Any DoFn that keeps local state (e.g. instance variables) beyond the end of a bundle may encounter behavioral changes, as the next bundle will now start with that state instead of a fresh copy.

To manage the lifecycle, new @Setup and @Teardown methods have been added. The full lifecycle is as follows (while a failure may truncate the lifecycle at any point):

  • @Setup: Per-instance initialization of the DoFn, such as opening reusable connections.
  • Any number of the sequence:
    • @StartBundle: Per-bundle initialization, such as resetting the state of the DoFn.
    • @ProcessElement: The usual element processing.
    • @FinishBundle: Per-bundle concluding steps, such as flushing side effects.
  • @Teardown: Per-instance teardown of the resources held by the DoFn, such as closing reusable connections.

Note: This change is expected to have limited impact in practice. However, it does not generate a compile-time error and has the potential to silently cause unexpected results.
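A minimal sketch of a DoFn that uses the full lifecycle (the connection type and the convert helper are illustrative placeholders, not part of the SDK):

new DoFn<Foo, Baz>() {
  private transient MyConnection connection;  // illustrative reusable resource
  private List<Baz> buffer;

  @Setup
  public void setup() {
    connection = MyConnection.open();         // once per DoFn instance
  }

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();               // reset per-bundle state
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(convert(c.element()));         // illustrative per-element work
  }

  @FinishBundle
  public void finishBundle() {
    connection.flush(buffer);                 // flush side effects once per bundle
  }

  @Teardown
  public void teardown() {
    connection.close();                       // release per-instance resources
  }
}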

Changed order of parameters when specifying side inputs or outputs

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1422

The DoFn now always has to be specified first when applying a ParDo. Instead of this:

foos.apply(ParDo
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag)
    .of(new MyDoFn()))

you will write this:

foos.apply(ParDo
    .of(new MyDoFn())
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag))

PTransforms

Removed .named()

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-370

Removed the .named() methods from PTransforms and sub-classes. Instead, use PCollection.apply("name", PTransform).
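For example, using the MyDoFn placeholder from the earlier examples:

// Cloud Dataflow SDK 1.x
foos.apply(ParDo.named("ProcessFoos").of(new MyDoFn()));

// Apache Beam SDK 2.x
foos.apply("ProcessFoos", ParDo.of(new MyDoFn()));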

Renamed PTransform.apply() to PTransform.expand()

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-438

PTransform.apply() was renamed to PTransform.expand() to avoid confusion with PCollection.apply(). All user-written composite transforms will need to rename the overridden apply() method to expand(). There is no change to how pipelines are constructed.
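For example, a sketch of a composite transform in 2.x (MyComposite is a placeholder name):

public class MyComposite extends PTransform<PCollection<String>, PCollection<Long>> {
  @Override
  public PCollection<Long> expand(PCollection<String> input) {  // was apply() in 1.x
    return input.apply(Count.<String>globally());
  }
}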

Additional breaking changes

The following is a list of additional breaking changes and upcoming changes.

Individual API changes

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-725

Removed the following GcpOptions: TokenServerUrl, CredentialDir, CredentialId, SecretsFile, ServiceAccountName, ServiceAccountKeyFile.

Use GoogleCredentials.fromStream(InputStream) to construct a credential instead. The stream can contain a service account key file in JSON format from the Google Developers Console or a stored user credential in the format supported by the Cloud SDK.
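For example, a minimal sketch of constructing a credential from a JSON key file (the file path is a placeholder):

GoogleCredentials credential =
    GoogleCredentials.fromStream(new FileInputStream("/path/to/key.json"));  // placeholder path
options.as(GcpOptions.class).setGcpCredential(credential);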

Changed --enableProfilingAgent to --saveProfilesToGcs

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1122

Moved --update to DataflowPipelineOptions

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-81

Moved the --update PipelineOption from DataflowPipelineDebugOptions to DataflowPipelineOptions.

Removed BoundedSource.producesSortedKeys()

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1201

Removed producesSortedKeys() from BoundedSource.

Changed PubsubIO API

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-974, BEAM-1415

Starting with 2.0.0-beta2, PubsubIO.Read and PubsubIO.Write must be instantiated using PubsubIO.<T>read() and PubsubIO.<T>write() instead of the static factory methods such as PubsubIO.Read.topic(String).

Methods for configuring PubsubIO have been renamed; e.g. PubsubIO.read().topic(String) was renamed to PubsubIO.read().fromTopic(). Likewise, subscription() was renamed to fromSubscription(), timestampLabel and idLabel to withTimestampAttribute and withIdAttribute respectively, and PubsubIO.write().topic() to PubsubIO.write().to().

Instead of specifying a Coder for parsing the message payload, PubsubIO exposes functions for reading and writing strings, Avro messages, and protocol buffer messages, e.g. PubsubIO.readStrings(), PubsubIO.writeAvros(). For reading and writing custom types, use PubsubIO.readMessages() (or PubsubIO.readMessagesWithAttributes() if message attributes should be included) and PubsubIO.writeMessages(), and use ParDo or MapElements to convert between your custom type and PubsubMessage.
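For example, a sketch of reading string payloads from a topic in 2.x (the topic name is a placeholder):

PCollection<String> messages = pipeline.apply(
    PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"));  // placeholder topic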

Removed DatastoreIO support for the unsupported v1beta2 API

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-354

DatastoreIO is now based on the Cloud Datastore API v1.

Changed DisplayData.Builder

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-745

DisplayData.Builder.include(..) has a new required path parameter for registering sub-component display data. Builder APIs now return a DisplayData.ItemSpec<>, rather than DisplayData.Item.

Required FileBasedSink.getWriterResultCoder()

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-887

FileBasedSink.getWriterResultCoder is now an abstract method, which subclasses must implement.

Renamed Filter.byPredicate() to Filter.by()

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-342

Removed IntraBundleParallelization

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-414

Renamed RemoveDuplicates to Distinct

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-239

Changed TextIO to use a different syntax and to only operate on strings

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1354

TextIO.Read.from() is changed to TextIO.read().from(), likewise TextIO.Write.to() is changed to TextIO.write().to().

TextIO.Read now always returns a PCollection<String> and no longer takes .withCoder() to parse the strings. Instead, parse the strings by applying a ParDo or MapElements to the collection. Likewise, TextIO.Write now always takes a PCollection<String>; to write anything else with TextIO, first convert it to String using a ParDo or MapElements.
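For example, a sketch of reading a file of integers in 2.x and parsing the strings with MapElements (the file path is a placeholder):

PCollection<Integer> numbers = pipeline
    .apply(TextIO.read().from("gs://my-bucket/numbers.txt"))          // placeholder path
    .apply(MapElements.into(TypeDescriptors.integers())
                      .via((String line) -> Integer.parseInt(line)));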

Changed AvroIO to use a different syntax

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1402

For reading and writing Avro generated types, AvroIO.Read.from().withSchema(Foo.class) is changed to AvroIO.read(Foo.class).from(), likewise AvroIO.Write.

For reading and writing Avro generic records using a specified schema, AvroIO.Read.from().withSchema(Schema or String) is changed to AvroIO.readGenericRecords().from(), likewise AvroIO.Write.
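For example, a sketch of reading an Avro generated type in 2.x (Foo and the file pattern are placeholders):

PCollection<Foo> records = pipeline.apply(
    AvroIO.read(Foo.class).from("gs://my-bucket/records-*.avro"));  // placeholder path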

Changed KafkaIO to specify type parameters explicitly and use Kafka serializers/deserializers

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1573, BEAM-2221

In KafkaIO, you now have to specify the key and value type parameters explicitly: e.g. KafkaIO.<Foo, Bar>read() and KafkaIO.<Foo, Bar>write().

Instead of using Coder for interpreting key and value bytes, use the standard Kafka Serializer and Deserializer classes. E.g.: instead of KafkaIO.read().withKeyCoder(StringUtf8Coder.of()), use KafkaIO.read().withKeyDeserializer(StringDeserializer.class), likewise for KafkaIO.write().
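For example, a sketch of a 2.x KafkaIO read (the broker and topic names are placeholders):

pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("broker-1:9092")                 // placeholder broker
    .withTopic("my-topic")                                 // placeholder topic
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class));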

Changed syntax for BigQueryIO

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1427

Instead of BigQueryIO.Read.from() and BigQueryIO.Write.to(), use BigQueryIO.read().from() and BigQueryIO.write().to().
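For example, a sketch of the 2.x read syntax (the table reference is a placeholder):

PCollection<TableRow> rows = pipeline.apply(
    BigQueryIO.read().from("my-project:my_dataset.my_table"));  // placeholder table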

Changed syntax for KinesisIO.Read

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1428

Instead of KinesisIO.Read.from().using(), use KinesisIO.read().from().withClientProvider().

Changed syntax for TFRecordIO

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1913

Instead of TFRecordIO.Read.from() and TFRecordIO.Write.to(), use TFRecordIO.read().from() and TFRecordIO.write().to().

Consolidated XmlSource and XmlSink under XmlIO

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1914

Instead of using XmlSource and XmlSink directly, use XmlIO.

E.g.: instead of Read.from(XmlSource.from()), use XmlIO.read().from(); instead of Write.to(XmlSink.writeOf()), use XmlIO.write().to().

Renamed CountingInput to GenerateSequence and generalized it

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1414

Instead of CountingInput.unbounded(), use GenerateSequence.from(0). Instead of CountingInput.upTo(n), use GenerateSequence.from(0).to(n).

Changed Count, Latest, Sample

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1417, BEAM-1421, BEAM-1423

Classes Count.PerElement, Count.PerKey, and Count.Globally are now private, so you must use the factory methods such as Count.perElement() (whereas previously you could write new Count.PerElement()). Additionally, if you want to use, for example, .withHotKeyFanout() on the result of the transform, you can no longer do that directly on the result of .apply(Count.perElement()). Instead, Count exposes its combine function as Count.combineFn(), and you should apply Combine.globally(Count.combineFn()) yourself.
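For example, a sketch of the 2.x usage:

// Use the factory method instead of the now-private class
PCollection<KV<Foo, Long>> perElement = foos.apply(Count.perElement());

// To customize the combine, apply Count's combine function yourself
PCollection<Long> total = foos.apply(Combine.globally(Count.<Foo>combineFn()));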

Similar changes apply to the Latest and Sample transforms.

Changed order of parameters for MapElements and FlatMapElements

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1418

When using MapElements or FlatMapElements with .via(SerializableFunction).withOutputType(TypeDescriptor), the output type descriptor now has to be specified first, e.g. FlatMapElements.into(descriptor).via(fn).
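For example, a sketch of the 2.x ordering (lines is a placeholder PCollection<String>):

PCollection<String> words = lines.apply(
    FlatMapElements.into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split(" "))));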

Changed Window when configuring additional parameters

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1425

When using Window to configure something other than the WindowFn itself (Window.into()), use Window.configure(). E.g.: instead of Window.triggering(...), use Window.configure().triggering(...).
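For example, a sketch of configuring triggering in 2.x without changing the WindowFn (the trigger and lateness shown are illustrative choices):

input.apply(Window.<Foo>configure()
    .triggering(AfterWatermark.pastEndOfWindow())
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());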

Renamed Write.Bound to Write

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1416

Class Write.Bound is now simply Write. This matters only if you were assigning the result of Write.to(Sink) to a variable; its type used to be Write.Bound<...> and is now Write<...>.

Renamed Flatten transform classes

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1419

Classes Flatten.FlattenIterables and Flatten.FlattenPCollectionList are renamed respectively to Flatten.Iterables and Flatten.PCollections.

Split GroupByKey.create(boolean) in two methods

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1420

GroupByKey.create(boolean fewKeys) has been split into GroupByKey.create() and GroupByKey.createWithFewKeys().

Changed SortValues

Users affected: All | Impact: Compile error | JIRA Issue: BEAM-1426

BufferedExternalSorter.Options setter methods are renamed from setSomeProperty to withSomeProperty.

Additional Google API dependency

Starting with SDK version 2.0.0, you must also enable the Cloud Resource Manager API.

Dependency upgrades

The 2.x release upgrades the pinned versions of most dependencies, including Avro, protobuf, and gRPC. Some of these dependencies may have made breaking changes of their own, which may cause issues if your code also directly depends on the dependency. Versions used in 2.0.0 can be found in the pom.xml or by using mvn dependency:tree.

Internal refactorings

There have been significant changes to the internal structure of the SDK. Any users who were relying on things beyond the public API (such as classes or methods ending in Internal or in util packages) may find that things have changed significantly.

If you were using StateInternals or TimerInternals: These internal APIs have been removed. You may now use the experimental State and Timer APIs for DoFn.
