Dataflow パイプラインの開発とテスト

このページでは、Dataflow パイプラインの開発とテストのベスト プラクティスについて説明します。

概要

パイプラインのコードの実装方法は、本番環境でのパイプラインのパフォーマンスに大きく影響します。適正かつ効率的に機能するパイプライン コードを作成できるように、このドキュメントでは次のことについて説明します。

  • 開発とデプロイのさまざまな段階でコードの実行をサポートするパイプライン ランナー。
  • デプロイ環境。開発、テスト、本番前環境、本番環境でパイプラインを実行できます。
  • オープンソースのパイプライン コードとテンプレート。そのまま使用することも、コードを迅速に開発するために新しいパイプラインのベースとして使用することもできます。
  • パイプライン コードをテストするためのベスト プラクティス。まず、このドキュメントでは、単体テスト、統合テスト、エンドツーエンド テストなどのテストの範囲とその関係などの概要について説明します。次に、テストデータを作成して統合する方法と各テストで使用するパイプライン ランナーについて詳しく説明します。

パイプライン ランナー

開発とテストでは、さまざまな Apache Beam ランナーを使用してパイプライン コードを実行します。Apache Beam SDK には、ローカルの開発用とテスト用に Direct Runner が用意されています。単体テストと統合テストでは、リリース自動化ツールを使って Direct Runner を使用することもできます。たとえば、継続的インテグレーション(CI)パイプラインで Direct Runner を使用できます。

Dataflow にデプロイされるパイプラインでは、Dataflow Runner を使用します。これにより、本番環境に似た環境でパイプラインが実行されます。また、アドホック開発のテストやエンドツーエンドのパイプライン テストにも Dataflow Runner を使用できます。

このページでは、Apache Beam Java SDK を使用してビルドされたパイプラインの実行に焦点を合わせていますが、Dataflow は Python と Go を使用して開発された Apache Beam パイプラインもサポートしています。Apache Beam Java、Python、Go SDK が Dataflow で一般提供されています。SQL デベロッパーは Apache Beam SQL を使用し、使い慣れた SQL 言語でパイプラインを作成することもできます。

デプロイ環境を設定する

開発のステージごとにユーザー、データ、コード、その他のリソースを分離するには、デプロイ環境を作成します。パイプライン開発のさまざまなステージで分離された環境を用意するときに、可能であれば、別々の Google Cloud プロジェクトを使用してください。

以降のセクションでは、一般的なデプロイ環境のセットについて説明します。

ローカル環境

ローカル環境は、デベロッパー ワークステーションです。開発と迅速なテストを行うには、Direct Runner を使用してパイプライン コードをローカルで実行します。

Direct Runner を使用してローカルで実行されるパイプラインは、Pub/Sub トピックや BigQuery テーブルなどのリモート Google Cloud リソースとやり取りできます。個々のデベロッパーに個別の Google Cloud プロジェクトを用意すると、Google Cloud サービスでアドホック テストを行うためのサンドボックスを使用できます。

Pub/SubBigtable などの一部の Google Cloud サービスでは、ローカル開発用のエミュレータが用意されています。Direct Runner でこれらのエミュレータを使用すると、ローカルでエンドツーエンドの開発とテストを行うことができます。

サンドボックス環境

サンドボックス環境は、デベロッパーがコード開発時に Google Cloud サービスにアクセスできるようにする Google Cloud プロジェクトです。パイプラインのデベロッパーは、Google Cloud プロジェクトを他のデベロッパーと共有したり、各自の独立したプロジェクトを使用したりできます。独立したプロジェクトを使用することで、共有リソースの使用量と割り当ての管理に関する計画の複雑さを軽減できます。

デベロッパーは、サンドボックス環境を使用して、Dataflow Runner を使用したアドホック パイプラインを実行します。サンドボックス環境は、コード開発フェーズで本番環境のランナーに対してコードのデバッグとテストを行う場合に役立ちます。たとえば、アドホック パイプラインを実行すると、デベロッパーは次のことが可能になります。

  • コード変更によるスケーリングの動作への影響を確認する。
  • Direct Runner と Dataflow Runner で考えられる動作の違いを把握する。
  • Dataflow でグラフの最適化がどのように行われるのかを理解する。

