파이프라인 테스트

파이프라인 테스트는 효과적인 데이터 처리 솔루션을 개발하는 데 있어 특히 중요한 단계입니다. 사용자 코드가 Google Cloud Platform에서 원격으로 실행되도록 파이프라인 그래프를 구성하는 Cloud Dataflow 모델의 간접적 특성으로 인해, 실행이 실패했을 때 디버깅하는 작업이 쉽지 않을 수 있습니다. 파이프라인의 원격 실행을 디버그하는 것보다 파이프라인 코드에서 로컬 단위로 테스트하는 것이 더 빠르고 간편한 경우가 많습니다.

Cloud Dataflow 서비스로 전체 실행을 해 보기 전에 파이프라인 코드를 로컬 단위로 테스트하는 것이 파이프라인 코드에서 버그를 식별하고 수정하는 데 가장 적합하고 직접적인 방법인 경우가 많습니다. 또한 로컬에서 파이프라인을 단위 테스트하면 사용자가 익숙하고 즐겨 사용하는 로컬 디버깅 도구를 사용할 수 있습니다.

Dataflow SDK는 가장 낮은 수준에서 가장 높은 수준까지 파이프라인 코드를 단위 테스트할 수 있는 다양한 방법을 제공합니다. 가장 낮은 수준에서 가장 높은 수준까지 다음 작업을 할 수 있습니다.

  • 파이프라인의 핵심 변환 내에서 DoFn과 같은 개별 함수 객체를 테스트할 수 있습니다.
  • 전체 복합 변환을 단위로 하여 테스트할 수 있습니다.
  • 전체 파이프라인에 대해 엔드 투 엔드 테스트를 수행할 수 있습니다.

자바

자바용 Dataflow SDK는 단위 테스트를 지원하기 위해 com.google.cloud.dataflow.sdk.testing 패키지를 통해 다양한 테스트 클래스를 제공합니다. 또한 SDK에 포함되어 있는 변환에 단위 테스트가 있고 SDK의 예 프로그램에도 테스트가 포함되어 있습니다. 이러한 테스트를 참조 및 가이드로 사용할 수 있습니다.

개별 DoFn 객체 테스트

파이프라인의 DoFn 함수에 있는 코드는 자주 실행되며 여러 Compute Engine 인스턴스 간에 실행되는 경우가 많습니다. DoFn 객체를 Dataflow 실행에서 사용하기 전에 단위 테스트하면 상당한 디버깅 시간 및 노력을 절약할 수 있습니다.

자바

자바용 Dataflow SDK는 SDK Transforms 패키지에 포함되어 있는 개별 DoFn 호출 DoFnTester를 테스트할 수 있는 편리한 방법을 제공합니다.

DoFnTesterJUnit 프레임워크를 사용합니다. DoFnTester를 사용하려면 다음 작업이 필요합니다.

  1. DoFnTester를 만듭니다. 테스트할 DoFn의 인스턴스를 DoFnTester의 정적 팩토리 메소드에 전달해야 합니다.
  2. DoFn에 적합한 유형의 기본 테스트 입력을 하나 이상 만듭니다. DoFn이 부차 입력을 사용하거나 부차 출력을 생성하는 경우 부차 입력 및 부차 출력 태그도 만들어야 합니다.
  3. 기본 입력을 처리하려면 DoFnTester.processBatch를 호출합니다.
  4. JUnit의 Assert.assertThat 메소드를 사용하여 processBatch에서 반환된 테스트 출력이 예상 값과 일치하는지 확인합니다.

DoFnTester 만들기

DoFnTester를 만들려면 테스트할 DoFn의 인스턴스를 먼저 만들어야 합니다. 그런 다음 .of() 정적 팩토리 메소드를 사용하여 DoFnTester를 만들 때 이 인스턴스를 사용합니다.

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;

  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

테스트 입력 만들기

DoFnTester의 테스트 입력을 하나 이상 만들어 DoFn에 보내야 합니다. 테스트 입력을 만들려면 DoFn에서 허용하는 것과 동일한 입력 유형을 갖는 입력 변수를 하나 이상 만들면 됩니다. 위 예제에서는 다음과 같습니다.

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";

