パイプラインをテストする

パイプラインのテストは、効果的なデータ処理ソリューションの開発で特に重要なステップです。Google Cloud Platform でリモートに実行されるパイプライン グラフをユーザーコードで作成するという Cloud Dataflow のモデルの間接的な性質により、失敗した実行のデバッグが複雑なタスクになることがあります。多くの場合、パイプラインのリモート実行をデバッグするよりも、パイプライン コードのローカル ユニットテストを実行するほうが高速かつ簡単です。

通常、Cloud Dataflow サービスで完全に実行する前にパイプライン コードをローカルでユニットテストすることは、パイプライン コードのバグを特定して修正するための最善で最も直接的な方法です。パイプラインのローカル ユニットテストでは、使い慣れたお好きなローカル デバッグツールを使用することもできます。

Dataflow SDK には、最小レベルから最大レベルまで、パイプライン コードのユニットテストを行う多数の方法が用意されています。最小レベルから最大レベルまで、次のテストがあります。

  • パイプラインのコア変換の内部の DoFn など、個々の機能オブジェクトをテストできます。
  • 複合変換全体を 1 つのユニットとしてテストできます。
  • パイプライン全体についてエンドツーエンドのテストを実行できます。

Java

ユニットテストをサポートするために、Dataflow SDK for Java には、パッケージ com.google.cloud.dataflow.sdk.testing にいくつかのテストクラスが用意されています。また、SDK に含まれる変換にはユニットテストが含まれ、SDK 内のプログラム例にもテストが含まれています。これらのテストをリファレンスおよびガイドとして使用できます。

個々の DoFn オブジェクトをテストする

パイプラインの DoFn 関数のコードは、複数の Compute Engine インスタンスで何度も実行されます。Dataflow の実行で DoFn オブジェクトを使用する前に、それらのユニットテストを実行すると、デバッグの時間とエネルギーを大幅に節約できます。

Java

Dataflow SDK for Java には、個々の DoFn をテストする DoFnTester という便利な方法が用意されており、これは SDK Transforms パッケージに含まれています。

DoFnTester は、JUnit フレームワークを使用します。DoFnTester を使用するには、次の操作を行う必要があります。

  1. DoFnTester を作成します。テストする DoFn のインスタンスを DoFnTester の静的ファクトリ メソッドに渡す必要があります。
  2. DoFn に対して適切なタイプのメインテスト入力を 1 つ以上作成します。DoFn が副入力を受け取るか副出力を生成する場合、あるいはその両方の場合は、副入力と副出力のタグも作成する必要があります。
  3. DoFnTester.processBatch を呼び出してメイン入力を処理します。
  4. JUnit の Assert.assertThat メソッドを使用して、processBatch から返されたテスト出力が期待される値と一致することを確認します。

DoFnTester を作成する

DoFnTester を作成するには、テストする DoFn のインスタンスを最初に作成します。次に、.of() 静的ファクトリ メソッドを使用して DoFnTester を作成する場合に、そのインスタンスを使用します。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;

  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

テスト入力を作成する

DoFnTester の 1 つ以上のテスト入力を作成して、DoFn に送信する必要があります。テスト入力を作成するには、DoFn が受け入れるのと同じ入力タイプの入力変数を 1 つ以上作成するだけです。上記のケースでは、次のようになります。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";

副入力と副出力

DoFn が副入力を受け入れる場合は、メソッド DoFnTester.setSideInputs を使用してそれらの副入力を作成できます。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  PCollectionView<List<Integer>> sideInput = ...;
  Iterable<Integer> value = ...;
  fnTester.setSideInputInGlobalWindow(sideInput, value);

DoFn が副出力を生成する場合は、各出力にアクセスするために使用する適切な TupleTag オブジェクトを設定する必要があります。副出力がある DoFn は各副出力の PCollectionTuple を生成します。そのタプルの各副出力に対応する TupleTagList を提供する必要があります。

DoFn がタイプ String および Integer の副出力を生成するとします。それぞれに対して TupleTag オブジェクトを作成し、それらを TupleTagList にバンドルしてから、次のように DoFnTester に対して設定します。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  TupleTag<String> tag1 = ...;
  TupleTag<Integer> tag2 = ...;
  TupleTagList tags = TupleTagList.of(tag1).and(tag2);

  fnTester.setSideOutputTags(tags);

詳しくは、ParDo のドキュメントで副入力の説明をご覧ください。

テスト入力を処理し、結果をチェックする

入力を処理(したがって DoFn でテストを実行)するには、メソッド DoFnTester.processBatch を呼び出します。processBatch を呼び出すときに、DoFn の 1 つ以上のメインテスト入力値を渡します。副入力を設定した場合、副入力は提供するメイン入力の各バッチで使用可能になります。

DoFnTester.processBatch は、出力の List、つまり DoFn の指定した出力タイプと同じタイプのオブジェクトを返します。DoFn<String, Integer> の場合、processBatchList<Integer> を返します。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

