ストリーミング パイプラインをアップグレードする

このページでは、ストリーミング パイプラインのアップグレードに関するガイダンスと推奨事項について説明します。たとえば、Apache Beam SDK の新しいバージョンにアップグレードする必要がある場合や、パイプライン コードを更新したい場合があります。シナリオに応じてさまざまなオプションが用意されています。

ジョブが完了すると停止するバッチ パイプラインとは異なり、ストリーミング パイプラインは通常、中断のない処理を提供するために継続的に実行します。そのため、ストリーミング パイプラインをアップグレードする際は、次の点を考慮する必要があります。

  • パイプラインの中断を最小限に抑えるか、回避しなければならない場合があります。新しいバージョンのパイプラインがデプロイされている間、処理が一時的に中断することを許容できる場合があります。また、アプリケーションが中断を許容できない場合もあります。
  • パイプライン更新プロセスでは、メッセージ処理やその他の接続されたシステムへの中断を最小限に抑えるようにスキーマの変更を処理する必要があります。たとえば、イベント処理パイプラインでメッセージのスキーマが変更された場合、ダウンストリーム データシンクでもスキーマの変更が必要になることがあります。

パイプラインと更新要件に応じて、次のいずれかの方法でストリーミング パイプラインを更新します。

更新中に発生する可能性のある問題とその防止方法については、置換ジョブを検証するジョブの互換性チェックをご覧ください。

ベスト プラクティス

  • Apache Beam SDK バージョンは、パイプライン コードの変更とは別にアップグレードします。
  • 追加の更新を行う前に、変更のたびにパイプラインをテストします。
  • パイプラインで使用している Apache Beam SDK バージョンを定期的にアップグレードします。

処理中の更新を実行する

ジョブを停止せずに、進行中の一部のストリーミング パイプラインを更新できます。このシナリオは、処理中のジョブの更新と呼ばれます。処理中のジョブの更新は、次の場合にのみ使用できます。

  • ジョブで Streaming Engine を使用する必要がある。
  • ジョブが実行中の状態である必要がある。
  • ジョブで使用するワーカーの数のみを変更する。

詳細については、「水平自動スケーリング」ページの自動スケーリングの範囲を設定するをご覧ください。

処理中のジョブの更新を行う方法については、既存のパイプラインを更新するをご覧ください。

置換ジョブを起動する

更新されたジョブが既存のジョブと互換性がある場合は、update オプションを使用してパイプラインを更新できます。既存のジョブを置き換えると、更新されたパイプライン コードが新しいジョブによって実行されます。Dataflow サービスはジョブ名を保持しますが、更新されたジョブ ID で置換ジョブを実行します。このプロセスにより、既存のジョブが停止して互換性チェックが実行され、新しいジョブが開始される間にダウンタイムが発生する可能性があります。詳細については、ジョブの置換による影響をご覧ください。

Dataflow は互換性チェックを行い、更新されたパイプライン コードを実行中のパイプラインに安全にデプロイできるようにします。コードの変更によっては、副入力が既存のステップに対して追加または削除されたときなど、互換性チェックが失敗する場合があります。互換性チェックが失敗した場合は、インプレース ジョブ更新を実行できません。

置換ジョブの起動手順については、置換ジョブを起動するをご覧ください。

パイプラインの更新が現在のジョブと互換性がない場合は、パイプラインを停止して置き換える必要があります。パイプラインでダウンタイムを許容できない場合は、並列パイプラインを実行します。

パイプラインを停止して置き換える

処理を一時的に停止できる場合は、パイプラインをキャンセルまたはドレインしてから、更新されたパイプラインで置き換えます。パイプラインをキャンセルすると、Dataflow は処理を直ちに停止し、リソースをできるだけ早くシャットダウンします。これにより、処理中のデータが失われる可能性があります。データの損失を避けるため、通常はドレインが推奨されます。Dataflow スナップショットを使用してストリーミング パイプラインの状態を保存することもできます。これにより、状態を失うことなく Dataflow ジョブの新しいバージョンを開始できます。詳細については、Dataflow スナップショットを使用するをご覧ください。

