Cloud Dataflow SDK 1.x for Java からの移行

このドキュメントでは、Cloud Dataflow SDK for Java の 1.x リリースと 2.x リリースの主な違いをまとめています。

Cloud Dataflow SDK のサポート終了のお知らせ: Cloud Dataflow SDK 2.5.0 が Cloud Dataflow SDK の最後のリリースになります。Cloud Dataflow SDK のリリースは Apache Beam SDK のリリースとは別個のものです。Cloud Dataflow サービスでは、正式な Apache Beam SDK のリリースがフルサポートの対象となります。Cloud Dataflow サービスでは、バージョン 2.0.0 以降のリリース済みの Apache Beam SDK のリリースもサポートされます。さまざまな SDK のサポート状況については、Cloud Dataflow のサポートページをご覧ください。Apache Beam のダウンロード ページでは、Apache Beam SDK リリースのリリースノートもダウンロードできます。

1.x から 2.x への移行

Apache Beam SDK for Java 2.x をインストールして使用する方法については、Apache Beam SDK インストール ガイドをご覧ください。

1.x から 2.x への主な変更点

注: 2.x バージョンにアップグレードする際には、これらの変更点を必ず把握するようにしてください。

パッケージ名の変更と再構築

Apache Beam が Google Cloud Platform を超えた環境で最適に動作するための一般化の一環として、SDK コードの名前が変更され、再構築されました。

com.google.cloud.dataflow の名前を org.apache.beam へと変更

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA 課題: BEAM-78

SDK が、com.google.cloud.dataflow ではなく org.apache.beam のパッケージで宣言されるようになりました。この変更により、すべてのインポート ステートメントを更新する必要があります。

新たなサブパッケージ: runners.dataflowrunners.directio.gcp

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA 課題の: BEAM-77

ランナーが独自のパッケージへと再編成されたため、com.google.cloud.dataflow.sdk.runners 内の多くが、org.apache.beam.runners.directorg.apache.beam.runners.dataflow のいずれかに移動されました。

Cloud Dataflow サービスでの実行に固有のパイプライン オプションが、com.google.cloud.dataflow.sdk.options から org.apache.beam.runners.dataflow.options へと移動されました。

Google Cloud Platform サービスへの大部分の I/O コネクタがサブパッケージへと移動されました。たとえば、BigQueryIO は com.google.cloud.dataflow.sdk.io から org.apache.beam.sdk.io.gcp.bigquery へと移動されました。

ほとんどの IDE で、新しい場所を特定できるようになります。あるファイルの新しい場所を確認するには、t を使用して GitHub のコードを検索します。Cloud Dataflow SDK 1.x for Java のリリースは、GoogleCloudPlatform/DataflowJavaSDK リポジトリ(master-1.x ブランチ)から構築されています。Cloud Dataflow SDK 2.x for Java のリリースは、apache/beam リポジトリのコードと対応します。

ランナー

ランナー名から Pipeline を削除

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1185

すべてのランナー名から Pipeline が削除され、名前が短くなりました。たとえば、DirectPipelineRunnerDirectRunner に変更され、DataflowPipelineRunnerDataflowRunner に変更されました。

Google Cloud Storage パスに --tempLocation の設定が必要

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-430

ユーザーが --stagingLocation または --tempLocation のうち 1 つだけを指定して、Cloud Dataflow にもう 1 つを推測させるのではなく、Cloud Dataflow サービスでは --gcpTempLocation を Google Cloud Storage パスに設定することが必要になりましたが、これはより一般的な --tempLocation から推測できます。オーバーライドされない限り、これは --stagingLocation にも使用されます。

InProcessPipelineRunner の削除

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-243

DirectRunner は継続してユーザーのローカルマシンで実行されますが、マルチスレッド実行、制限なし PCollection、投機的な出力と遅延出力のトリガーを新たにサポートします。文書化された Beam のモデルにより近くなっており、ユニットテストの失敗が(正常に)増加する場合があります。

この機能は DirectRunner が持つようになったため、InProcessPipelineRunner(Cloud Dataflow SDK 1.6+ for Java)は削除されました。

BlockingDataflowPipelineRunnerPipelineResult.waitToFinish() へ置き換え

影響を受けるユーザー: すべて | 影響: コンパイルのエラー