アドホック テストでは、デベロッパーはサンドボックス環境で Dataflow を実行するために、ローカル環境からコードをデプロイできます。

本番前環境

本番前環境は、エンドツーエンド テストのように本番環境に相当する条件が求められる開発フェーズに使用する環境です。本番前環境用に別のプロジェクトを使用します。可能であれば、本番環境と同様に構成します。同様に、本番環境と同様の規模でエンドツーエンドのテストを実施する場合は、Dataflow やその他のサービスの Google Cloud プロジェクトの割り当てを本番環境とできるだけ近いものにします。

必要であれば、本番前環境をさらに複数の環境に分割することもできます。たとえば、品質管理環境は、さまざまなワークロード条件でデータの正確性、鮮度、パフォーマンスなどのサービスレベル目標(SLO)をテストする品質アナリストの作業をサポートします。

エンドツーエンド テストでは、テスト範囲内でデータソースとシンクを統合します。これらを本番前環境で使用可能にする方法を検討してください。テストデータは本番前環境で保存できます。たとえば、テストデータは入力データと一緒に Cloud Storage バケットに保存されます。その他のケースでは、本番環境の別のサブスクリプションを介して Pub/Sub トピックなどの本番前環境以外からテストデータを受け取る場合があります。ストリーミング パイプラインの場合は、Dataflow Streaming Data Generator などを使用し、生成されたデータでエンドツーエンド テストを実行して、本番環境に似たデータ特性とボリュームをエミュレートすることもできます。

ストリーミング パイプラインの場合は、本番環境に変更を加える前に、本番前環境でパイプラインの更新をテストします。特に、ダウンタイムを回避するために並列パイプラインを実行するなど、複数のステップを調整する必要がある場合は、ストリーミング パイプラインの更新手順をテストして検証することが重要です。

本番環境

本番環境は専用の Google Cloud プロジェクトです。継続的デリバリーは、すべてのエンドツーエンド テストに合格すると、本番環境にデプロイ アーティファクトをコピーします。

開発に関するベスト プラクティス

Dataflow パイプラインのベスト プラクティスをご覧ください。

パイプラインをテストする

ソフトウェア開発、単体テスト、統合テスト、エンドツーエンド テストは、ソフトウェア テストの一般的な種類です。これらのテストの種類は、データ パイプラインにも適用できます。

Apache Beam SDK には、これらのテストを有効にする機能があります。テストのタイプごとに異なるデプロイ環境が理想的です。次の図は、単体テスト、統合テスト、エンドツーエンド テストがパイプラインとデータのさまざまな部分でどのように適用されるかを示しています。

テストの種類と、変換、パイプライン、データソース、データシンクとの関係

この図は、さまざまなテストの範囲と、それらの変換(DoFnPTransform サブクラス)、パイプライン、データソース、データシンクとの関係を示しています。

次のセクションで、Dataflow を使用して一般的なソフトウェア テストをデータ パイプラインに適用する方法を説明します。このセクションを読み終えたら、この図に戻り、さまざまな種類のテストがどのように関係しているかを確認してください。

データ サンプリング

Dataflow パイプラインの各ステップでデータをモニタリングするには、テスト中のデータ サンプリングを有効にします。これにより、変換の出力を表示して、出力が正しいことを確認できます。

単体テスト

単体テストでは、変換の出力と検証済みのデータ入力および出力のセットを比較して、DoFn サブクラスと複合変換PTransform サブクラス)が正しく機能しているかどうか評価します。通常、デベロッパーはこれらのテストをローカル環境で実行できます。ビルド環境で継続的インテグレーション(CI)の単体テストの自動化を利用して、テストを自動的に実行することもできます。

Direct Runner は、変換のビジネス ロジックのテストに焦点を当てた、参照用のテストデータのサブセットを使用して単体テストを実行します。テストデータは、テストを実行するマシンのローカルメモリに収まるサイズでなければなりません。