パイプラインをドレインすると、処理中のウィンドウが直ちに終了し、すべてのトリガーが発生します。処理中のデータは失われませんが、ドレインによってウィンドウのデータが不完全になる可能性があります。この場合、処理中のウィンドウが結果の一部または不完全な結果を出力します。詳細については、ジョブのドレインによる影響をご覧ください。既存のジョブが完了したら、更新されたパイプライン コードを含む新しいストリーミング ジョブを起動し、処理を再開します。

この方法では、既存のストリーミング ジョブが停止してから、置換パイプラインがデータの処理を再開できるようになるまでの間にダウンタイムが発生します。ただし、既存のパイプラインをキャンセルまたはドレインして、更新されたパイプラインで新しいジョブを開始するほうが、並列パイプラインよりも複雑ではありません。

詳細な手順については、Dataflow ジョブをドレインするをご覧ください。現在のジョブをドレインしたら、同じジョブ名で新しいジョブを開始します。

Pub/Sub スナップショットとシークを使用したメッセージの再処理

場合によっては、ドレインされたパイプラインを置換またはキャンセルした後、以前に配信された Pub/Sub メッセージを再処理する必要があります。たとえば、更新されたビジネス ロジックを使用してデータを再処理する必要が生じることがあります。Pub/Sub シークは、Pub/Sub スナップショットからメッセージを再生できる機能です。Dataflow で Pub/Sub シークを使用すると、サブスクリプション スナップショットの作成時点からメッセージを再処理できます。

開発とテストでは、Pub/Sub シークを使用して既知のメッセージを繰り返し再生し、パイプラインからの出力を確認できます。Pub/Sub シークを使用する場合は、サブスクリプションがパイプラインによって消費されるときは、サブスクリプション スナップショットをシークしないでください。シークすると、Dataflow のウォーターマーク ロジックを無効にし、Pub/Sub メッセージの 1 回限りの処理に影響を与える場合があります。

ターミナル ウィンドウで Pub/Sub シークと Dataflow パイプラインを使用する場合に推奨される gcloud CLI ワークフローは次のとおりです。

  1. サブスクリプションのスナップショットを作成するには、gcloud pubsub snapshots create コマンドを使用します。

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. パイプラインをドレインまたはキャンセルするには、gcloud dataflow jobs drain コマンドまたは gcloud dataflow jobs cancel コマンドを使用します。

    gcloud dataflow jobs drain JOB_ID
    

    または

    gcloud dataflow jobs cancel JOB_ID
    
  3. スナップショットをシークするには、gcloud pubsub subscriptions seek コマンドを使用します。

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. サブスクリプションを使用する新しいパイプラインをデプロイします。

並列パイプラインを実行する

更新中にストリーミング パイプラインの中断を回避する必要がある場合は、並列パイプラインを実行します。パイプライン コードを更新した新しいストリーミング ジョブを作成し、既存のパイプラインと並行して新しいパイプラインを実行します。

新しいパイプラインを作成するときは、既存のパイプラインに使用したものと同じウィンドウ戦略を使用します。更新されたパイプラインによって処理された最も古い完了ウィンドウのタイムスタンプをウォーターマークが超えるまで、既存のパイプラインを実行し続けます。次に、既存のパイプラインをドレインまたはキャンセルします。更新されたパイプラインは引き続き実行され、処理を引き継ぎます。

次の図に、このプロセスを示します。

Pipeline A が Pipeline B の 5 分間のウィンドウと重なる。

この図において、Pipeline B は、Pipeline A を引き継ぐ、更新されたジョブです。値 t は、Pipeline B で処理される最も古い完了ウィンドウのタイムスタンプです。値 wPipeline A のウォーターマークです。簡単にするため、完全なウォーターマークには遅延データがないと仮定します。処理時間と経過時間は横軸に示されます。どちらのパイプラインも 5 分間の固定(タンブリング)ウィンドウを使用します。ウォーターマークが各ウィンドウの終わりを通過した後、結果がトリガーされます。

同時出力は、2 つのパイプラインがオーバーラップする期間に行われます。そのため、異なる宛先に結果を書き込むように 2 つのパイプラインを構成します。ダウンストリームのシステムでは、データベース ビューなどの 2 つの宛先シンクを抽象化し、結合された結果をクエリできます。これらのシステムでは、抽象化を使用して、オーバーラップする期間から重複を排除することもできます。

