Dataflow パイプライン ワークフローの設計

パイプライン開発には、コードの開発、テスト、本番環境への配信など、さまざまな段階と作業を伴います。このドキュメントでは、次の点について説明します。

  • 異なる環境への自動的なビルド、テスト、パイプラインのデプロイをサポートする継続的インテグレーションと継続的デリバリー(CI / CD)に関する検討事項。
  • 本番環境でのパフォーマンスとリソース使用率を最適化する Dataflow の機能。
  • 本番環境でストリーミング パイプラインを更新するためのアプローチとウォッチポイント。
  • 本番環境でのパイプラインの信頼性を向上させるベスト プラクティス。

継続的インテグレーション

継続的インテグレーション(CI)では、コードを共有リポジトリに頻繁にマージする必要があります。こうすることより、たびたび更新されるウェブサイトなど、変更が多いアプリケーションに便利です。通常、データ パイプラインは他の種類のアプリケーションほど変更されることはありませんが、CI を実践することは、パイプライン開発に多くのメリットをもたらします。たとえば、テストの自動化により、欠陥が見つかった際、迅速なフィードバックが提供され、修正ミスによる不具合がコードベースに入る可能性を減らします。

テストの自動化は CI の重要な要素です。適切なテスト カバレッジと組み合わせると、テストの自動化によって、コード commit ごとにテストスイートが実行されます。CI サーバーを使用すると、Maven などのビルド自動化ツールと連携して、CI パイプラインの 1 つ以上のステップとしてテストスイートを実行できます。単体テストと統合テストに合格したコードを、パイプラインの起動元となるデプロイ アーティファクトに正常にパッケージ化できます。このビルドを「合格したビルド」といいます。

ビルドに合格することで作成されるデプロイ アーティファクトの数と種類は、パイプラインのリリース方法によって異なります。Apache Beam Java SDK を使用すると、パイプラインのコードを自己実行型 JAR ファイルにパッケージ化できます。その後、本番環境や本番環境用の Google Cloud プロジェクトなど、デプロイ環境用にプロジェクトでホストされているバケットに JAR ファイルを保存できます。クラシック テンプレート(テンプレート化された実行の一種)、デプロイ アーティファクトには JSON テンプレート ファイル、パイプラインの JAR ファイル、オプションのメタデータ テンプレートが含まれます。次のセクションで説明するように、継続的デリバリーを使用して、異なるデプロイ環境にアーティファクトをデプロイできます。

継続的デリバリーとデプロイ

継続的デリバリー(CD)は、手動で起動可能なデプロイ環境にデプロイ アーティファクトをコピーします。通常、CI サーバーでビルドされたアーティファクトは、エンドツーエンド テストを実行するために 1 つ以上の本番前環境にデプロイされます。すべてのテストが正常に完了すると、本番環境が更新されます。

バッチ パイプラインでは、継続的デプロイにより、新しい Dataflow ジョブとしてパイプラインを直接起動できます。他のシステムでは、必要に応じてアーティファクトを使用して、バッチジョブを起動できます。たとえば、Cloud Composer を使用してワークフロー内のバッチジョブを実行できます。また、Cloud Scheduler を使用してバッチジョブをスケジューリングできます。

ストリーミング パイプラインは、バッチ パイプラインよりもデプロイが複雑なため、継続的デプロイを使用した自動化はさらに難しくなる場合があります。たとえば、既存のストリーミング パイプラインを置換または更新する方法を考える必要が生じる場合があります。パイプラインを更新できない場合や、パイプラインを更新しない場合は、複数の Dataflow ジョブの調整などの方法を使用して、ビジネスの中断を最小限にすることが可能です。

CI / CD に必要な ID とロール

CI / CD パイプラインは、さまざまなシステムと連携し、パイプラインの構築、テスト、デプロイを行います。たとえば、パイプラインはソースコード リポジトリにアクセスする必要があります。これらのインタラクションを有効にするには、パイプラインに適切な ID とロールがあることを確認してください。次のパイプライン アクティビティでは、パイプラインに特定の ID とロールを割り当てる必要があります。

Google Cloud などの外部サービスとの統合テスト

アドホック テストやシステム統合テストの実行に Direct Runner を使用する場合、パイプラインでは Google Cloud のデータソースとシンクを使用する Google Cloud CLI の認証情報か、GOOGLE_APPLICATION_CREDENTIALS 環境変数により提供される認証情報を使用します。パイプラインからアクセスされる Google Cloud リソースの認証情報を取得するために使用されるサービス アカウントに、十分なロールと権限があることを確認します。

アーティファクトを別のデプロイ環境にデプロイする

可能であれば、各環境に固有の認証情報(各プロジェクトに有効)を使用し、それに応じてリソースへのアクセスを制限します。

プロジェクトごとに一意のサービス アカウントを使用し、ストレージ バケットに対してデプロイ アーティファクトを読み書きします。パイプラインがテンプレートを使用するかどうかによって、デプロイ プロセスで複数のアーティファクトのステージングが必要になる場合があります。たとえば、Dataflow テンプレートの作成とステージングでは、パイプラインのテンプレート ファイルなどパイプラインの起動に必要なデプロイ アーティファクトを Cloud Storage バケットに書き込むための権限が必要です。

パイプラインを別のデプロイ環境にデプロイする

可能であれば、プロジェクトごとに異なるサービス アカウントを使用して、プロジェクト内の Google Cloud リソース(Dataflow 自体へのアクセスを含む)へアクセスし、管理してください。

