パイプラインをテストする

パイプラインのテストは、効果的なデータ処理ソリューションの開発で特に重要なステップです。Google Cloud Platform でリモートに実行されるパイプライン グラフをユーザーコードで作成するという Cloud Dataflow のモデルの間接的な性質により、失敗した実行のデバッグが複雑なタスクになることがあります。多くの場合、パイプラインのリモート実行をデバッグするよりも、パイプライン コードのローカル ユニットテストを実行するほうが高速かつ簡単です。

通常、Cloud Dataflow サービスで完全に実行する前にパイプライン コードをローカルでユニットテストすることは、パイプライン コードのバグを特定して修正するための最善で最も直接的な方法です。パイプラインのローカル ユニットテストでは、使い慣れたお好きなローカル デバッグツールを使用することもできます。

Dataflow SDK には、最小レベルから最大レベルまで、パイプライン コードのユニットテストを行う多数の方法が用意されています。最小レベルから最大レベルまで、次のテストがあります。

  • パイプラインのコア変換の内部の DoFn など、個々の機能オブジェクトをテストできます。
  • 複合変換全体を 1 つのユニットとしてテストできます。
  • パイプライン全体についてエンドツーエンドのテストを実行できます。

Java

ユニットテストをサポートするために、Dataflow SDK for Java には、パッケージ com.google.cloud.dataflow.sdk.testing にいくつかのテストクラスが用意されています。また、SDK に含まれる変換にはユニットテストが含まれ、SDK 内のプログラム例にもテストが含まれています。これらのテストをリファレンスおよびガイドとして使用できます。

個々の DoFn オブジェクトをテストする

パイプラインの DoFn 関数のコードは頻繁に、それも複数の Compute Engine インスタンスにわたって頻繁に実行されます。Dataflow の実行で DoFn オブジェクトを使用する前に、それらのユニットテストを実行すると、デバッグの時間とエネルギーを大幅に節約できます。

Java

Dataflow SDK for Java には、個々の DoFn をテストする DoFnTesterという便利な方法が用意されており、これは SDK Transforms パッケージに含まれています。

DoFnTester は、JUnit フレームワークを使用します。DoFnTester を使用するには、次の操作を行う必要があります。

  1. DoFnTester を作成します。DoFnTester の静的ファクトリ メソッドにテストする DoFn のインスタンスを渡す必要があります。
  2. DoFn に適切な型の、1 つまたは複数のテスト入力を作成します。DoFn が副入力を受け取るか副出力を生成する場合、あるいはその両方の場合は、副入力と副出力のタグも作成する必要があります。
  3. DoFnTester.processBatch を呼び出してメイン入力を処理します。
  4. JUnit の Assert.assertThat メソッドを使用して、processBatch から返されたテスト出力が期待される値と一致することを確認します。

DoFnTester を作成する

DoFnTester を作成するには、まずテストする DoFn のインスタンスを作成します。.of() 静的ファクトリ メソッドを使用して DoFnTester を作成するときに、そのインスタンスを使用します。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;

  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

テスト入力を作成する

ユーザーは、DoFnTester 用に 1 つまたは複数のテスト入力を作成して、DoFn に送信する必要があります。テスト入力を作成するには、DoFn が受け付ける同じ入力タイプの入力変数を単に 1 つ以上作成します。上記のケースでは、次のようになります。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";

副入力と副出力

DoFn が副入力を受け付ける場合、メソッド DoFnTester.setSideInputs を使用してそのような副入力を作成できます。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  PCollectionView<List<Integer>> sideInput = ...;
  Iterable<Integer> value = ...;
  fnTester.setSideInputInGlobalWindow(sideInput, value);

DoFn が副出力を生成する場合、各出力にアクセスする際に使用する適切な TupleTag オブジェクトを設定する必要があります。副出力を持つ DoFn は、各副出力に対して PCollectionTuple を生成します。そのタプルの各副出力に対応する TupleTagList を指定する必要があります。

DoFn がタイプ StringInteger の副出力を生成するとします。それぞれに対して TupleTag オブジェクトを作成し、それらを TupleTagList にバンドルしてから、次のように DoFnTester に対して設定します。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  TupleTag<String> tag1 = ...;
  TupleTag<Integer> tag2 = ...;
  TupleTagList tags = TupleTagList.of(tag1).and(tag2);

  fnTester.setSideOutputTags(tags);

詳しくは副入力ParDo のドキュメントをご覧ください。

テスト入力を処理し、結果をチェックする

入力を処理(したがって DoFn でテストを実行)するには、メソッド DoFnTester.processBatch を呼び出します。processBatch の呼び出しでは、DoFn のメインのテスト入力値を 1 つ以上渡します。副入力を設定した場合、副入力は提供するメイン入力の各バッチで使用可能になります。

DoFnTester.processBatch は、出力の List、つまり DoFn の指定した出力タイプと同じタイプのオブジェクトを返します。DoFn<String, Integer> には、processBatchList<Integer> を返します。

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