次の例は、Pub/Sub から入力データを読み取り、処理を行い、結果を BigQuery に書き込むパイプラインを使用する方法を示しています。

  1. 最初の状態で、既存のストリーミング パイプライン(Pipeline A)が実行され、サブスクリプション(Subscription A)を使用して Pub/Sub トピック(Topic)からメッセージを読み取ります。結果は BigQuery テーブル(Table A)に書き込まれます。結果は BigQuery ビューを介して使用されます。BigQuery ビューは、基になるテーブルの変更をマスクするためのファサードとして機能します。これは、ファサード パターンと呼ばれる設計方法の一つです。次の図は、初期状態を示しています。

    1 つのサブスクリプションを持つ 1 つのパイプラインと、単一の BigQuery テーブルへの書き込み。

  2. 更新されたパイプライン用に新しいサブスクリプション(Subscription B)を作成します。Subscription B を使用して Pub/Sub トピック(Topic)から読み取り、別の BigQuery テーブル(Table B)に書き込む新しいパイプライン(Pipeline B)をデプロイします。次の図は、この流れを表しています。

    2 つのパイプライン(それぞれ 1 つのサブスクリプションを使用)。各パイプラインは個別の BigQuery テーブルに書き込まれます。ファサード ビューを両方のテーブルから読み取ります。

    この時点では、Pipeline APipeline B は並列で実行されており、結果は別々のテーブルに書き込まれます。時間 t は、Pipeline B で処理される最も古い完了ウィンドウのタイムスタンプとして記録します。

  3. Pipeline A のウォーターマークが時間 t を超えたら、Pipeline A をドレインします。パイプラインをドレインすると、開いているウィンドウが終了し、処理中のデータの処理が完了します。パイプラインにウィンドウが含まれていて完全なウィンドウが重要である場合(遅延データがないと仮定)は、Pipeline A をドレインする前に、両方のパイプラインを実行して完全に重複するウィンドウを生成します。処理中のデータがすべて処理されて Table A に書き込まれた後、Pipeline A のストリーミング ジョブを停止します。次の図は、この段階を示しています。

    Pipeline A がドレインされて Subscription A を読み取れなくなり、ドレインの完了後は Table A にデータが送信されなくなります。すべての処理は 2 番目のパイプラインによって行われます。

  4. この時点では、Pipeline B のみが実行されています。BigQuery のビュー(ファサード ビュー)からクエリを実行でき、これは、Table ATable B のファサードとして機能します。両方のテーブルに同じタイムスタンプの行が存在する場合は、Table B から行を返すようにビューを構成します。Table B に行が存在しない場合は、Table A にフォールバックするようにします。次の図は、Table ATable B の両方から読み込むビュー(ファサード ビュー)を示しています。

    Pipeline A が削除され、Pipeline B のみが実行されている。

    この時点で、Subscription A を削除できます。

新しいパイプライン デプロイで問題が検出された場合、並列パイプラインを使用するとロールバックが簡単になります。この例では、Pipeline A を実行した状態で、Pipeline B をモニタリングし、正しく動作しているかどうか確認することもできます。Pipeline B に問題がある場合は、Pipeline A にロールバックできます。

制限事項

この方法には、次の制限事項があります。

  • 同じ入力に対して 2 つのパイプラインを実行すると、出力で重複データが生成される可能性が高くなります。ダウンストリームのシステムが重複データを認識し、それに対処できる必要があります。
  • Pub/Sub ソースから読み取る際、同じサブスクリプションを複数のパイプラインで使用することは推奨されません。正確性の問題が発生する可能性があります。ただし、抽出・変換・読み込み(ETL)パイプラインなどの一部のユースケースでは、2 つのパイプラインで同じサブスクリプションを使用することで重複が減る場合があります。このシナリオでは自動スケーリングに関する問題が発生する可能性が高くなりますが、実行中のジョブの更新機能を使用することで軽減できます。詳細については、Pub/Sub ストリーミング パイプラインにおける自動スケーリングの微調整に関するブログ投稿をご覧ください。
  • Pub/Sub ソースから読み取る場合、2 つ目のサブスクリプションを使用すると重複が発生しますが、データの正確性と自動スケーリングに問題は発生しません。

スキーマ ミューテーションを処理する