Dataflow ジョブの作成と管理に使用するサービス アカウントには、ジョブ管理に十分な IAM 権限が必要です。詳細については、Dataflow のセキュリティと権限のページの Dataflow サービス アカウントのセクションをご覧ください。

Dataflow ジョブを実行するワーカー サービス アカウントには、ジョブの実行中に Compute Engine リソースを管理し、Apache Beam パイプラインと Dataflow サービスの間のインタラクションを管理する権限が必要です。詳細については、Dataflow のセキュリティと権限のページのワーカー サービス アカウントのセクションをご覧ください。

ジョブにユーザー管理のワーカー サービス アカウントを指定するには、--serviceAccount パイプライン オプションを使用します。ジョブの作成時にワーカー サービス アカウントを指定しないと、Dataflow は Compute Engine のデフォルトのサービス アカウントを使用しようとします。通常、Compute Engine デフォルト サービス アカウントには Dataflow ジョブに必要な権限よりも多くの権限セットが含まれているため、本番環境ではユーザー管理ワーカー サービス アカウントを指定することをおすすめします。

本番環境シナリオでは、Dataflow ジョブ管理とワーカー サービス アカウントに対して別々のサービス アカウントを使用することをおすすめします。これにより、単一のサービス アカウントを使用する場合よりも、セキュリティが強化されます。たとえば、Dataflow ジョブの作成に使用されるサービス アカウントでは、データソースやシンクにアクセスする必要がない場合があります。また、パイプラインで使用される他のリソースを使用する必要がない場合もあります。このシナリオでは、Dataflow ジョブの実行に使用されるワーカー サービス アカウントに、パイプライン リソースを使用する権限が付与されます。ジョブの作成に必要な別のサービス アカウントには、Dataflow ジョブを管理する権限(作成権限を含む)が与えられます。

CI / CD パイプラインの例

次の図は、CI / CD 用データ パイプラインの一般的かつ非ツール依存の概観を示しています。また、この図では、開発タスク、デプロイ環境、パイプライン ランナーの関係も示されています。

CI / CD パイプラインのステージ。

この図では、次のステージが示されています。

  • コード開発: コード開発中、デベロッパーは Direct Runner を使用しパイプライン コードをローカルで実行します。また、デベロッパーは、Dataflow Runner を使用して即席のパイプラインを実行するためのサンドボックス環境を使用します。

    一般的な CI / CD パイプラインでは、デベロッパーがソース管理システムに変更を加えた場合(リポジトリに新しいコードを push するなど)は、継続的インテグレーション プロセスが開始されます。

  • ビルドとテスト: 継続的インテグレーションのプロセスにより、パイプラインのコードがコンパイルされ、続いて Direct Runner を使用して単体テスト変換統合テストが実行されます。オプションのシステム統合テストも実行できます。これには小さいテスト データセットを使用する外部のソースとシンクによる統合テストも含まれます。

    テストが成功すると、CI プロセスは、継続的デリバリー プロセスにアクセス可能な場所にパイプラインの起動に必要なデプロイ アーティファクトを保存します。これには、JAR ファイル、Docker イメージ、テンプレート メタデータが含まれる場合があります。生成されるデプロイ アーティファクトの種類に応じて、Cloud Storage と Artifact Registry を使用して、異なるアーティファクト タイプを保存できます。

  • デリバリーとデプロイ: 継続的デリバリー プロセスにより、デプロイ アーティファクトが本番環境にコピーされます。それ以外の場合、これらのアーティファクトはその環境内で使用できるようになります。デベロッパーは、Dataflow Runner を使用してエンドツーエンド テストを手動で実行することも、継続的デプロイを使用してテストを自動的に開始することもできます。通常、継続的デプロイでは、バッチ パイプラインを有効にするほうがストリーミング パイプラインを有効にするより簡単です。これは、バッチ パイプラインは継続的に実行されず、新しいリリースで置き換えることが簡単なためです。

    ストリーミング パイプラインを更新するプロセスは、複雑なことを簡単にするものですが、更新は本番前環境でテストする必要があります。更新手順は、リリース間で常に異なる場合があります。たとえば、あるパイプラインでは、インプレース更新が不可能な方法で変更される可能性があります。このため、継続的デプロイを使用してパイプラインの更新を自動化することは難しい場合があります。

すべてのエンドツーエンド テストに合格したら、継続的デリバリー プロセスの一環としてデプロイ アーティファクトをコピーするか、本番環境で使用できるようにします。新しいパイプラインが既存のストリーミング パイプラインを更新または置き換える場合は、本番前環境でテストされた手順を使用して新しいパイプラインをデプロイします。

テンプレート化されていないジョブとテンプレートによるジョブの実行

開発環境から直接 Apache Beam SDK を使用して Dataflow ジョブを作成できます。このタイプのジョブは、テンプレート化されていないジョブといいます。この方法はデベロッパーにとって便利ですが、パイプラインの開発と実行のタスクを分離したほうがよい場合もあります。この分離を行うには、Dataflow テンプレートを使用し、独立したタスクとしてパイプラインをステージングして実行します。テンプレートのステージング後、他のユーザー(デベロッパー以外)は、Google Cloud CLI、Google Cloud コンソール、または Dataflow REST API を使用して、テンプレートからジョブを実行できます。