BlockingDataflowPipelineRunner が削除されました。パイプラインを実行して終了するまで待機するようにコード内でプログラムされている場合は、DataflowRunner を使用して pipeline.run().waitToFinish() を明示的に呼び出す必要があります。

コマンドラインで --runner BlockingDataflowPipelineRunner を使用して、パイプラインが終了するまでブロックするようメインプログラムにインタラクティブに指示している場合、これはメインプログラムの問題になります。waitToFinish() を呼び出すように指示する --blockOnRun などのオプションを指定する必要があります。

TemplatingDataflowPipelineRunner--templateLocation へ置き換え

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-551

TemplatingDataflowPipelineRunner(Cloud Dataflow SDK 1.9+ for Java)の機能が、--templateLocationDataflowRunner と使用するように置き換えられました。

ParDo と DoFn

DoFn でメソッドのオーバーライドではなくアノテーションを使用

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-37

柔軟性とカスタマイズ性を高めるために、ユーザーが特定のメソッドをオーバーライドする代わりに、DoFn でメソッドのアノテーションを使用して処理をカスタマイズできるようになりました。

次のコードサンプルで、新旧の DoFn の違いをご確認いただけます。以前の Cloud Dataflow SDK 1.x for Java では、コードは次のようなものでした。

new DoFn<Foo, Baz>() {
  @Override
  public void processElement(ProcessContext c) { … }
}

新しい Apache Beam SDK 2.x for Java では、コードは次のようになります。

new DoFn<Foo, Baz>() {
  @ProcessElement   // <-- This is the only difference
  public void processElement(ProcessContext c) { … }
}

DoFnProcessContext#window() にアクセスしていた場合は、さらに変更があります。従来は次のようなものでした。

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    … (MyWindowType) c.window() …
  }
}

今は、次のようなコードか、

public class MyDoFn extends DoFn<Foo, Baz> {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

次のようなコードを記述します。

return new DoFn<Foo, Baz>() {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

ランタイムが自動で DoFn にウィンドウを与えます。

複数のバンドルでの DoFn の再利用

影響を受けるユーザー: すべて | 影響: 通知なく予期しない結果が発生する可能性 | JIRA の問題: BEAM-38

パフォーマンス向上のため、複数の要素のバンドルを処理するために同じ DoFn を再利用できるようになりました。従来は、バンドルごとに新しいインスタンスを保証していました。バンドルの終了後にローカル状態(インスタンス変数など)を保持する DoFn は動作を変えることができます。これは、次のバンドルが新たなコピーではなく、その状態から開始されるためです。

ライフサイクルを管理するために、新たに @Setup メソッドと @Teardown メソッドが追加されました。全体のライフサイクルは次のとおりです(どの時点でもエラーによってライフサイクルが短縮される場合があります)。

  • @Setup: 再利用可能な接続の開始などの、インスタンスごとの DoFn の初期化。
  • 任意の数のシーケンス:
    • @StartBundle: DoFn の状態のリセットなどの、バンドルごとの初期化。
    • @ProcessElement: 通常の要素の処理。
    • @FinishBundle: 副次的影響の解決などの、バンドルごとの完了ステップ。
  • @Teardown: 再利用可能な接続の終了などの、DoFn が保持するリソースのインスタンスごとの破棄。

注: この変更による影響は、実際は限られたものになると想定されます。しかし、コンパイル時にエラーは発生しないため、通知なしで予期せぬ結果がもたらされる可能性があります。

副入力または副出力を指定する際のパラメータの順序が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1422

ParDo を適用するときは必ず最初に DoFn を指定する必要があります。つまり、

foos.apply(ParDo
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag)
    .of(new MyDoFn()))

のようなコードでなく、次のようなコードを記述する必要があります。

foos.apply(ParDo
    .of(new MyDoFn())
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag))

PTransform

.named() の削除

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-370

PTransform とサブクラスから .named() メソッドを削除しました。代わりに PCollection.apply(“name”, PTransform) を使用します。

PTransform.apply() の名前を PTransform.expand() へと変更

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-438

PCollection.apply() との混同を避けるため、PTransform.apply() の名前が PTransform.expand() へと変更されました。ユーザーが記述したすべての複合変換において、オーバーライドされた apply() メソッドの名前を expand() へと変更する必要があります。パイプラインの構築方法には変更はありません。

その他の重要な変更点

以下は、その他の重要な変更点と予定されている変更のリストです。

個々の API の変更

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-725