データ処理システムでは、ビジネス要件や技術上の理由から、時間の経過とともにスキーマ ミューテーションへの対応が必要になります。ビジネス情報システムの中断を避けるため、スキーマの更新を適用する場合は慎重に計画し、実施する必要があります。

Pub/Sub トピックから JSON ペイロードを含むメッセージを読み取るパイプラインについて考えてみましょう。パイプラインは、各メッセージを TableRow インスタンスに変換し、その行を BigQuery テーブルに書き込みます。出力テーブルのスキーマは、パイプラインによって処理されるメッセージと似ています。次の図では、スキーマを Schema A として示しています。

Schema A を使用してサブスクリプションを読み取り、BigQuery 出力テーブルに書き込むパイプライン。

時間が経つと、メッセージ スキーマが複雑な形で変更される可能性があります。たとえば、フィールドが追加、削除、置換される場合です。Schema A は新しいスキーマに進化します。以降、その新しいスキーマを Schema B と呼びます。この場合、Pipeline A は更新され、出力テーブルのスキーマは Schema B をサポートしている必要があります。

出力テーブルでは、ダウンタイムなしでスキーマ ミューテーションを実行できます。たとえば、新しいフィールドを追加したり、列モードを緩和できます(REQUIREDNULLABLE に変更するなど)。通常、これらのミューテーションは既存のクエリに影響しません。ただし、既存のスキーマ フィールドを変更または削除するスキーマ ミューテーションは、クエリの中断や、他の中断をもたらします。次の方法では、ダウンタイムなしで変更に対応できます。

パイプラインによって書き込まれるデータをプリンシパル テーブルと 1 つ以上のステージング テーブルに分割します。プリンシパル テーブルには、パイプラインによって書き込まれた履歴データが保存されます。ステージング テーブルには、最新のパイプライン出力が保存されます。プリンシパル テーブルとステージング テーブルに対する BigQuery ファサード ビューを定義すると、履歴データと最新のデータの両方をクエリできます。

次の図は、前のパイプライン フローをステージング テーブル(Staging Table A)、プリンシパル テーブル、ファサード ビューを含むように修正したものです。

サブスクリプションを読み取り、BigQuery ステージング テーブルに書き込むパイプライン。2 番目のテーブル(プリンシパル)には、以前のバージョンのスキーマの出力があります。ファサード ビューは、ステージング テーブルとプリンシパル テーブルの両方から読み取ります。

修正後のフローでは、Pipeline ASchema A を使用するメッセージを処理し、その出力は互換性のあるスキーマを持つ Staging Table A に書き込まれます。プリンシパル テーブルは、前バージョンのパイプラインによって書き込まれた履歴データと、ステージング テーブルから定期的にマージされる履歴データから構成されます。コンシューマは、ファサード ビューを使用して、履歴データとリアルタイム データの両方を含む最新のデータに対してクエリを実行できます。

メッセージ スキーマが Schema A から Schema B に更新されたら、Schema B を使用するメッセージに対応するようにパイプライン コードを更新します。既存のパイプラインを新しい実装で更新する必要があります。並列パイプラインを実行することで、ストリーミング データ処理を中断することなく継続できます。パイプラインが長時間実行されていないため、パイプラインの終了と置換により処理が中断されます。

更新されたパイプラインは、Schema B を使用する追加のステージング テーブル(Staging Table B)に書き込みます。オーケストレートされたワークフローを使用して、パイプラインを更新する前に新しいステージング テーブルを作成できます。ファサード ビューを更新して、新しいステージング テーブルの結果が含まれるようにします(場合によっては、関連するワークフロー ステップを使用します)。

次の図は、Schema B を含む Staging Table B と、プリンシパル テーブルと両方のステージング テーブルが含まれるようにファサード ビューを更新するフローを示しています。

パイプラインは Schema B を使用して、Staging Table B に書き込みます。ファセット ビューは、プリンシパル テーブル、Staging Table A、Staging Table B から読み取ります。

パイプラインの更新とは別のプロセスとして、ステージング テーブルを定期的または必要に応じてプリンシパル テーブルにマージできます。次の図は、Staging Table A がプリンシパル テーブルにマージされる仕組みを示しています。

Staging Table A はプリンシパル テーブルに統合されます。ファサード ビューは、Staging Table B とプリンシパル テーブルから読み取ります。

次のステップ