Dataflow には、次のジョブ テンプレートのタイプが用意されています。

  • クラシック テンプレート: デベロッパーは、Apache Beam SDK を使用してパイプライン コードを実行し、JSON シリアル化実行グラフをテンプレートとして保存できます。Apache Beam SDK は、パイプライン コードに必要な依存関係と一緒にテンプレート ファイルを Cloud Storage のロケーションに保存します。
  • Flex テンプレート: デベロッパーは Google Cloud CLI を使用してパイプラインを Docker イメージとしてパッケージ化します。このパッケージは Artifact Registry に保存されます。Flex テンプレート仕様ファイルも自動生成され、ユーザーが指定した Cloud Storage のロケーションに保存されます。Flex テンプレート仕様ファイルには、パイプライン パラメータなど、テンプレートの実行方法を示すメタデータが含まれています。

リンク先のドキュメントに記載されている機能のほかに、Flex テンプレートには、テンプレートの管理の面でクラシック テンプレートよりも優れた点があります。

  • クラシック テンプレートの場合、複数のアーティファクト(JAR ファイルなど)が Cloud Storage のステージング ロケーションに保存されますが、これらのアーティファクトを管理する機能がありません。対照的に、Flex テンプレートは Docker イメージ内にカプセル化されます。このイメージは、Artifact Registry によって管理される 1 つのデプロイ アーティファクトに、パイプラインに必要なすべての依存関係(Flex テンプレートの仕様を除く)をパッケージ化します。
  • Flex テンプレートの場合、Docker のイメージ管理機能を使用できます。たとえば、Artifact Registry に pull 権限と push 権限(任意)を付与することで、Flex テンプレートを安全に共有できます。また、Flex テンプレートの異なるバージョンに Docker イメージタグを使用できます。

デベロッパーは、クラシック テンプレートと Flex テンプレートを使用して、レジストリを所有するプロジェクトや、テンプレート アセットをホストするストレージ バケットを所有するプロジェクトとは異なるプロジェクトでジョブを起動できます。この機能は、複数のお客様のデータ処理を異なるプロジェクトやパイプライン ジョブに分割する必要がある場合に便利です。Flex テンプレートを使用すると、パイプラインの起動時に使用する Docker イメージのバージョンを指定できます。この機能により、後でテンプレートを更新するときに、複数のプロジェクトでバッチ パイプラインやストリーミング パイプラインを段階的に入れ替えることができます。

リソース使用量を最適化する Dataflow の機能

Dataflow には、次のランナー固有の機能が備わっており、リソース使用率が最適化されます。これにより、パフォーマンスの改善やコストの削減が可能になります。

  • Streaming Engine: Streaming Engine では、ストリーミング パイプラインを VM ワーカーではなく、専用のサービスで実行します。これには、自動スケーリングの応答性の改善、ワーカー VM リソースの消費の削減、再デプロイを伴わない自動更新サービスなどのメリットがあります。ストリーミング パイプラインには Streaming Engine を有効にすることをおすすめします。Apache Beam Java SDK または Python SDK の最新バージョンを使用する場合、この機能はデフォルトで有効になっています。
  • Dataflow Shuffle: Dataflow Shuffle は、バッチ パイプラインのシャッフル オペレーションを VM ワーカーではなく、専用のサービスで実行します。これには、ほとんどのバッチ パイプラインでの高速実行、ワーカー VM によるリソース消費の削減、自動スケーリングの応答性の向上、フォールト トレラントの改善などのメリットがあります。バッチ パイプラインでは Dataflow Shuffle を有効にすることをおすすめします。この機能は、Apache Beam Java SDK と最新の Python SDK を使用してデフォルトで有効になっています。
  • 柔軟なリソース スケジューリング(FlexRS): FlexRS では、高度なスケジューリング技術、Dataflow Shuffle サービス、プリエンプティブル VM インスタンスと通常の VM の組み合わせを使用することで、バッチ処理のコストを削減できます。

本番環境でストリーミング パイプラインを更新する

ジョブが完了すると停止するバッチ パイプラインとは異なり、ストリーミング パイプラインは通常、中断のない処理を提供するために継続的に実行します。最初のデプロイ後、既存のパイプラインを更新する場合は、次の点を考慮してください。

  • パイプラインのビジネス ロジックは、パイプラインの中断を最小限に抑えるか回避するように更新する必要があります。要件によっては、新しいバージョンのパイプラインがデプロイされている間、処理が一時的に中断することを許容できる場合があります。しかし、アプリケーションは中断の発生を受け入れられないかもしれません。
  • パイプライン更新プロセスでは、メッセージ処理やその他の接続されたシステムへの中断を最小限に抑えるようにスキーマの変更を処理する必要があります。たとえば、イベント処理パイプラインでメッセージのスキーマが変更された場合、ダウンストリーム データシンクでもスキーマの変更が必要になることがあります。

以降のセクションでは、これら 2 つの問題に対処する方法について説明します。

ストリーミング パイプラインのインプレース更新を実行する

特定の状況において、Dataflow は、パイプラインをキャンセルまたはドレインすることなく、進行中のストリーミング ジョブを直接更新できます。Dataflow は互換性チェックを行い、更新されたパイプライン コードを実行中のパイプラインに安全にデプロイできるようにします。コードの変更によっては、副入力が既存のステップに対して追加または削除されたときなど、互換性チェックが失敗する場合があります。詳細については、既存のパイプラインを更新するをご覧ください。

