既存のパイプラインの更新

Apache Beam SDK には、Cloud Dataflow マネージド サービス上で進行中のストリーミング ジョブを更新して新しいパイプライン コードを実行するための機能があります。

既存の Cloud Dataflow ジョブの更新が必要になる理由はさまざまであり、次のようなものが考えられます。

  • パイプライン コードを補正または改善する。
  • パイプライン コードのバグを修正する。
  • パイプラインを更新して、データ形式の変更を処理する、またはデータソースのバージョンやその他の変更を考慮する。

ジョブを更新するときは、Cloud Dataflow サービスによる互換性チェックが、現在実行中のジョブと置換候補のジョブとの間で行われます。互換性チェックにより、中間状態情報やバッファデータなどが前のジョブから置換ジョブに確実に転送されます。

更新プロセスとその影響

Cloud Dataflow サービスでジョブを更新すると、既存のジョブは、更新されたパイプライン コードを実行する新しいジョブに置換されます。Cloud Dataflow サービスはジョブ名を保持しますが、更新された jobId で置換ジョブを実行します。

置換ジョブは、前のジョブの中間状態データと、前のジョブの現在「処理中」のバッファデータ レコードまたはメタデータを保存します。たとえば、ウィンドウの解決を待機している間に、パイプライン内の一部のレコードがバッファされることがあります。

処理中のデータ

「処理中」のデータは、新しいパイプラインの変換によって引き続き処理されます。ただし、置換パイプライン コードに追加する変換は、レコードがバッファされている場所によっては、反映される場合と反映されない場合があります。たとえば、既存のパイプラインに次の変換があるとします。

Java: SDK 2.x

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Java: SDK 1.x

  p.apply(ReadStrings().named("Read"))
   .apply(FormatStrings().named("Format"));

次のように、ジョブを新しいパイプライン コードで置換できます。

Java: SDK 2.x

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Java: SDK 1.x

  p.apply(ReadStrings().named("Read"))
   .apply(RemoveStringsStartingWithA().named("Remove"))
   .apply(FormatStrings().named("Format"));

文字「A」で始まる文字列を除外する変換を追加しても、次の変換(FormatStrings)は、前のジョブから転送された、「A」で始まるバッファ内の文字列または処理中の文字列を依然として参照します。

ウィンドウ処理の変更

ウィンドウ処理トリガーの戦略は置換パイプライン内の PCollection で変更できますが、その場合は慎重に変更してください。ウィンドウ処理やトリガーの戦略を変更しても、すでにバッファ内にあるデータや、それ以外の処理中のデータには影響しません。

パイプラインのウィンドウ処理に対する変更は、固定時間ウィンドウやスライド時間ウィンドウの長さの変更など、小規模にとどめることをおすすめします。ウィンドウ処理アルゴリズムの変更など、ウィンドウ処理やトリガーに大きな変更を加えると、パイプライン出力に予期しない結果が生じることがあります。

置換ジョブの起動

ジョブを更新するには、新しいジョブを起動して、実行中のジョブを置換する必要があります。置換ジョブを起動するときは、ジョブの通常のオプションに加えて、次のパイプライン オプションを設定して更新プロセスを実行する必要があります。

  • --update オプションを渡します。
  • --jobName オプション(PipelineOptions 内)を、更新するジョブと同じ名前に設定します。
  • パイプライン内の変換名が変更された場合は、変換マッピングを用意して --transformNameMapping オプションで渡す必要があります。

jobName を指定する

置換ジョブを起動するときに --jobName オプションで渡す値は、置き換えられるジョブの名前と厳密に一致する必要があります。正しいジョブ名の値を見つけるには、Cloud Dataflow Monitoring Interface で前のジョブを選択し、[概要] タブの [ジョブ名] フィールドを調べます。

図 1: 実行中の Cloud Dataflow ジョブの [概要] タブ。[ジョブ名] フィールドが強調されています。

あるいは、Cloud Dataflow コマンドライン インターフェースを使用して、既存のジョブのリストをクエリすることもできます。コマンド gcloud alpha dataflow jobs list をシェルまたはターミナル ウィンドウで入力して、Google Cloud Platform プロジェクト内の Cloud Dataflow ジョブのリストを取得し、置き換えるジョブの NAME フィールドを探します。

ID                                        NAME                                 TYPE       CREATION_TIME        STATUS
2015-07-28_17_02_27-7257409117866690674   windowedwordcount-johndoe-0729000214 Streaming  2015-07-28 17:02:28  Running

変換マッピングの作成