processBatch の結果をチェックするには、JUnit の Assert.assertThat メソッドを使用して、出力の List に期待される値が含まれるかどうかをテストします。

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

  Assert.assertThat(testOutputs, Matchers.hasItems(...));

  // Process a larger batch in a single step.
  Assert.assertThat(fnTester.processBatch("input1", "input2", "input3"), Matchers.hasItems(...));

複合変換をテストする

作成した複合変換をテストするには、次のパターンを使用できます。

  • TestPipeline を作成します。
  • いくつかの静的な既知のテスト入力データを作成します。
  • Create 変換を使用して、入力データの PCollection を作成します。
  • 複合変換を入力 PCollectionApply し、結果として生成される出力 PCollection を保存します。
  • DataflowAssert とそのサブクラスを使用して、出力 PCollection に期待される要素が含まれることを確認します。

Java

SDK テストクラスを使用する

TestPipelineDataflowAssert は、変換のテスト用に Cloud Dataflow Java SDK に含まれているクラスです。TestPipelineDataflowAssert は、ローカルでもリモート Cloud Dataflow サービスに対しても実行されるように設定されたテストで動作します。

TestPipeline

テストでは、パイプライン オブジェクトの作成時に Pipeline の代わりに TestPipeline を使用します。Pipeline.create とは異なり、TestPipeline.createPipelineOptions の設定を内部的に処理します。

TestPipeline は次のように作成します。

  Pipeline p = TestPipeline.create();

DataflowAssert

DataflowAssert は、PCollection のコンテンツに対するアサーションです。DataflowAssert を使用して、PCollection に期待される要素の特定のセットが含まれていることを確認できます。

特定の PCollection について、DataflowAssert を使用してコンテンツを次のように確認できます。

  PCollection<String> output = ...;

  // Check whether a PCollection contains some elements in any order.
  DataflowAssert.that(output)
    .containsInAnyOrder(
      "elem1",
      "elem3",
      "elem2");

DataflowAssert を使用するコードは JUnitHamcrest でリンクする必要があります。Maven を使用している場合は、次の依存関係をプロジェクトの pom.xml ファイルに追加することによって Hamcrest でリンクできます。

  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
  </dependency>

これらのクラスの動作について詳しくは、com.google.cloud.dataflow.sdk.testing パッケージのドキュメントをご覧ください。

Create 変換を使用する

Create 変換を使用して、Java List などの標準のメモリ内コレクション クラスから PCollection を作成できます。詳しくは、PCollection の作成をご覧ください。

複合変換のテスト例

Java

次のコードは、複合変換の完全なテストを示します。テストは、Count 変換を String 要素の入力 PCollection に適用します。テストでは、Create 変換を使用して、入力 PCollection を Java List<String> から作成します。

  @RunWith(JUnit4.class)
  public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
    "hi", "there", "hi", "hi", "sue", "bob",
    "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    @Test
    public void testCount() {
      // Create a test pipeline.
      Pipeline p = TestPipeline.create();

      // Create an input PCollection.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Apply the Count transform under test.
      PCollection<KV<String, Long>> output =
        input.apply(Count.<String>perElement());

      // Assert on the results.
      DataflowAssert.that(output)
        .containsInAnyOrder(
            KV.of("hi", 4L),
            KV.of("there", 1L),
            KV.of("sue", 2L),
            KV.of("bob", 2L),
            KV.of("", 3L),
            KV.of("ZOW", 1L));

      // Run the pipeline.
      p.run();
  }

パイプラインをエンドツーエンドでテストする

Dataflow SDK のテストクラス(Dataflow SDK for Java の TestPipelineDataflowAssert など)を使用して、パイプライン全体をエンドツーエンドでテストできます。通常、パイプライン全体をテストするには、次の操作を実行します。

  • パイプラインへの入力データのすべてのソースについて、いくつかの既知の静的テスト入力データを作成します。
  • パイプラインの最終出力 PCollection で期待される内容と一致する静的テスト出力データをいくつか作成します。
  • 標準 Pipeline.create の代わりに TestPipeline を作成します。
  • パイプラインの Read 変換の代わりに、Create 変換を使用して、静的入力データから 1 つ以上の PCollection を作成します。
  • パイプラインの変換を適用します。
  • パイプラインの Write 変換の代わりに、DataflowAssert を使用して、パイプラインが生成する最終的な PCollection のコンテンツが静的出力データの期待される値と一致することを確認します。

WordCount パイプラインをテストする

Java

次のコード例は、WordCount サンプル パイプラインをテストする方法を示しています。 通常、WordCount は入力データのテキスト ファイルから行を読み取ります。代わりに、テストではいくつかのテキスト行を含む Java List<String> を作成し、Create 変換を使用して初期 PCollection を作成します。

WordCount の最終変換(複合変換 CountWords から)は、印刷に適した書式の単語数の PCollection<String> が生成されます。その PCollection を出力テキスト ファイルに書き込む代わりに、テスト パイプラインでは DataflowAssert を使用して、PCollection の要素が、期待される出力データを含む静的 String 配列の要素と一致することを確認します。

  @RunWith(JUnit4.class)
  public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.
    @Test
    @Category(com.google.cloud.dataflow.sdk.testing.RunnableOnService.class)
    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      DataflowAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
  }
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。