コード変更の適用に加え、インプレース更新を使用して、ストリーミング Dataflow ジョブのワーカー数を変更できます。詳細については、[水平自動スケーリング] ページの自動スケーリングの範囲を設定するをご覧ください。

ストリーミング パイプラインの停止と置換

処理を一時的に停止できる場合は、パイプラインのキャンセルまたはドレインが可能です。パイプラインをキャンセルすると、Dataflow は処理を直ちに停止し、リソースをできるだけ早くシャットダウンします。これにより、パイプラインで処理されているデータ(処理中のデータ)が失われます。データの損失を避けるため、通常はドレインが推奨されます。

パイプラインをドレインすると、処理中のウィンドウが直ちに終了し、すべてのトリガーが発生します。処理中のデータは失われませんが、ドレインによってウィンドウのデータが不完全になる可能性があります。この場合、処理中のウィンドウが結果の一部または不完全な結果を出力します。詳細については、ジョブのドレインによる影響をご覧ください。既存のジョブが完了したら、更新されたパイプライン コードを含む新しいストリーミング ジョブを起動し、処理を再開できます。

もう 1 つの欠点は、既存のストリーミング ジョブが終了してから、置換パイプラインがデータの処理を再開できるようになるまでの間にダウンタイムが発生することです。

ただし、既存のパイプラインをキャンセルまたはドレインして、更新されたパイプラインで新しいジョブを開始するのは比較的簡単です。

Pub/Sub スナップショットとシークを使用したメッセージの再処理

場合によっては、ドレインされたパイプラインを置換またはキャンセルした後、以前に配信された Pub/Sub メッセージを再処理する必要があります。たとえば、更新されたビジネス ロジックを使用してデータを再処理する必要が生じることがあります。Pub/Sub シークは、Pub/Sub スナップショットからメッセージを再生できる機能です。Pub/Sub シーク機能と Dataflow パイプラインを使用すると、サブスクリプション スナップショットの作成時点からメッセージを再処理できます。

開発とテストでは、Pub/Sub シークを使用して既知のメッセージを繰り返し再生し、パイプラインからの出力を確認できます。Pub/Sub シークを使用する場合は、サブスクリプションがパイプラインによって消費されるときは、サブスクリプション スナップショットをシークしないでください。シークすると、Dataflow のウォーターマーク ロジックを無効にし、Pub/Sub メッセージの 1 回限りの処理に影響を与える場合があります。

ターミナル ウィンドウで Pub/Sub シークと Dataflow パイプラインを使用する場合に推奨される gcloud CLI ワークフローは次のとおりです。

  1. サブスクリプションのスナップショットを作成します。

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. パイプラインをドレインまたはキャンセルします。

    gcloud dataflow jobs drain JOB_ID
    

    または

    gcloud dataflow jobs cancel JOB_ID
    
  3. 目的のスナップショットまで移動します。

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. サブスクリプションを使用する新しいパイプラインをデプロイします。

並列パイプラインを実行する

Dataflow がジョブを直接更新できない場合でも、並列パイプラインを作成することで、ストリーミング パイプラインの中断を回避できます。この方法を使用するには、パイプライン コードを更新した新しいストリーミング ジョブを作成し、既存のパイプラインと並行して実行します。

新しいパイプラインを作成するときは、既存のパイプラインに使用したものと同じウィンドウ戦略を使用します。更新されたパイプラインによって処理された最も古い完了ウィンドウのタイムスタンプをウォーターマークが超えるまで、既存のパイプラインを実行し続けます。次に、既存のパイプラインをドレインまたはキャンセルします。更新されたパイプラインは引き続き実行され、処理を引き継ぎます。

次の図に、このプロセスを示します。

Pipeline A が Pipeline B の 5 分間のウィンドウと重なる。

この図において、Pipeline B は、Pipeline A を引き継ぐ、更新されたジョブです。値 t は、Pipeline B で処理される最も古い完了ウィンドウのタイムスタンプです。値 wPipeline A のウォーターマークです。簡単にするため、完全なウォーターマークには遅延データがないと仮定して処理と経過時間は水平軸で表され、両方のパイプラインで 5 分間の固定(タンブリング)ウィンドウが使用されます。ウォーターマークが各ウィンドウの終わりを通過した後、結果がトリガーされます。

同時出力は、2 つのパイプラインがオーバーラップする期間に行われます。したがって、既存のパイプラインが結果を書き込む場所とは別の場所に結果を書き込むように、更新されたパイプラインを構成します。ダウンストリームのシステムでは、データベース ビューなどの 2 つの宛先シンクを抽象化し、結合された結果をクエリできます。これらのシステムでは、抽象化を使用して、オーバーラップする期間から重複を排除することもできます。