Apache Beam SDK には、個々の変換(DoFn サブクラス)、複合変換(PTransform サブクラス)、パイプライン全体に対して単体テストを実行するための TestPipeline という JUnit ルールが用意されています。JUnit テストクラスの次のコード スニペットに示されているように、Direct Runner や Dataflow Runner などの Apache Beam パイプライン ランナー上で TestPipeline を使用し、PAssert を使用して PCollection オブジェクトのコンテンツにアサーションを適用できます。

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

個別の変換の単体テスト

コードを再利用可能な変換(たとえば、トップレベルまたは静的なネストされたクラスとして)に分解することで、パイプラインのさまざまな部分を対象とするテストを作成できます。再利用可能な変換にはテスト以外のメリットもあります。パイプラインのビジネス ロジックをコンポーネント部分にカプセル化することで、コードの保守性と再利用性を高めることができます。対照的に、パイプラインで匿名の個々の内部クラスを使用して変換を実装している場合は、パイプラインの個々の部分のテストが難しくなることがあります。

次の Java スニペットでは匿名の内部クラスとして変換が実装されているため、テストを簡単に行うことができません。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

次の例では、匿名の内部クラスが名前付きの具体的な DoFn サブクラスにリファクタリングされています。前の例と比べてみてください。エンドツーエンドのパイプラインを構成する具体的な DoFn サブクラスごとに、個別の単体テストを作成できます。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

DoFn サブクラスのテストは、単一変換を含むバッチ パイプラインの単体テストに似ています。Create 変換を使用してテストデータの PCollection オブジェクトを作成し、DoFn オブジェクトに渡します。PAssert を使用して、PCollection オブジェクトのコンテンツが正しいことを確認します。次の Java コードの例では、正しい出力形式を確認するために PAssert クラスを使用しています。

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

統合テスト

統合テストでは、パイプライン全体が正しく機能していることを確認します。次のような統合テストについて考えてみましょう。

  • データ パイプラインを構成する個々の変換が統合された機能を評価する変換統合テスト。変換統合テストは、外部のデータソースとデータシンクとの統合を除けば、パイプライン全体の単体テストと考えることができます。Apache Beam SDK には、データ パイプラインにテストデータを提供し、処理の結果を検証するメソッドが用意されています。変換統合テストを実行するために Direct Runner が使用されます。
  • データ パイプラインのライブ データソースとデータシンクとの統合を評価するシステム統合テスト。パイプラインが外部システムと通信するには、外部サービスにアクセスするための適切な認証情報を使用してテストを構成する必要があります。ストリーミング パイプラインは無期限に実行されるため、実行中のパイプラインをいつ、どのように停止するかを決めておく必要があります。Direct Runner を使用してシステム統合テストを実行することで、Dataflow ジョブを送信して完了を待つ必要がなくなり、パイプラインと他のシステムの統合をすばやく検証できます。

デベロッパーの生産性を低下させずに欠陥を迅速に検出してフィードバックを提供するように、変換とシステム統合テストを設計する必要があります。Dataflow ジョブとして実行されるテストなど、実行時間の長いテストの場合は、実行頻度の低いエンドツーエンド テストが適している可能性があります。

データ パイプラインは 1 つ以上の関連する変換と考えることができます。パイプラインのカプセル化複合変換を作成し、TestPipeline を使用してパイプライン全体の統合テストを実行できます。パイプラインのテストモード(バッチモードまたはストリーミング モード)によって、Create 変換または TestStream 変換のどちらかを使用してテストデータを提供します。

統合テストにテストデータを使用する

本番環境では、パイプラインがさまざまなデータソースやデータシンクと統合されている可能性があります。しかし、単体テストと変換統合テストでは、テスト入力を提供して出力を直接検証することで、パイプライン コードのビジネス ロジックの検証に焦点を合わせる必要があります。このアプローチにより、テストの簡素化だけでなく、パイプライン固有の問題とデータソースやデータシンクに起因する可能性のある問題を切り分けることができます。

バッチ パイプラインをテストする

