Dataflow SDK 1.x for Java からの移行

このドキュメントでは、Dataflow SDK for Java の 1.x リリースと 2.x リリースの主な違いをまとめています。

Dataflow SDK 非推奨のお知らせ: Dataflow SDK 2.5.0 は、Apache Beam SDK リリースとは別個にリリースされる最後の Dataflow SDK リリースとなります。Dataflow サービスでは、正式な Apache Beam SDK のリリースがフルサポートの対象となります。また、Dataflow サービスでは、バージョン 2.0.0 以降のリリース済み Apache Beam SDK もサポートされます。各種 SDK のサポート状況については、Dataflow のサポートページを参照してください。Apache Beam のダウンロード ページでは、Apache Beam SDK リリースのリリースノートもダウンロードできます。

1.x から 2.x への移行

Apache Beam SDK for Java 2.x をインストールして使用する方法については、Apache Beam SDK インストール ガイドをご覧ください。

1.x から 2.x への主な変更点

注: 2.x バージョンにアップグレードする際には、これらの変更点を必ず把握するようにしてください。

パッケージ名の変更と再構築

Apache Beam が Google Cloud Platform を超えた環境で最適に動作するための一般化の一環として、SDK コードの名前が変更され、再構築されました。

名前が com.google.cloud.dataflow から org.apache.beam に変わりました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-78

SDK が、com.google.cloud.dataflow ではなく org.apache.beam パッケージで宣言されるようになりました。この変更により、すべてのインポート ステートメントを更新する必要があります。

新しいサブパッケージ: runners.dataflowrunners.directio.gcp

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-77

ランナーが独自のパッケージへと再編成されたため、com.google.cloud.dataflow.sdk.runners 内の多くが org.apache.beam.runners.directorg.apache.beam.runners.dataflow のいずれかに移動しました。

Dataflow サービスでの実行に固有のパイプライン オプションは com.google.cloud.dataflow.sdk.options から org.apache.beam.runners.dataflow.options に移動しました。

Google Cloud Platform サービスへの大部分の I/O コネクタがサブパッケージに移動しました。たとえば、BigQueryIO は com.google.cloud.dataflow.sdk.io から org.apache.beam.sdk.io.gcp.bigquery に移動しました。

ほとんどの IDE で、新しい場所を特定できるようになります。特定のファイルの新しい場所を確認するには、t を使用して GitHub のコードを検索します。Dataflow SDK 1.x for Java のリリースは、GoogleCloudPlatform/DataflowJavaSDK リポジトリ(master-1.x ブランチ)から構築されています。Dataflow SDK 2.x for Java のリリースは、apache/beam リポジトリのコードと対応します。

ランナー

ランナー名から Pipeline を削除しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1185

すべてのランナー名から Pipeline が削除され、名前が短くなりました。たとえば、DirectPipelineRunnerDirectRunner となり、DataflowPipelineRunnerDataflowRunner となりました。

Google Cloud Storage パスに --tempLocation を設定する必要があります

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-430

ユーザーが --stagingLocation または --tempLocation のうち 1 つだけを指定して、Dataflow にもう 1 つを推測させるのではなく、Dataflow サービスでは --gcpTempLocation を Google Cloud Storage パスに設定することが必要になりましたが、これはより一般的な --tempLocation から推測できます。オーバーライドされない限り、これは --stagingLocation にも使用されます。

InProcessPipelineRunner を削除しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-243

DirectRunner は継続してユーザーのローカルマシンで実行されますが、マルチスレッド実行、制限なし PCollection、投機的な出力と遅延出力のトリガーを新たにサポートします。文書化された Beam のモデルにより近くなっており、ユニットテストの失敗が(正常に)増加する場合があります。

この機能は DirectRunner に移行されたため、InProcessPipelineRunner(Dataflow SDK 1.6+ for Java)は削除されました。

BlockingDataflowPipelineRunnerPipelineResult.waitToFinish() に代わりました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー

BlockingDataflowPipelineRunner は削除されました。パイプラインを実行して終了するまで待機するようにコード内でプログラムされている場合は、DataflowRunner を使用して pipeline.run().waitToFinish() を明示的に呼び出す必要があります。