次の例は、Pub/Sub から入力データを読み取り、処理を行い、結果を BigQuery に書き込む簡単なパイプラインを使用する方法を示しています。

  1. 最初の状態で、既存のストリーミング パイプライン(Pipeline A)が実行され、サブスクリプション(Subscription A)を使用して Pub/Sub トピック(Topic)からメッセージを読み取ります。結果は BigQuery テーブル(Table A)に書き込まれます。結果は BigQuery ビューを介して使用されます。BigQuery ビューは、基になるテーブルの変更をマスクするためのファサードとして機能します。これは、ファサード パターンと呼ばれる設計方法の一つです。次の図は、初期状態を示しています。

    1 つのサブスクリプションを持つ 1 つのパイプラインと、単一の BigQuery テーブルへの書き込み。

  2. 更新されたパイプライン用に新しいサブスクリプション(Subscription B)を作成します。Subscription B を使用して Pub/Sub トピック(Topic)から読み取る、更新されたパイプライン(Pipeline B)をデプロイし、別の BigQuery テーブル(Table B)に書き込みます。次の図は、この流れを表しています。

    2 つのパイプライン(それぞれ 1 つのサブスクリプションを使用)。各パイプラインは個別の BigQuery テーブルに書き込まれます。ファサード ビューを両方のテーブルから読み取ります。

    この時点では、Pipeline APipeline B は並列で実行されており、結果は別々のテーブルに書き込まれます。時間 t は、Pipeline B で処理される最も古い完了ウィンドウのタイムスタンプとして記録します。

  3. ウォーターマークが時間 t を超えた場合にパイプライン A がドレインされるようにします。これにより、開いているウィンドウが閉じられ、処理中のデータの処理が完了します。完了ウィンドウが重要な場合(遅延データがないことが前提)、最初のパイプラインをドレインする前、両方のパイプラインで重複ウィンドウが完了するまでは、両方のパイプラインを実行できます。処理中のデータがすべて処理されて Table A に書き込まれた後、Pipeline A のストリーミング ジョブを停止します。次の図は、この段階を示しています。

    Pipeline A がドレインされて Subscription A を読み取れなくなり、ドレインの完了後は Table A にデータが送信されなくなります。すべての処理は 2 番目のパイプラインによって行われます。

  4. この時点では、Pipeline B のみが実行されています。BigQuery のビュー(ファサード ビュー)からクエリを実行でき、これは、Table ATable B のファサードとして機能します。両方のテーブルに同じタイムスタンプの行が存在する場合、Table B から行を返すようにビューを構成します。また、Table B に行が存在しない場合は、Table A にフォールバックするように構成します。次の図は、Table ATable B の両方から読み込むビュー(ファサード ビュー)を示しています。

    Pipeline A が削除され、Pipeline B のみが実行されている。

    この時点で、Subscription A を削除できます。2 つの BigQuery テーブルは、マージすることも、別々に保持することも可能です。

    新しいパイプライン デプロイメントで問題が検出された場合、重複(並列)パイプラインのデプロイのロールバックが簡単になります。前の例で、Pipeline A を実行した状態で、Pipeline B をモニタリングし、正しく動作しているかどうか確認することもできます。Pipeline B に問題がある場合は、Pipeline A を使用してロールバックできます。

スキーマ ミューテーションを処理する

データ処理システムでは、ビジネス要件や技術上の理由から、時間の経過とともにスキーマ ミューテーションへの対応が必要になります。ビジネス情報システムの中断を避けるため、スキーマの更新を適用する場合は慎重に計画し、実施する必要があります。パイプラインの更新を実行するときは、取り込みを中断することなく、また既存のスキーマに依存する接続システムに影響を与えることなく、ストリーミング データ パイプライン内でのスキーマ ミューテーションを処理することが重要です。

Pub/Sub トピックから JSON ペイロードを含むメッセージを読み取る簡単なパイプラインについて考えてみましょう。パイプラインは、各メッセージを TableRow インスタンスに変換し、その行を BigQuery テーブルに書き込みます。出力テーブルのスキーマは、パイプラインによって処理されるメッセージと似ています。次の図では、スキーマを Schema A として示しています。

Schema A を使用してサブスクリプションを読み取り、BigQuery 出力テーブルに書き込むパイプライン。

時間が経つと、メッセージ スキーマが複雑な形で変更されるとします。フィールドが追加、削除、置換され、Schema A は新しいスキーマに進化します。以降、その新しいスキーマを Schema B と呼びます。この場合、Pipeline A は更新され、出力テーブルのスキーマは Schema B をサポートしている必要があります。

プリンシパル テーブルとステージング テーブルを使用したスキーマ ミューテーションの処理

出力テーブルでは、新しいフィールドを追加するスキーマ、または列モードREQUIRED から NULLABLE など)を緩和するスキーマ ミューテーションは、ダウンタイムなしで実行できます。通常、ミューテーションは既存のクエリに影響しません。ただし、既存のスキーマ フィールドを変更または削除するスキーマ ミューテーションは、クエリの中断や、他の中断をもたらします。既存のスキーマ フィールドを変更または削除する際は、ダウンタイムを発生させずに変更に対応するための方法を計画する必要があります。

中断を回避する方法として、パイプラインによって書き込まれるデータをプリンシパル テーブルと 1 つ以上のステージング テーブルに分割する方法があります。このアプローチでは、プリンシパル テーブルにはパイプラインによって書き込まれた履歴データが保存され、ステージング テーブルには最新のパイプライン出力が格納されます。プリンシパル テーブルとステージング テーブルに対する BigQuery ファサード ビューを定義すると、履歴データと最新のデータの両方をクエリできます。

次の図は、前のフローをステージング テーブル(Staging Table A)、プリンシパル テーブル、ファサード ビューを含むように修正したものです。

サブスクリプションを読み取り、BigQuery ステージング テーブルに書き込むパイプライン。2 番目のテーブル(プリンシパル)には、以前のバージョンのスキーマの出力があります。ファサード ビューは、ステージング テーブルとプリンシパル テーブルの両方から読み取ります。