置換パイプラインで変換名が変更されて前のパイプラインでの名前とは異なるものになった場合は、Cloud Dataflow サービスに変換マッピングを渡す必要があります。変換マッピングは、前のパイプライン コード内の名前付き変換を置換パイプライン コード内の名前にマッピングします。マッピングを渡すには、次の一般的なフォーマットで --transformNameMapping コマンドライン オプションを使用します。

  --transformNameMapping={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

--transformNameMapping でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。

変換名の決定

マップ内の各インスタンスの変換名は、パイプラインの変換を適用したときに指定した名前です。次に例を示します。

Java: SDK 2.x

.apply("FormatResults", ParDo
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

Java: SDK 1.x

.apply(ParDo
  .named("FormatResults")
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

前のジョブの変換名を特定するには、Cloud Dataflow Monitoring Interface でジョブの実行グラフを調べるという方法もあります。

図 2: Cloud Dataflow Monitoring Interface に表示される WordCount パイプラインの実行グラフ。

複合変換の命名

変換名は階層化されており、これはパイプライン内の変換階層に基づいています。パイプラインに複合変換がある場合、ネストしている変換にはその中にある変換の名前が付けられます。たとえば、パイプラインに CountWidgets という名前の複合変換があり、この内側に Parse という変換があるとします。内側の変換の完全名は CountWidgets/Parse となり、変換マッピングではその完全名を指定する必要があります。

新しいパイプラインで複合変換が別の名前にマッピングされる場合、ネストしている変換もすべて自動的に名前が変更されます。変換マッピングで、内側の変換の変更後の名前を指定する必要があります。

変換階層のリファクタリング

複合変換をリファクタリングした、またはパイプラインが変更したライブラリからの複合変換に依存しているなどのために、置換パイプラインで前のパイプラインとは異なる変換階層を使用する場合、マッピングを明示的に宣言する必要があります。

たとえば、前のパイプラインが複合変換 CountWidgets を適用し、そこに Parse という名前の内部変換が含まれると仮定します。ここで、置換パイプラインが CountWidgets をリファクタリングし、ParseScan という名前の別の変換の内部にネストするとします。更新を成功させるには、前のパイプラインの完全な変換名(CountWidgets/Parse)を新しいパイプラインの変換名(CountWidgets/Scan/Parse)に明示的にマッピングする必要があります。

  --transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse 変換を完全に削除すると仮定します。

  --transformNameMapping={"CountWidgets/Parse":""}

ジョブの互換性チェック

置換ジョブを起動するとき、Cloud Dataflow サービスは置換ジョブと前のジョブの間で互換性チェックを行います。互換性チェックに合格した場合、前のジョブは停止します。その後、同じジョブ名を維持したまま、置換ジョブが Cloud Dataflow サービス上で起動されます。互換性チェックに不合格の場合は、前のジョブが引き続き Cloud Dataflow サービス上で実行され、置換ジョブからはエラーが返されます。

この互換性チェックでは、提供した変換マッピングで指定したとおりに、Cloud Dataflow サービスが前のジョブのステップから置換ジョブに中間状態データを転送できることを確認します。互換性チェックでは、パイプラインの PCollection で使用されているコーダーが同一であることも確認されます。Coder を変更すると、処理中のデータまたはバッファ内のレコードが置換パイプラインで正しくシリアル化されない可能性があるため、互換性チェックが不合格になることがあります。

互換性違反の防止

前のパイプラインと置換パイプラインの間に一定の差異があると、互換性チェックが不合格になることがあります。この差異には次のようなものがあります。

  • マッピングの提供なしでのパイプライン グラフの変更。 ジョブを更新するとき、Cloud Dataflow サービスは、各ステップの中間状態データを転送するために、前のジョブ内の変換と置換ジョブ内の変換を照合しようとします。ステップの名前を変更した場合やステップを削除した場合は、変換マッピングを渡す必要があります。Cloud Dataflow は、これに基づいて状態データを対応付けます。
  • ステップの副入力の変更。 置換パイプライン内で変換の副入力を追加または削除すると、互換性チェックが不合格になります。
  • ステップのコーダーの変更。 ジョブを更新するとき、Cloud Dataflow サービスはその時点でバッファ内にあるデータレコード(たとえば、ウィンドウ処理の解決を待っているもの)を保存し、置換ジョブで処理します。置換ジョブで使用されるデータ エンコードが異なっている場合や互換性がない場合、Cloud Dataflow はレコードのシリアル化や逆シリアル化を行えません。
  • パイプラインから「ステートフル」オペレーションを削除した。 特定のステートフル オペレーションをパイプラインから削除した場合に、置換ジョブが Cloud Dataflow による互換性チェックに不合格となることがあります。Cloud Dataflow サービスでは、効率化のために複数のステップを融合できます。融合されたステップ内から状態依存のオペレーションを削除した場合、チェックが不合格になります。ステートフル オペレーションには次のようなものがあります。

    • 副入力を生成または消費する変換
    • I/O 読み取り
    • キー付きの状態を使用する変換
    • ウィンドウ マージのある変換
  • 異なる地理的ゾーンで置換ジョブを実行しようとしている。置換ジョブは、前のジョブと同じゾーンで実行する必要があります。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。