Apache Beam SDK には、Dataflow マネージド サービス上で進行中のストリーミング ジョブを更新して新しいパイプライン コードを実行するための機能があります。
既存の Dataflow ジョブを更新する理由はさまざまです。
- パイプライン コードを補正または改善する。
- パイプライン コードのバグを修正する。
- パイプラインを更新して、データ形式の変更を処理する、またはデータソースのバージョンやその他の変更を考慮する。
ジョブを更新する場合、Dataflow サービスは現在実行中のジョブと潜在的な置換ジョブの間で互換性チェックを実行します。互換性チェックにより、中間状態情報やバッファデータなどが前のジョブから置換ジョブに確実に転送されます。
更新プロセスとその影響
Dataflow サービスでジョブを更新すると、更新されたパイプライン コードを実行する新しいジョブで既存のジョブが置換されます。Dataflow サービスはジョブ名を保持しますが、更新されたジョブ ID で置換ジョブを実行します。
置換ジョブは、前のジョブの中間状態データと、前のジョブの現在「処理中」のバッファデータ レコードまたはメタデータを保存します。たとえば、ウィンドウの解決を待機している間に、パイプライン内の一部のレコードがバッファされることがあります。
処理中のデータ
「処理中」のデータは、新しいパイプラインの変換によって引き続き処理されます。ただし、置換パイプライン コードに追加する変換は、レコードがバッファされている場所によっては、反映される場合と反映されない場合があります。たとえば、既存のパイプラインに次の変換があるとします。
Java: SDK 2.x
p.apply("Read", ReadStrings()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Format' >> FormatStrings()
Java: SDK 1.x
次のように、ジョブを新しいパイプライン コードで置換できます。
Java: SDK 2.x
p.apply("Read", ReadStrings()) .apply("Remove", RemoveStringsStartingWithA()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Remove' >> RemoveStringsStartingWithA() | 'Format' >> FormatStrings()
Java: SDK 1.x
文字「A」で始まる文字列を除外する変換を追加しても、次の変換(FormatStrings
)は、前のジョブから転送された、「A」で始まるバッファ内の文字列または処理中の文字列を依然として参照します。
ウィンドウ処理の変更
ウィンドウ処理とトリガーの戦略は置換パイプライン内の PCollection
で変更できますが、その場合は慎重に変更してください。ウィンドウ処理やトリガーの戦略を変更しても、すでにバッファ内にあるデータや、それ以外の処理中のデータには影響しません。
パイプラインのウィンドウ処理に対する変更は、固定時間ウィンドウやスライド時間ウィンドウの長さの変更など、小規模にとどめることをおすすめします。ウィンドウ処理アルゴリズムの変更など、ウィンドウ処理やトリガーに大きな変更を加えると、パイプライン出力に予期しない結果が生じることがあります。
置換ジョブの起動
ジョブを更新するには、新しいジョブを起動して、実行中のジョブを置換する必要があります。置換ジョブを起動するときは、ジョブの通常のオプションに加えて、次のパイプライン オプションを設定して更新プロセスを実行する必要があります。
Java
--update
オプションを渡します。--jobName
オプション(PipelineOptions 内)を、更新するジョブと同じ名前に設定します。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transformNameMapping
オプションで渡す必要があります。
Python
--update
オプションを渡します。--job_name
オプション(PipelineOptions 内)を、更新するジョブと同じ名前に設定します。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transform_name_mapping
オプションで渡す必要があります。
置換ジョブ名の指定
Java
置換ジョブを起動するときに --jobName
オプションで渡す値は、置き換えられるジョブの名前と厳密に一致する必要があります。
Python
置換ジョブを起動するときに --job_name
オプションで渡す値は、置き換えられるジョブの名前と厳密に一致する必要があります。
正しいジョブ名の値を見つけるには、Dataflow Monitoring Interface で前のジョブを選択し、[ジョブ情報] サイドパネルの [ジョブ名] フィールドを調べます。

または、Dataflow コマンドライン インターフェースを使用して、既存のジョブのリストを取得できます。gcloud dataflow jobs list
コマンドをシェルまたはターミナル ウィンドウで入力して、Google Cloud Platform プロジェクト内の Dataflow ジョブのリストを取得し、置き換えるジョブの NAME
フィールドを探します。
JOB_ID NAME TYPE CREATION_TIME STATE REGION 2020-12-28_12_01_09-yourdataflowjobid ps-topic Streaming 2020-12-28 20:01:10 Running us-central1
変換マッピングの作成
Java
置換パイプラインで変換名が変更されて前のパイプラインでの名前とは異なるものになった場合は、Dataflow サービスに変換マッピングを渡す必要があります。変換マッピングは、前のパイプライン コード内の名前付き変換を置換パイプライン コード内の名前にマッピングします。マッピングは、次の形式で --transformNameMapping
コマンドライン オプションを使用して渡すことができます。
--transformNameMapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
--transformNameMapping
でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。
Python
置換パイプラインで変換名が変更されて前のパイプラインでの名前とは異なるものになった場合は、Dataflow サービスに変換マッピングを渡す必要があります。変換マッピングは、前のパイプライン コード内の名前付き変換を置換パイプライン コード内の名前にマッピングします。マッピングは、次の形式で --transform_name_mapping
コマンドライン オプションを使用して渡すことができます。
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
--transform_name_mapping
でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。
変換名の決定
マップ内の各インスタンスの変換名は、パイプラインの変換を適用したときに指定した名前です。例:
Java: SDK 2.x
.apply("FormatResults", ParDo .of(new DoFn<KV<String, Long>>, String>() { ... } }))
Python
| 'FormatResults' >> beam.ParDo(MyDoFn())
Java: SDK 1.x
前のジョブの変換名は、Dataflow Monitoring Interface でジョブの実行グラフを調べることでも取得できます。

複合変換の命名
変換名は階層化されており、これはパイプライン内の変換階層に基づいています。パイプラインに複合変換がある場合、ネストしている変換にはその中にある変換の名前が付けられます。たとえば、パイプラインに CountWidgets
という名前の複合変換があり、この内側に Parse
という変換があるとします。内側の変換の完全名は CountWidgets/Parse
となり、変換マッピングではその完全名を指定する必要があります。
新しいパイプラインで複合変換が別の名前にマッピングされる場合、ネストしている変換もすべて自動的に名前が変更されます。変換マッピングで、内側の変換の変更後の名前を指定する必要があります。
変換階層のリファクタリング
複合変換をリファクタリングした、またはパイプラインが変更したライブラリからの複合変換に依存しているなどのために、置換パイプラインで前のパイプラインとは異なる変換階層を使用する場合、マッピングを明示的に宣言する必要があります。
たとえば、前のパイプラインが複合変換 CountWidgets
を適用し、そこに Parse
という名前の内部変換が含まれると仮定します。ここで、置換パイプラインが CountWidgets
をリファクタリングし、Parse
を Scan
という名前の別の変換の内部にネストするとします。更新を成功させるには、前のパイプラインの完全な変換名(CountWidgets/Parse
)を新しいパイプラインの変換名(CountWidgets/Scan/Parse
)に明示的にマッピングする必要があります。
Java
--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse
変換を完全に削除すると仮定します。
--transformNameMapping={"CountWidgets/Parse":""}
Python
--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse
変換を完全に削除すると仮定します。
--transform_name_mapping={"CountWidgets/Parse":""}
ジョブの互換性チェック
置換ジョブを開始する場合、Dataflow サービスは置換ジョブと前のジョブの間で互換性チェックを実行します。互換性チェックに合格した場合、前のジョブは停止します。その後、同じジョブ名を維持したまま、置換ジョブが Dataflow サービス上で起動されます。互換性チェックに不合格の場合は、前のジョブが引き続き Dataflow サービス上で実行され、置換ジョブからはエラーが返されます。
Java
Python
互換性チェックでは、提供する変換マッピングの指定に従って、Dataflow サービスが中間状態データを前のジョブのステップから置換ジョブに転送できることが確認されます。互換性チェックでは、パイプラインの PCollection
で使用されているコーダーが同一であることも確認されます。Coder
を変更すると、処理中のデータまたはバッファ内のレコードが置換パイプラインで正しくシリアル化されない可能性があるため、互換性チェックが不合格になることがあります。
互換性違反の防止
前のパイプラインと置換パイプラインの間に一定の差異があると、互換性チェックが不合格になることがあります。この差異には次のようなものがあります。
- マッピングの提供なしでのパイプライン グラフの変更。ジョブを更新した場合、Dataflow サービスは、各ステップの中間状態データを転送するために前のジョブの変換を置換ジョブの変換に一致させようとします。ステップを名前変更または削除した場合は、Dataflow がそれに合わせて状態データを一致させることができるように、変換マッピングを提供する必要があります。
- ステップの副入力の変更。置換パイプライン内で変換の副入力を追加または削除すると、互換性チェックが不合格になります。
- ステップのコーダーの変更。ジョブを更新するとき、Dataflow サービスはその時点でバッファ内にあるデータレコード(たとえば、ウィンドウ処理の解決を待っているもの)を保存し、置換ジョブで処理します。置換ジョブが異なるまたは互換性のないデータ エンコードを使用する場合、Dataflow サービスはこれらのレコードをシリアル化または逆シリアル化できません。
パイプラインからステートフル オペレーションを削除した。特定のステートフル オペレーションをパイプラインから削除した場合に、置換ジョブが Dataflow による互換性チェックに不合格となることがあります。Dataflow サービスでは、効率化のために複数のステップを融合できます。融合されたステップ内から状態依存のオペレーションを削除した場合、チェックが不合格になります。ステートフル オペレーションには次のようなものがあります。
- 副入力を生成または消費する変換
- I/O 読み取り
- キー付きの状態を使用する変換
- ウィンドウ マージのある変換
異なる地理的ゾーンで置換ジョブを実行しようとしている。置換ジョブは、前のジョブと同じゾーンで実行する必要があります。