修正後のフローでは、Pipeline ASchema A を使用するメッセージを処理し、その出力は互換性のあるスキーマを持つ Staging Table A に書き込まれます。プリンシパル テーブルは、前バージョンのパイプラインによって書き込まれた履歴データと、ステージング テーブルから定期的にマージされる履歴データから構成されます。コンシューマは、ファサード ビューを使用して、履歴データとリアルタイム データの両方を含む最新のデータに対してクエリを実行できます。

メッセージ スキーマが Schema A から Schema B に更新されたら、Schema B を使用するメッセージに対応するようにパイプライン コードを更新します。既存のパイプラインを新しい実装で更新する必要があります。インプレース更新を実行するか、並列パイプラインを実行することで、ストリーミングのデータ処理を中断することなく継続できます。一方、パイプラインの終了と置換は、パイプラインが長時間実行されていないため、処理が中断されます。

更新されたパイプラインは、Schema B を使用する追加のステージング テーブル(Staging Table B)に書き込みます。オーケストレートされたワークフローを使用して、パイプラインを更新する前に新しいステージング テーブルを作成できます。また、ファサード ビューを更新して、新しいステージング テーブルの結果が含まれるようにします(場合によっては、関連するワークフロー ステップを使用します)。

次の図は、Schema B を含む Staging Table B と、プリンシパル テーブルと両方のステージング テーブルが含まれるようにファサード ビューを更新するフローを示しています。

パイプラインは Schema B を使用して、Staging Table B に書き込みます。ファセット ビューは、プリンシパル テーブル、Staging Table A、Staging Table B から読み取ります。

パイプラインの更新とは別のプロセスとして、ステージング テーブルを定期的または必要に応じてプリンシパル テーブルにマージできます。次の図は、Staging Table A がプリンシパル テーブルにマージされる仕組みを示しています。

Staging Table A はプリンシパル テーブルに統合されます。ファサード ビューは、Staging Table B とプリンシパル テーブルから読み取ります。

Dataflow ジョブのライフサイクル

Dataflow ジョブは、さまざまなジョブ状態で表されるライフサイクルを通過します。Dataflow ジョブを実行するには、ジョブをリージョンに送信します。ジョブは、リージョン内のいずれかのゾーンにある使用可能な Dataflow バックエンドに転送されます。Dataflow は、バックエンドを割り当てる前に、ジョブを実行するための割り当てと権限があることを確認します。これらのプリフライト チェックが完了し、バックエンドが割り当てられると、ジョブは JOB_STATE_PENDING 状態に移行します。FlexRS ジョブの場合、バックエンドの割り当てが遅延すると、JOB_STATE_QUEUED 状態になります。

割り当てられたバックエンドは、実行するジョブを取得し、Google Cloud プロジェクトで Dataflow ワーカーを開始します。ワーカー VM に選択されるゾーンは、要因の数によって異なります。Dataflow Shuffle を使用するバッチジョブの場合、サービスは、ゾーン間トラフィックを回避するために、Dataflow バックエンドとワーカー VM を同じゾーンに配置します。

Dataflow ワーカーが起動したら、Dataflow バックエンドに作業をリクエストします。バックエンドは、処理をワーカーに分散するバンドルという同時処理可能なチャンクに分割します。ワーカーが既存の作業を処理できないときに、自動スケーリングが有効になっていると、バックエンドは作業を処理する追加のワーカーを起動します。同様に、必要以上に多くのワーカーが起動した場合は、一部のワーカーがシャットダウンされます。

Dataflow ワーカーが起動すると、Dataflow バックエンドがコントロール プレーンとして機能し、ジョブの実行をオーケストレートします。処理中は、ジョブのデータプレーンにより、GroupByKeyCoGroupByKeyCombine などのシャッフル オペレーションが実行されます。ジョブは、シャッフル オペレーションに次のデータプレーン実装を使用します。

  • データプレーンは Dataflow ワーカー上で実行され、シャッフル データは永続ディスクに保存されます。
  • データプレーンはサービスとして実行され、ワーカー VM から外部化されます。この実装には、ジョブの作成時に指定する 2 つのバリアントがあります。バッチ パイプライン用の Dataflow Shuffle とストリーミング パイプライン用の Streaming Engine です。サービスベースのシャッフルでは、ワーカーベースのシャッフルと比べると、データ シャッフル処理のパフォーマンスとスケーラビリティが大幅に向上します。

JOB_STATE_RUNNING 状態のストリーミング ジョブは、ジョブの不具合が発生した場合、キャンセルまたはドレインされるまで、継続的に実行されます。バッチジョブは、すべての処理が完了するか、回復不能なエラーが発生すると自動的に終了します。ジョブの停止方法に応じて、Dataflow は、ジョブのステータスを複数の終了状態(JOB_STATE_CANCELLEDJOB_STATE_DRAINEDJOB_STATE_DONE など)のいずれかに設定します。

パイプラインの信頼性に関するベスト プラクティス

このセクションでは、Dataflow の使用時に発生する可能性のある障害と Dataflow ジョブのベスト プラクティスについて説明します。

分離の原則に従う

パイプラインの全体的な信頼性を向上させる一般的な推奨事項は、リージョンとゾーンの背後にある分離の原則に従うことです。パイプラインで重要なリージョン間に依存関係がないことを確認します。複数のリージョンのサービスに重要な依存関係があるパイプラインの場合、それらのリージョンのいずれかで障害が発生すると、パイプラインに影響する可能性があります。この問題を回避するには、冗長性とバックアップの確保を目的として、複数のリージョンにデプロイします。