次の GcpOptions を削除しました: TokenServerUrlCredentialDirCredentialIdSecretsFileServiceAccountNameServiceAccountKeyFile

GoogleCredentials.fromStream(InputStream for credential) を使用します。ストリームには、Google Developers Console からの JSON 形式のサービス アカウント キーのファイルか、Cloud SDK でサポートされている形式を使用して保存されたユーザー認証情報を含めることができます。

--enableProfilingAgent--saveProfilesToGcs へと変更

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1122

--updateDataflowPipelineOptions へと移動

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-81

--update PipelineOptionDataflowPipelineDebugOptions から DataflowPipelineOptions へと移動

BoundedSource.producesSortedKeys() の削除

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1201

BoundedSource から producesSortedKeys() を削除

PubsubIO API の変更

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-974BEAM-1415

2.0.0-beta2 以降では、PubsubIO.ReadPubsubIO.Write は、PubsubIO.Read.topic(String) などの静的ファクトリ メソッドではなく、PubsubIO.<T>read()PubsubIO.<T>write() を使用してインスタンス化する必要があります。

PubsubIO を構成するメソッドの名前が変更されました。たとえば、PubsubIO.read().topic(String) の名前が PubsubIO.read().fromTopic() に変更されました。同様に、subscription()fromSubscription()timestampLabelwithTimestampAttributeidLabelwithIdAttribute、および PubsubIO.write().topic()PubsubIO.write().to() に、それぞれ名前変更されました。

PubsubIO では、Coder を指定することでメッセージ ペイロードが解析されるのではなく、文字列、Avro メッセージおよび Protobuf メッセージを読み書きするための PubsubIO.readStrings()PubsubIO.writeAvros() などの関数が公開されています。カスタム型を読み書きする場合は、PubsubIO.read/writeMessages()(および、メッセージ属性も含める場合は PubsubIO.readMessagesWithAttributes)を使用し、ParDo または MapElements を使用してカスタム型と PubsubMessage の間で変換を行ってください。

サポート対象外の v1beta2 API に対する DatastoreIO のサポートの終了

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-354

DatastoreIO は Cloud Datastore API v1 をベースとするようになりました。

変更: DisplayData.Builder

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-745

DisplayData.Builder.include(..) に、サブコンポーネントの表示データを登録するために必要なパスパラメータが新たに追加されました。Builder API が DisplayData.ItemSpec<> を返すようになりました。従来は DisplayData.Item を返していました。

FileBasedSink.getWriterResultCoder() が必要

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-887

FileBasedSink.getWriterResultCoder が、必須の抽象化メソッドに変更されました。

Filter.byPredicate() の名前を Filter.by() へと変更

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-342

IntraBundleParallelization の削除

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-414

RemoveDuplicates の名前を Distinct へと変更

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-239

異なる構文を使用するとともに文字列のみを操作するように TextIO を変更しました。

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1354

TextIO.Read.from()TextIO.read().from() に、TextIO.Write.to()TextIO.write().to() に変更されました。

TextIO.Read が常に PCollection<String> を返し、文字列の解析に .withCoder() を使用しないようになりました。代わりに、コレクションに ParDo または MapElements を適用して文字列を解析します。同様に、TextIO.Write が常に PCollection<String> を使用し、TextIO に他の何かを書き込み、ParDo または MapElements を使用して String に変換するようになりました。

異なる構文を使用するように AvroIO が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1402

Avro で生成された型を読み書きするための AvroIO.Read.from().withSchema(Foo.class)AvroIO.read(Foo.class).from() に変更)と AvroIO.Write(同様に変更)が変更されました。

指定したスキーマを使用して Avro 汎用レコードを読み書きするための AvroIO.Read.from().withSchema(Schema or String)AvroIO.readGenericRecords().from() に変更)と AvroIO.Write(同様に変更)が変更されました。

型パラメータを明示的に指定するとともに Kafka シリアライザ / デシリアライザを使用するように KafkaIO が変更されました。

影響を受けるユーザー: すべて| 影響: コンパイルのエラー| JIRA の問題: BEAM-1573BEAM-2221

KafkaIO では、KafkaIO.<Foo, Bar>read()KafkaIO.<Foo, Bar>write() のようにキーと値の型のパラメータを明示的に指定する必要があります。