バッチ パイプラインでは、Create 変換を使用して、標準のインメモリ コレクション(Java List オブジェクトなど)から入力テストデータの PCollection オブジェクトを作成します。テストデータが小さく、コードに含めることができる場合は、Create 変換を使用します。その後、出力の PCollection オブジェクトに PAssert を使用して、パイプライン コードが正しいかどうかを確認できます。この方法は、Direct Runner と Dataflow Runner でサポートされています。

次の Java コード スニペットは、出力の PCollection オブジェクトに対するパイプライン(WeatherStatsPipeline)を構成する個々の変換の一部、または全部を含む複合変換のアサーションを示しています。このアプローチは、パイプラインの個々の変換の単体テストに似ています。

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

ウィンドウ処理の動作をテストするには、次のコード スニペットのように、Create 変換を使用してタイムスタンプ付きの要素を作成することもできます。

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

ストリーミング パイプラインをテストする

ストリーミング パイプラインには、制限なしのデータの処理方法を定義する前提条件が存在します。こうした前提条件は多くの場合、実際の状況でのデータに適時性に関するものであるため、前提条件が true か false かに応じて正確性に影響を及ぼします。したがって、ストリーミング パイプラインの統合テストでは、ストリーミング データの到着に関する非決定的な特性をシミュレートするテストを含めるのが理想的です。

このようなテストを可能にするため、Apache Beam SDK には、データ パイプラインの結果で要素のタイミング(早い、時間どおり、遅い)の効果をモデル化する TestStream クラスが用意されています。これらのテストを PAssert クラスとともに使用して、想定される結果と比較します。

TestStream は、Direct Runner と Dataflow Runner でサポートされています。次のコードサンプルでは、TestStream 変換を作成します。

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

TestStream の詳細については、Apache Beam での制限なしのパイプラインのテストをご覧ください。単体テストで Apache Beam SDK を使用する方法については、Apache Beam のドキュメントをご覧ください。

統合テストで Google Cloud サービスを使用する

Direct Runner は Google Cloud サービスと統合できるため、必要に応じてローカル環境のアドホック テストやシステム統合テストで Pub/Sub、BigQuery などのサービスを使用できます。Direct Runner を使用する場合、パイプラインは、gcloud コマンドライン ツールで構成されたユーザー アカウント、または GOOGLE_APPLICATION_CREDENTIALS 環境変数に指定したサービス アカウントとして実行されます。したがって、パイプラインを実行する前に、必要なすべてのリソースに対する十分な権限をこのアカウントに付与する必要があります。詳細については、Dataflow のセキュリティと権限をご覧ください。

完全にローカルな統合テストでは、一部の Google Cloud サービスでローカル エミュレータを使用できます。ローカル エミュレータは Pub/SubBigtable で使用できます。

ストリーミング パイプラインのシステム統合テストでは、setBlockOnRun メソッド(DirectOptions インターフェースに定義)を使用して、Direct Runner でパイプラインを非同期で実行できます。それ以外の場合は、パイプラインが手動で終了されるまで、パイプラインの実行で呼び出し元の親プロセス(ビルド パイプラインのスクリプトなど)がブロックされます。パイプラインを非同期で実行する場合は、次のコード例のように、返された PipelineResult インスタンスを使用してパイプラインの実行をキャンセルできます。

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

エンドツーエンド テスト

エンドツーエンド テストでは、本番環境とよく似ている条件において、Dataflow Runner でエンドツーエンド パイプラインを実行して、オペレーションが正しく行われたことを検証します。このテストでは、Dataflow Runner を使用したビジネス ロジックが正しく機能することや、本番環境相当の負荷でパイプラインが期待どおりに動作するかどうかを確認します。通常、エンドツーエンド テストは、本番前環境として指定された専用の Google Cloud プロジェクトで実行します。

さまざまなスケールでパイプラインをテストするには、さまざまなタイプのエンドツーエンド テストを使用します。次に例を示します。

  • テスト データセットの一部分(全体の 1% 程度)を使用して、小規模なエンドツーエンド テストを実行し、本番前環境でパイプライン機能をすばやく検証します。
  • 完全なテスト データセットを使用して、大規模なエンドツーエンド テストを実行し、本番環境相当のデータ量と条件でパイプラインの機能を検証します。