부차 입력 및 출력

DoFn이 부차 입력을 허용하는 경우 DoFnTester.setSideInputs 메소드를 사용하여 부차 입력을 만들 수 있습니다.

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  PCollectionView<List<Integer>> sideInput = ...;
  Iterable<Integer> value = ...;
  fnTester.setSideInputInGlobalWindow(sideInput, value);

DoFn이 부차 출력을 생성하는 경우 각 출력에 액세스하는 데 사용할 적합한 TupleTag 객체를 설정해야 합니다. 부차 출력이 있는 DoFn은 각 부차 출력에 대해 PCollectionTuple을 생성하므로 해당 튜플의 각 부차 출력에 해당하는 TupleTagList를 제공해야 합니다.

DoFnStringInteger 유형의 부하 출력을 생성한다고 가정해 봅시다. 각 유형에 대해 TupleTag 객체를 만들고 이들 객체를 TupleTagList로 결합한 후 DoFnTester에 대해 다음과 같이 설정합니다.

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  TupleTag<String> tag1 = ...;
  TupleTag<Integer> tag2 = ...;
  TupleTagList tags = TupleTagList.of(tag1).and(tag2);

  fnTester.setSideOutputTags(tags);

자세한 내용은 부차 입력에 대한 ParDo 문서를 참조하세요.

테스트 입력 처리 및 결과 확인

입력을 처리하고 DoFn에서 테스트를 실행하려면 DoFnTester.processBatch 메소드를 호출합니다. processBatch를 호출하는 경우 DoFn의 기본 테스트 입력 값을 하나 이상 전달합니다. 부차 입력을 설정하는 경우 제공하는 기본 입력의 각 배치에 부차 입력을 사용할 수 있습니다.

DoFnTester.processBatch는 출력 List 즉, DoFn의 지정된 출력 유형과 동일한 유형의 객체를 반환합니다. DoFn<String, Integer>의 경우 processBatchList<Integer>를 반환합니다.

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

processBatch의 결과를 확인하려면 JUnit의 Assert.assertThat 메소드를 사용하여 출력 List에 예상한 값이 포함되어 있는지 테스트합니다.

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

  Assert.assertThat(testOutputs, Matchers.hasItems(...));

  // Process a larger batch in a single step.
  Assert.assertThat(fnTester.processBatch("input1", "input2", "input3"), Matchers.hasItems(...));

복합 변환 테스트

만든 복합 변환을 테스트하려면 다음 패턴을 사용합니다.

  • TestPipeline을 만듭니다.
  • 정적이고 알려진 테스트 입력 데이터를 만듭니다.
  • Create 변환을 사용하여 입력 데이터의 PCollection을 만듭니다.
  • 입력 PCollection에 복합 변환을 Apply하고 그 결과인 출력 PCollection을 저장합니다.
  • DataflowAssert 및 해당 하위 클래스를 사용하여 출력 PCollection에 예상한 요소가 포함되어 있는지 확인합니다.

자바

SDK 테스트 클래스 사용

TestPipelineDataflowAssert는 특히 변환 테스트를 위해 Cloud Dataflow 자바 SDK에 포함된 클래스입니다. TestPipelineDataflowAssert는 로컬에서 실행되거나 원격 Cloud Dataflow 서비스에 대해 실행하도록 구성된 테스트와 함께 작동합니다.

TestPipeline

테스트하려면 파이프라인 객체를 만들 때 Pipeline 대신 TestPipeline을 사용합니다. TestPipeline.createPipeline.create과는 달리 PipelineOptions 설정을 내부에서 처리합니다.

다음과 같이 TestPipeline을 만듭니다.

  Pipeline p = TestPipeline.create();

DataflowAssert

DataflowAssertPCollection의 콘텐츠에 대한 어설션입니다. DataflowAssert를 사용하여 PCollection에 특정 예상 요소 세트가 포함되어 있는지 확인할 수 있습니다.

주어진 PCollection의 경우 다음과 같이 DataflowAssert를 사용하여 콘텐츠를 확인할 수 있습니다.

  PCollection<String> output = ...;

  // Check whether a PCollection contains some elements in any order.
  DataflowAssert.that(output)
    .containsInAnyOrder(
      "elem1",
      "elem3",
      "elem2");