キーと値のバイトを解釈するために、Coder を使用するのでなく、Kafka の標準の Serializer クラスおよび Deserializer クラスを使用してください。例: KafkaIO.read().withKeyCoder(StringUtf8Coder.of()) でなく KafkaIO.read().withKeyDeserializer(StringDeserializer.class) を使用してください。KafkaIO.write() についても同様です。

BigQueryIO の構文が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1427

BigQueryIO.Read.from()BigQueryIO.Write.to() でなく、BigQueryIO.read().from()BigQueryIO.write().to() を使用してください。

KinesisIO.Read の構文が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1428

KinesisIO.Read.from().using() でなく KinesisIO.read().from().withClientProvider() を使用してください。

TFRecordIO の構文が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1913

TFRecordIO.Read.from()TFRecordIO.Write.to() でなく、TFRecordIO.read().from()TFRecordIO.write().to() を使用してください。

XmlSourceXmlSinkXmlIO に統合されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1914

直接 XmlSourceXmlSink を使用するのでなく、XmlIO を使用してください。

例: Read.from(XmlSource.from()) でなく XmlIO.read().from() を、Write.to(XmlSink.writeOf()) でなく XmlIO.write().to() を使用してください。

CountingInputGenerateSequence に名前変更され、汎用化されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1414

CountingInput.unbounded() でなく GenerateSequence.from(0) を使用します。CountingInput.upTo(n) でなく GenerateSequence.from(0).to(n) を使用します。

変更された CountLatestSample

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1417BEAM-1421BEAM-1423

クラス Count.PerElementCount.PerKeyCount.Globally が非公開になったため、Count.perElement() などのファクトリ関数を使用する必要があります(以前は new Count.PerElement() を使用できました)。さらに、変換の結果に対してたとえば、.withHotKeyFanout() を使用する場合、.apply(Count.perElement()) の結果に対して直接実行することはできません。代わりに、Count で結合関数が Count.combineFn() として公開され、Combine.globally(Count.combineFn()) を自分自身で適用する必要があります。

同様の変更が Latest 変換と Sample 変換にも適用されます。

MapElements および FlatMapElements パラメータの順序が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1418

MapElementsFlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor) を使用する際に、まず記述子を指定することが必要になりました。例: FlatMapElements.into(descriptor).via(fn)

追加のパラメータを構成するときの Window が変更されました

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1425

Window を使用して WindowFn 自体でないもの(Window.into())を構成する場合は、Window.configure() を使用してください。例: Window.triggering(...) でなく Window.configure().triggering(...) を使用してください。

Write.Bound の名前が Write に変更されました。

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1416

クラス Write.Bound がシンプルに Write になりました。これは、Write.to(Sink) のアプリケーションを抽出して変数に格納する場合にのみ関係します。その変数の型は以前は Write.Bound<...> でしたが、Write<...> になりました。

名前が変更された Flatten 変換クラス

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1419

クラス Flatten.FlattenIterablesFlatten.FlattenPCollectionList の名前が、それぞれ Flatten.IterablesFlatten.PCollections に変更されました。

2 つのメソッドに分割された GroupByKey.create(boolean)

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1420

GroupByKey.create(boolean fewKeys) がシンプルに GroupByKey.create()GroupByKey.createWithFewKeys() になりました。

変更された SortValues

影響を受けるユーザー: すべて | 影響: コンパイルのエラー | JIRA の問題: BEAM-1426

BufferedExternalSorter.Options setter メソッドの名前が setSomeProperty から withSomeProperty に変更されました。

追加の Google API との依存関係

SDK バージョン 2.0.0 から、Cloud Resource Manager API を有効にすることも必要になりました。

依存関係のアップグレード

2.x リリースでは、Avro、Protobuf、gRPC など、多くの依存関係の固定されたバージョンがアップグレードされます。これらの依存関係の一部は、独自に大幅に変更された可能性があるため、コードが依存関係に直接依存している場合は、問題が発生する可能性があります。2.0.0 で使用されているバージョンは、pom.xml 内で、または mvn dependency:tree を使って確認できます。

内部のリファクタリング

SDK の内部構造が大幅に変更されました。公開 API(Internal パッケージや util パッケージで終わるクラスやメソッドなど)以外のものに依存していたユーザーは、大幅な変更に直面する可能性があります。

StateInternalsTimerInternals を使用していた場合: これらの内部 API は削除されました。DoFn 用に試験運用版の State API と Timer API を使用できるようになりました。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。