コマンドラインで --runner BlockingDataflowPipelineRunner を使用して、パイプラインが終了するまでブロックするようメイン プログラムにインタラクティブに指示していた場合、これはメイン プログラムの問題となります。waitToFinish() を呼び出すように指示する --blockOnRun などのオプションを指定する必要があります。

TemplatingDataflowPipelineRunner--templateLocation に代わりました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-551

TemplatingDataflowPipelineRunner(Dataflow SDK 1.9+ for Java)の機能が、--templateLocationDataflowRunner と使用するように置き換えられました。

ParDo と DoFn

DoFn でメソッドのオーバーライドではなくアノテーションを使用

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-37

柔軟性とカスタマイズ性を高めるために、ユーザーが特定のメソッドをオーバーライドする代わりに、DoFn でメソッドのアノテーションを使用して処理をカスタマイズできるようになりました。

次のコードサンプルで、新旧の DoFn の違いをご確認いただけます。以前の Dataflow SDK 1.x for Java では、コードは次のようなものでした。

new DoFn<Foo, Baz>() {
  @Override
  public void processElement(ProcessContext c) { … }
}

新しい Apache Beam SDK 2.x for Java では、コードは次のようになります。

new DoFn<Foo, Baz>() {
  @ProcessElement   // <-- This is the only difference
  public void processElement(ProcessContext c) { … }
}

DoFnProcessContext#window() にアクセスしていた場合は、さらに変更があります。従来は次のようなものでした。

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    … (MyWindowType) c.window() …
  }
}

新しいバージョンでは、次のようなコードを記述する必要があります。

public class MyDoFn extends DoFn<Foo, Baz> {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

または、次のように記述する必要があります。

return new DoFn<Foo, Baz>() {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

ランタイムが自動で DoFn にウィンドウを提供します。

複数のバンドルでの DoFn の再利用

影響を受けるユーザー: すべて | 影響: 通知なく予期しない結果が発生する可能性 | JIRA の問題: BEAM-38

パフォーマンスを改善するため、複数の要素のバンドルを処理するために同じ DoFn を再利用できるようになりました。従来は、バンドルごとに新しいインスタンスを保証していました。バンドルの終了後にローカル状態(インスタンス変数など)を保持する DoFn は動作を変えることができます。これは、次のバンドルが新たなコピーではなく、その状態から開始されるためです。

ライフサイクルを管理するために、新たに @Setup メソッドと @Teardown メソッドが追加されました。全体のライフサイクルは次のとおりです(どの時点でもエラーによってライフサイクルが短縮される場合があります)。

  • @Setup: 再利用可能な接続の開始などの、インスタンスごとのDoFn の初期化。
  • 任意の数のシーケンス:
    • @StartBundle: DoFn の状態のリセットなどの、バンドルごとの初期化。
    • @ProcessElement: 通常の要素の処理。
    • @FinishBundle: 副次的影響の解決などの、バンドルごとの完了ステップ。
  • @Teardown: 再利用可能な接続の終了などの、DoFn が保持するリソースのインスタンスごとの破棄。

注: この変更による影響は、実際は限られたものになると想定されます。しかし、コンパイル時にエラーは発生しないため、通知なしで予期せぬ結果がもたらされる可能性があります。

副入力または副出力を指定する際のパラメータの順序が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1422

ParDo を適用するときは必ず最初に DoFn を指定する必要があります。つまり、

foos.apply(ParDo
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag)
    .of(new MyDoFn()))

のようなコードでなく、次のようなコードを記述する必要があります。

foos.apply(ParDo
    .of(new MyDoFn())
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag))

PTransform

.named() を削除しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-370

PTransform とサブクラスから .named() メソッドを削除しました。代わりに PCollection.apply(“name”, PTransform) を使用します。

名前が PTransform.apply() から PTransform.expand() に変わりました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-438

PCollection.apply() との混同を避けるため、PTransform.apply()の名前が PTransform.expand() に変更されました。ユーザーが記述したすべての複合変換において、オーバーライドされた apply() メソッドを expand() へと変更する必要があります。パイプラインの構築方法には変更はありません。

その他の重要な変更点

以下は、その他の重要な変更点と予定されている変更のリストです。

個々の API の変更

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-725

次の GcpOptions を削除しました。TokenServerUrlCredentialDirCredentialIdSecretsFileServiceAccountNameServiceAccountKeyFile