processBatch の結果を確認するには、JUnit の Assert.assertThat メソッドを使用して、出力の Listに期待される値が含まれているかどうかテストします。

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

  Assert.assertThat(testOutputs, Matchers.hasItems(...));

  // Process a larger batch in a single step.
  Assert.assertThat(fnTester.processBatch("input1", "input2", "input3"), Matchers.hasItems(...));

複合変換をテストする

作成した複合変換をテストするには、次のパターンを使用できます。

  • TestPipeline を作成します。
  • いくつかの静的な既知のテスト入力データを作成します。
  • Create 変換を使用して、入力データの PCollection を作成します。
  • 複合変換を入力 PCollectionApply し、結果として生成される出力 PCollection を保存します。
  • DataflowAssert とそのサブクラスを使用して、出力 PCollection に期待される要素が含まれているか確認します。

Java

SDK テストクラスを使用する

TestPipelineDataflowAssert は、変換のテスト用に Cloud Dataflow Java SDK に含まれているクラスです。TestPipelineDataflowAssert は、ローカルでもリモート Cloud Dataflow サービスに対しても実行されるように設定されたテストで動作します。

TestPipeline

テストの場合は、パイプライン オブジェクトの作成時に Pipeline の代わりに TestPipeline を使用します。Pipeline.create と違い、TestPipeline.create は設定 PipelineOptions を内部で処理します。

TestPipeline を次のように作成します。

  Pipeline p = TestPipeline.create();

DataflowAssert

DataflowAssert は、PCollection のコンテンツに関するアサーションです。DataflowAssert を使用して、PCollection が期待される要素セットを含むことを確認できます。

特定の PCollectionに対して、DataflowAssert を使用して次のように内容を確認します。

  PCollection<String> output = ...;

  // Check whether a PCollection contains some elements in any order.
  DataflowAssert.that(output)
    .containsInAnyOrder(
      "elem1",
      "elem3",
      "elem2");

DataflowAssert を使用するすべてのコードは、JUnitHamcrest にリンクする必要があります。Maven を使用している場合は、プロジェクトの pom.xml ファイルに次の依存関係を追加すれば Hamcrest にリンクできます。

  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
  </dependency>

これらのクラスの動作について詳しくは、com.google.cloud.dataflow.sdk.testing パッケージのドキュメントをご覧ください。

Create 変換を使用する

Create 変換を使用して、Java List などの標準のインメモリ コレクション クラスから PCollection を作成できます。詳しくは、PCollection を作成するをご覧ください。

複合変換のテスト例

Java

次のコードは、複合変換の完全なテストを示します。テストでは、Count 変換を String 要素の入力 PCollection に適用します。テストでは Create 変換を使用して、Java List<String> から入力 PCollection を作成します。

  @RunWith(JUnit4.class)
  public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
    "hi", "there", "hi", "hi", "sue", "bob",
    "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    @Test
    public void testCount() {
      // Create a test pipeline.
      Pipeline p = TestPipeline.create();

      // Create an input PCollection.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Apply the Count transform under test.
      PCollection<KV<String, Long>> output =
        input.apply(Count.<String>perElement());

      // Assert on the results.
      DataflowAssert.that(output)
        .containsInAnyOrder(
            KV.of("hi", 4L),
            KV.of("there", 1L),
            KV.of("sue", 2L),
            KV.of("bob", 2L),
            KV.of("", 3L),
            KV.of("ZOW", 1L));

      // Run the pipeline.
      p.run();
  }

パイプラインをエンドツーエンドでテストする

Dataflow SDK のテストクラス(たとえば Java 用の Dataflow SDK の中の TestPipelineDataflowAssert)を使用して、パイプライン全体をエンドツー エンドでテストできます。通常、パイプライン全体をテストするには、次の操作を実行します。

  • パイプラインへの入力データのすべてのソースについて、いくつかの既知の静的テスト入力データを作成します。
  • パイプラインの最終出力 PCollection で期待される内容と一致する静的テスト出力データをいくつか作成します。
  • 標準 Pipeline.create の代わりに TestPipeline を作成します。
  • パイプラインの Read 変換の代わりに Create 変換を使用して、静的入力データから 1 つ以上の PCollection を作成します。
  • パイプラインの変換を適用します。
  • パイプラインの Write 変換の代わりに DataflowAssert を使用して、パイプラインが生成する最終 PCollection のコンテンツが静的出力データ内の期待される値と一致することを確認します。

WordCount パイプラインをテストする

Java

次のコード例は、WordCount サンプル パイプラインをテストする方法を示しています。WordCount は通常、テキスト ファイルから入力データの行を読み込みます。一方、テストはテキスト行を含む Java List<String> を作成し、Create 変換を使用して最初の PCollection を作成します。

WordCount の最終変換(複合変換 CountWords から)では、印刷に適した書式の単語数の PCollection<String> が生成されます。その PCollection を出力テキスト ファイルに書き込む代わりに、テスト パイプラインでは DataflowAssert を使用して、PCollection の要素が、期待される出力データを含む静的 String 配列の要素と一致することを確認します。

  @RunWith(JUnit4.class)
  public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.
    @Test
    @Category(com.google.cloud.dataflow.sdk.testing.RunnableOnService.class)
    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      DataflowAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
  }