ストリーミング パイプラインで同じデータを使用できる場合は、本番環境パイプラインと並行してテスト パイプラインを実行することをおすすめします。これにより、自動スケーリングやパフォーマンスなどの結果とオペレーションの動作を比較できます。

エンドツーエンド テストは、パイプラインが本番環境の SLO をどの程度満たすかを予測するのに役立ちます。本番前環境は、本番環境と同様の条件でパイプラインをテストします。エンドツーエンド テストでは、Dataflow Runner を使用してパイプラインを実行し、本番環境のデータセットと一致する(またはよく似た)完全な参照データセットを処理します。

実際のデータを正確にシミュレートするテスト用の合成データを生成できない場合があります。この問題に対処するためのアプローチの一つは、本番環境のデータソースからの抽出をクレンジングして参照データセットを作成することです。ここでは、機密データが適切な変換によって匿名化されます。この目的のため、機密データ保護を使用することをおすすめします。機密データ保護は、さまざまなコンテンツ タイプとデータソースから機密データを検出し、秘匿化、マスキング、フォーマット保持暗号化、日付シフトなど、さまざまな匿名化手法を適用できます。

バッチ パイプラインとストリーミング パイプラインのエンドツーエンド テストの違い

大規模なテスト データセットに完全なエンドツーエンド テストを実行する前に、テストデータの一部(全体の 1% 程度)を使用してテストを行い、短時間で期待される動作を確認することをおすすめします。Direct Runner を使用する統合テストと同様に、Dataflow Runner を使用してパイプラインを実行するときに PCollection オブジェクトに PAssert を使用できます。PAssert の詳細については、このページの単体テストのセクションをご覧ください。

ユースケースによっては、エンドツーエンド テストで非常に大規模な出力を検証するのは、実用的でなかったり、費用がかかる場合があります。その場合は、代わりに出力結果セットからの代表サンプルで検証できます。たとえば、BigQuery を使用して出力行をサンプリングし、期待される結果の参照データセットと比較できます。

ストリーミング パイプラインでは、合成データでの現実的なストリーミング条件のシミュレーションが困難な場合があります。エンドツーエンド テストにストリーミング データを提供する一般的な方法は、本番環境のデータソースとテストを統合することです。Pub/Sub をデータソースとして使用する場合、既存のトピックへの追加のサブスクリプションにより、エンドツーエンド テスト用に別のデータ ストリームを有効にできます。次に、同じデータを使用する異なるパイプラインの結果を比較できます。これは、他の本番前環境および本番環境のパイプラインに対してパイプラインの候補を検証する際に役立ちます。

次の図は、この方法によって本番環境パイプラインとテスト パイプラインを異なるデプロイ環境で並行して実行する方法を示しています。

単一の Pub/Sub ストリーミング ソースを使用した本番環境パイプラインと並行したテスト パイプラインの実行

この図では、両方のパイプラインが同じ Pub/Sub トピックから読み取りますが、個別のサブスクリプションを使用します。この設定により、2 つのパイプラインが同じデータを独立して処理するため、結果を比較できます。テスト パイプラインは本番環境プロジェクトとは別のサービス アカウントを使用するため、本番環境プロジェクトの Pub/Sub サブスクライバーの割り当ては使用されません。

バッチ パイプラインとは異なり、ストリーミング パイプラインは明示的にキャンセルされるまで継続されます。エンドツーエンド テストでは、パイプラインをある時期まで(たとえばエンドツーエンド テストの実行まで)そのまま実行するか、結果を確認できるようテストの完了を表すポイントでパイプラインをキャンセルするかを決定する必要があります。

使用するテストデータの種類は、この決定に影響します。たとえば、ストリーミング パイプラインに渡される制限付きのテストデータのセットを使用する場合、すべての要素の処理が完了したらパイプラインをキャンセルできます。あるいは、現実のデータソース(本番環境で使用されている既存の Pub/Sub トピックなど)を使用する場合や、継続的にテストデータを別に生成している場合は、より長い期間テスト パイプラインの実行を継続することをおすすめします。後者の場合、本番環境での挙動や他のテスト パイプラインとの動作を比較できます。