DataflowAssert를 사용하는 코드는 JUnitHamcrest에서 링크를 걸어야 합니다. Maven을 사용하는 경우 프로젝트의 pom.xml 파일에 다음과 같은 종속성을 추가하여 Hamcrest에서 링크를 걸 수 있습니다.

  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
  </dependency>

이러한 클래스가 작동하는 방법에 대한 자세한 내용은 com.google.cloud.dataflow.sdk.testing 패키지 문서를 참조하세요.

Create 변환 사용

Create 변환을 사용하여 자바 List와 같은 표준 인메모리 컬렉션 클래스로 PCollection을 만들 수 있습니다. 자세한 내용은 PCollection 만들기를 참조하세요.

복합 변환 예제 테스트

자바

다음 코드는 복합 변환에 대한 전체 테스트를 보여줍니다. 이 테스트는 Count 변환을 String 요소의 입력 PCollection에 적용합니다. 이 테스트는 Create 변환을 사용하여 자바 List<String>에서 입력 PCollection을 만듭니다.

  @RunWith(JUnit4.class)
  public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
    "hi", "there", "hi", "hi", "sue", "bob",
    "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    @Test
    public void testCount() {
      // Create a test pipeline.
      Pipeline p = TestPipeline.create();

      // Create an input PCollection.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Apply the Count transform under test.
      PCollection<KV<String, Long>> output =
        input.apply(Count.<String>perElement());

      // Assert on the results.
      DataflowAssert.that(output)
        .containsInAnyOrder(
            KV.of("hi", 4L),
            KV.of("there", 1L),
            KV.of("sue", 2L),
            KV.of("bob", 2L),
            KV.of("", 3L),
            KV.of("ZOW", 1L));

      // Run the pipeline.
      p.run();
  }

파이프라인 엔드 투 엔드 테스트

Dataflow SDK의 테스트 클래스(예: 자바용 Dataflow SDK의 TestPipelineDataflowAssert)를 사용하여 전체 파이프라인을 엔드 투 엔드 테스트할 수 있습니다. 일반적으로 전체 파이프라인을 테스트하는 방법은 다음과 같습니다.

  • 파이프라인에 대한 입력 데이터 소스별로 알려진 정적 테스트 입력 데이터를 만듭니다.
  • 파이프라인의 최종 출력 PCollection에서 예상과 일치하는 정적 테스트 출력 데이터를 만듭니다.
  • 표준 Pipeline.create 대신 TestPipeline을 만듭니다.
  • 파이프라인의 Read 변환 대신 Create 변환을 사용하여 정적 입력 데이터에서 PCollection을 하나 이상 만듭니다.
  • 파이프라인의 변환을 적용합니다.
  • 파이프라인의 Write 변환 대신 DataflowAssert를 사용하여 파이프라인이 생성하는 최종 PCollection 콘텐츠가 정적 출력 데이터의 예상 값과 일치하는지 확인합니다.

WordCount 파이프라인 테스트

자바

다음 예제 코드는 WordCount 예제 파이프라인을 테스트하는 방법을 보여줍니다. WordCount는 일반적으로 입력 데이터의 텍스트 파일에서 줄을 읽습니다. 이 테스트는 텍스트 몇 줄이 포함되어 있는 자바 List<String>을 만들고 Create 변환을 사용하여 초기 PCollection을 만듭니다.

복합 변환 CountWords에서 WordCount의 최종 변환은 인쇄에 적합한 형식의 단어 개수로 이루어진 PCollection<String>을 생성합니다. 테스트 파이프라인은 출력 텍스트 파일에 해당 PCollection을 쓰는 대신 DataflowAssert를 사용하여 PCollection의 요소가 예상 출력 데이터를 포함하는 정적 String 배열의 요소와 일치하는지 확인합니다.

  @RunWith(JUnit4.class)
  public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.
    @Test
    @Category(com.google.cloud.dataflow.sdk.testing.RunnableOnService.class)
    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      DataflowAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
  }
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.