Dataflow のスナップショットを作成する

Dataflow には、パイプラインの状態をバックアップするスナップショット機能が用意されています。パイプライン スナップショットは、別のゾーンまたはリージョンの新しいストリーミング Dataflow パイプラインに復元できます。その後、スナップショットのタイムスタンプから、Pub/Sub または Kafka トピック内のメッセージの再処理を開始できます。定期的なパイプライン スナップショットを設定すると、目標復旧時間(RTO)を最小化できます。

Dataflow スナップショットの詳細については、Dataflow スナップショットの使用をご覧ください。

ジョブ送信エラーを処理する

テンプレート以外の Dataflow ジョブを送信する場合は、Apache Beam SDK を使用します。ジョブを送信するには、Dataflow Runner を使用してパイプラインを実行します。これは、パイプラインのオプションの一部として指定されます。Apache Beam SDK が Cloud Storage 内のファイルをステージングし、ジョブ リクエスト ファイルを作成して、そのファイルを Dataflow に送信します。

あるいは、Dataflow テンプレートから作成されたジョブでは、さまざまな送信方法が使用されます。通常、このメソッドは templates API を使用しています。

テンプレート ジョブとテンプレート以外のジョブの失敗により Dataflow から返されるエラーはさまざまです。このセクションでは、さまざまな種類のジョブ送信エラーと、それらを処理または軽減するためのベスト プラクティスについて説明します。

一時的なエラーの発生後にジョブ送信を再試行する

Dataflow サービスの問題が原因でジョブを開始できない場合は、ジョブを数回再試行してください。再試行すると、一時的なサービスの問題を軽減できます。

ワーカー リージョンを指定してゾーン障害を軽減する

Dataflow は、リージョンの可用性を高め、複数のリージョンで利用可能です。ユーザーがゾーンを明示的に指定せずにリージョンにジョブを送信すると、Dataflow はリソースの可用性に基づいて、指定されたリージョンのゾーンにジョブを転送します。

ジョブを配置する場合は、可能であれば --zone フラグではなく、--region フラグを使用してワーカー リージョンを指定することをおすすめします。この手順により、Dataflow はジョブに最適なゾーンを自動的に選択するため、パイプラインのフォールト トレランスが向上します。明示的なゾーンを指定するジョブの場合、このメリットがありません。ゾーン内で問題が発生すると、ジョブが失敗します。ゾーンの問題によってジョブの送信が失敗した場合は、ゾーンを明示的に指定せずにジョブを再試行することで問題が解決することがあります。

複数のリージョンにデータを保存して、リージョン障害を軽減する

リージョン全体が使用できない場合は、別のリージョンのジョブを試してください。リージョン間でジョブが失敗する場合は、データの可用性を考慮することが重要です。複数のリージョンにデータを手動でコピーせずに単一リージョンの障害から保護するには、データを複数のリージョンに自動的に保存する Google Cloud リソースを使用します。たとえば、データセットに BigQuery マルチリージョン ロケーションを使用するか、Cloud Storage のデュアルリージョン バケットとマルチリージョン バケットを使用します。1 つのリージョンが使用不能になった場合、データが使用可能な別のリージョンでパイプラインを再実行できます。

Dataflow でマルチリージョン サービスを使用する例については、高可用性と地理的冗長性をご覧ください。

実行中のパイプラインのエラーを処理する

ジョブが送信されて受け入れられた後は、そのジョブに対する有効な操作は、次のとおりです。

  • バッチジョブのキャンセル
  • ストリーミング ジョブの更新、ドレイン、キャンセル

ジョブを送信した後に、実行中のジョブのロケーションを変更することはできません。FlexRS を使用していない場合、送信してから数分以内にデータの処理が開始します。FlexRS ジョブの場合、データ処理が開始するまでに最大で 6 時間かかることがあります。

このセクションでは、ジョブの実行時エラーとその処理のベスト プラクティスについて説明します。

一時的なエラーによる問題を特定して解決するためにジョブをモニタリングする

バッチジョブの場合、失敗した項目を含むバンドルは 4 回再試行されます。単一のバンドルが 4 回失敗した場合は、Dataflow によってジョブが終了します。この処理により、一時的な多くの問題が処理されます。ただし、長時間にわたって失敗する場合は、最大再試行回数にすぐに到達するため、ジョブの迅速な終了が可能になります。

モニタリングとインシデント管理のために、失敗したジョブを検出するアラートルールを構成します。ジョブが失敗した場合は、ジョブのログを検査して再試行上限を超えた作業項目が原因となって発生したジョブのエラーを特定します。

ストリーミング ジョブの場合、Dataflow は失敗した作業項目をいつまでも再試行します。ジョブが終了することはありません。ただし、問題が解決するまでジョブが機能停止する可能性があります。モニタリング ポリシーを作成して、システム レイテンシの増加やデータの更新頻度の低下など、機能停止したパイプラインの兆候を検出します。パイプライン コードにエラーロギングを実装することで、繰り返し失敗する作業項目によって引き起こされたパイプラインの機能停止を特定できます。

ゾーン全体が停止した場合に別のゾーンでジョブを再開する