GoogleCredentials.fromStream(InputStream for credential) を使用してください。ストリームには、Google Developers Console からの JSON 形式のサービス アカウント キーのファイルか、Cloud SDK でサポートされている形式を使用して保存されたユーザー認証情報を含めることができます。

--enableProfilingAgent--saveProfilesToGcs に変更しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1122

--updateDataflowPipelineOptions に移動しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-81

--update PipelineOptionDataflowPipelineDebugOptions から DataflowPipelineOptions に移動しました。

BoundedSource.producesSortedKeys() を削除しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1201

producesSortedKeys()BoundedSource から削除してください。

PubsubIO API が変更されました

影響を受けるユーザー: すべて| 影響: コンパイルのエラー| JIRA の問題: BEAM-974BEAM-1415

2.0.0-beta2 以降では、PubsubIO.ReadPubsubIO.Write は、PubsubIO.Read.topic(String) などの静的ファクトリ メソッドではなく、PubsubIO.<T>read()PubsubIO.<T>write() を使用してインスタンス化する必要があります。

PubsubIO を構成するメソッドの名前が変更されました。たとえば、PubsubIO.read().topic(String) の名前が PubsubIO.read().fromTopic() に変更されました。同様に: subscription()fromSubscription()timestampLabelidLabel はそれぞれ withTimestampAttributewithIdAttributePubsubIO.write().topic()PubsubIO.write().to() に変更されました。

PubsubIO では、Coder を指定することでメッセージ ペイロードが解析されるのではなく、文字列、Avro メッセージ、Protobuf メッセージを読み書きするための PubsubIO.readStrings()PubsubIO.writeAvros() などの関数が公開されています。カスタム型を読み書きする場合は、PubsubIO.read/writeMessages()(メッセージ属性も含める場合は PubsubIO.readMessagesWithAttributes)を使用し、ParDo または MapElements を使用してカスタム型と PubsubMessage の間で変換を行ってください。

サポート対象外の v1beta2 API に対する DatastoreIO のサポートの終了

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-354

DatastoreIO は Cloud Datastore API v1 をベースとするようになりました。

DisplayData.Builder が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-745

DisplayData.Builder.include(..) に、サブコンポーネントの表示データを登録するために必要なパスパラメータが新たに追加されました。Builder API では、DisplayData.Item ではなく DisplayData.ItemSpec<> を返すようになりました。

FileBasedSink.getWriterResultCoder() が必須になりました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-887

FileBasedSink.getWriterResultCoder が、必須の抽象化メソッドに変更されました。

名前が Filter.byPredicate() から Filter.by() に変わりました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-342

IntraBundleParallelization を削除しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-414

名前が RemoveDuplicates から Distinct に変わりました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-239

異なる構文を使用して文字列のみを操作するように TextIO を変更しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1354

TextIO.Read.from()TextIO.read().from() に変更されました、同様に TextIO.Write.to()TextIO.write().to() に変更されました。

TextIO.Read が常に PCollection<String> を返し、文字列の解析に .withCoder() を使用しないようになりました。代わりに、コレクションに ParDo または MapElements を適用して文字列を解析します。同様に、TextIO.Write が常に PCollection<String> を使用し、TextIO に書き込みを行い、ParDo または MapElements を使用して String に変換するようになりました。

異なる構文を使用するように AvroIO を変更しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1402

Avro で生成された型を読み書きするため、AvroIO.Read.from().withSchema(Foo.class)AvroIO.read(Foo.class).from() に変更されました(AvroIO.Write も同様)。

指定されたスキーマを使用して Avro 汎用レコードを読み書きするため、AvroIO.Read.from().withSchema(Schema or String)AvroIO.readGenericRecords().from() に変更されました(AvroIO.Write も同様)。

型パラメータを明示的に指定し、Kafka シリアライザ / デシリアライザを使用するように KafkaIO が変更されました。

影響を受けるユーザー: すべて| 影響: コンパイルのエラー| JIRA の問題: BEAM-1573BEAM-2221

KafkaIO では、キーと値の型のパラメータを明示的に指定する必要があります。例: KafkaIO.<Foo, Bar>read()KafkaIO.<Foo, Bar>write()