ジョブの開始後、ユーザーコードを実行する Dataflow ワーカーは 1 つのゾーンに制限されます。ゾーンが停止した場合、その範囲によっては Dataflow ジョブが影響を受けることがあります。

Dataflow バックエンドにのみ影響を与える停止の場合、バックエンドはマネージド サービスによって自動的に別のゾーンに移され、ジョブを続行できるようになりますジョブが Dataflow Shuffle を使用する場合、バックエンドをゾーン間で移動させることはできません。Dataflow バックエンドの移行が発生すると、ジョブが一時的に停止する可能性があります。

ゾーンの停止が発生すると、実行中のジョブは失敗するか、ゾーンの可用性が復元されるまで停止する可能性があります。ゾーンが長期間使用できなくなった場合、ジョブを停止します(ストリーミング ジョブではキャンセルドレイン)。その後、再起動して、Dataflow により新しい正常なゾーンが選択されるようにします。

リージョンの停止が発生した場合に別のリージョンでバッチジョブを再開する

Dataflow ジョブを実行しているリージョンが停止した場合、ジョブが失敗するか、ジョブが停止状態になる可能性があります。バッチジョブの場合は、可能であれば、別のリージョンでジョブを再起動します。異なるリージョンでデータを利用できるようにすることが重要です。

高可用性またはフェイルオーバーを使用してリージョンの停止を回避する

ストリーミング ジョブの場合は、アプリケーションのフォールト トレランスと予算に応じて、障害を軽減する方法が異なります。リージョン全体が停止している場合、最も簡単で費用対効果の高い選択肢は、停止が解消するまで待機することです。ただし、レイテンシの影響を受けやすいアプリケーションや、データ処理が中断しないようにする必要があるか、遅延を最小限に抑えて再開する必要がある場合は、以降のセクションで説明するオプションを確認してください。

高可用性: データの損失なくレイテンシを確保

アプリケーションでデータ損失を許容できない場合は、重複するパイプラインを 2 つの異なるリージョンで同時に実行し、パイプラインで同じデータを使用します。両方のリージョンで同じデータソースを利用できる必要があります。パイプラインの出力に依存するダウンストリーム アプリケーションで、この 2 つのリージョンからの出力を切り替える必要があります。リソースの重複のため、このオプションは他のオプションと比べて最大の費用がかかります。デプロイの例については、高可用性と地理的冗長性をご覧ください。

フェイルオーバー: ある程度のデータ損失を許容してレイテンシを確保

アプリケーションでデータ損失の可能性を許容できる場合は、ストリーミング データソースを複数のリージョンで使用可能にします。たとえば、Pub/Sub を使用して、同じトピックに対して 2 つの独立したサブスクリプションをリージョンごとに 1 つずつ維持します。リージョンが停止した場合、別のリージョンで置換パイプラインを開始し、パイプラインがバックアップ サブスクリプションからデータを使用するようにします。

レイテンシを犠牲にすることなく、データ損失を最小限に抑えながら、適切な時間までバックアップ サブスクリプションを再生します。ダウンストリーム アプリケーションは、実行中のパイプラインの出力に切り替える方法を認識している必要があります。高可用性オプションと同様です。このオプションではデータの複製のみが行われるため、重複するパイプラインの実行よりもリソースの使用量が少なくなります。

高可用性と地理的冗長性

高可用性データ処理を行うために複数のストリーミング パイプラインを並行して実行できます。たとえば、異なるリージョンで 2 つのストリーミング ジョブを並行して実行できます。これにより、データ処理の地理的冗長性とフォールト トレラントが提供されます。

データソースとシンクの地理的な可用性を考慮することで、高可用性のマルチリージョン構成でエンドツーエンドのパイプラインを開発できます。次の図は、デプロイの例を示しています。

2 つのリージョン パイプラインは、個別のサブスクリプションを使用してグローバルな Pub/Sub トピックを読み取ります。パイプラインは、米国と欧州の別々のマルチリージョン BigQuery テーブルに書き込みを行います。

図は次のフローを示しています。

  1. Pub/Sub は世界中のほとんどのリージョンで実行されるため、サービスによって高速にグローバルなデータにアクセスできます。また、メッセージを保存する場所を制御できます。Pub/Sub は、パブリッシュされたメッセージをサブスクライバーに最も近い Google Cloud リージョンに自動的に保存できます。あるいは、メッセージ ストレージ ポリシーを使用して特定のリージョンまたはリージョンのセットを使用するように構成できます。

    その後、Pub/Sub は、メッセージがどこに保存されているかに関係なく、世界中のサブスクライバーにメッセージを配信します。トラフィックはグローバル負荷分散メカニズムにより、メッセージの格納場所に最も近い Google Cloud データセンターに誘導されるため、Pub/Sub クライアントは、接続先のサーバーのロケーションを意識する必要がありません。

  2. Dataflow は、特定の Google Cloud リージョンで動作します。別々の Google Cloud リージョンでパイプラインを並列実行すると、単一のリージョンに影響を与える障害からジョブを分離できます。この図では、同じパイプラインの 2 つのインスタンスを示しており、各インスタンスは別々の Google Cloud リージョンで実行されます。

  3. BigQuery は、単一リージョンとマルチリージョンのデータセット ロケーションを提供します。マルチリージョン ロケーションを選択すると、データセットは 2 つ以上のリージョンに保存されます。この図では、異なる 2 つのパイプラインを示し、それぞれが別々のマルチリージョン データセットに書き込まれます。