キーと値のバイトを解釈するために、Coder を使用するのでなく、Kafka の標準の Serializer クラスと Deserializer クラスを使用してください。例: KafkaIO.read().withKeyCoder(StringUtf8Coder.of()) ではなく KafkaIO.read().withKeyDeserializer(StringDeserializer.class) を使用、KafkaIO.write() も同様。

BigQueryIO の構文を変更しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1427

BigQueryIO.Read.from()BigQueryIO.Write.to() の代わりに、BigQueryIO.read().from()BigQueryIO.write().to() を使用。

KinesisIO.Read の構文を変更しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1428

KinesisIO.Read.from().using() の代わりに、KinesisIO.read().from().withClientProvider() を使用してください。

TFRecordIO の構文を変更しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1913

TFRecordIO.Read.from()TFRecordIO.Write.to() の代わりに、TFRecordIO.read().from()TFRecordIO.write().to() を使用。

XmlIOXmlSourceXmlSink を統合しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1914

XmlSourceXmlSink を直接使用する代わりに XmlIO を使用してください。

例: Read.from(XmlSource.from()) の代わりに、XmlIO.read().from() を使用。Write.to(XmlSink.writeOf()) の代わりに XmlIO.write().to() を使用。

CountingInputGenerateSequence に名前変更され、汎用化されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1414

CountingInput.unbounded() の代わりに、GenerateSequence.from(0) を使用してください。CountingInput.upTo(n) の代わりに、GenerateSequence.from(0).to(n) を使用してください。

CountLatestSample を変更しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1417BEAM-1421BEAM-1423

クラス Count.PerElementCount.PerKeyCount.Globally が非公開になったため、Count.perElement() などのファクトリ関数を使用する必要があります(以前は new Count.PerElement() を使用できました)。さらに、変換の結果に対して、たとえば、.withHotKeyFanout() を使用する場合、.apply(Count.perElement()) の結果に対して直接実行することはできません。代わりに、Count で結合関数が Count.combineFn() として公開され、Combine.globally(Count.combineFn()) を自分自身で適用する必要があります。

同様の変更は、Latest 変換と Sample 変換にも適用されます。

MapElementsFlatMapElements のパラメータの順序を変更しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1418

MapElementsFlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor) を使用する際に、まず記述子を指定することが必要になりました。例: FlatMapElements.into(descriptor).via(fn)

追加パラメータの構成時の Window を変更しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1425

Window を使用して WindowFn 自体でないもの(Window.into())を構成する場合は、Window.configure() を使用してください。例: Window.triggering(...) の代わりに、Window.configure().triggering(...) を使用します。

名前が Write.Bound から Write に変わりました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1416

Write.Bound がシンプルな Write になりました。これは、Write.to(Sink) のアプリケーションを抽出して変数に格納する場合にのみ関係します。その変数の型は以前は Write.Bound<...> でしたが、Write<...> になりました。

Flatten 変換クラスの名前が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1419

クラス Flatten.FlattenIterablesFlatten.FlattenPCollectionList の名前が、それぞれ Flatten.IterablesFlatten.PCollections に変更されました。

GroupByKey.create(boolean) を 2 つのメソッドに分割しました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1420

GroupByKey.create(boolean fewKeys) がシンプルな GroupByKey.create()GroupByKey.createWithFewKeys() になりました。

SortValues が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1426

BufferedExternalSorter.Options setter メソッドの名前が setSomeProperty から withSomeProperty に変更されました。

追加の Google API との依存関係

SDK バージョン 2.0.0 から、Cloud Resource Manager API を有効にすることも必要になりました。

依存関係のアップグレード

2.x リリースでは、Avro、Protobuf、gRPC など、多くの依存関係の固定されたバージョンがアップグレードされます。これらの依存関係の一部は、独自に大幅に変更された可能性があるため、コードが依存関係に直接依存している場合は、問題が発生する可能性があります。2.0.0 で使用されているバージョンは、pom.xml または mvn dependency:tree で確認できます。

内部のリファクタリング

SDK の内部構造が大幅に変更されました。公開 API(Internal パッケージや util パッケージで終わるクラスやメソッドなど)以外のものに依存していたユーザーは、大幅な変更に直面する可能性があります。

StateInternals または TimerInternals を使用していた場合、これらの内部 API は削除されました。DoFn 用に試験運用版の StateTimer API を使用できるようになりました。