ParDo를 사용한 병렬 처리

ParDo는 Dataflow SDK의 코어 병렬 처리 작업입니다. 일반 병렬 처리에 ParDo를 사용합니다. ParDo 처리 스타일은 Map/Shuffle/Reduce 스타일 알고리즘의 "Mapper" 클래스 내에서 발생하는 현상과 비슷합니다. ParDo는 입력 PCollection의 각 요소를 적용하고, 해당 요소에 대한 몇 가지 처리 함수를 수행한 후에 0개, 1개 또는 여러 개의 요소를 출력 PCollection에 내보냅니다.

ParDo가 입력 PCollection의 각 요소에 수행하는 함수는 사용자가 제공합니다. 제공된 함수는 독립적으로 그리고 Dataflow의 여러 작업자 인스턴스에서 동시에 호출됩니다.

ParDo는 다음을 비롯한 다양한 데이터 처리 작업에 유용합니다.

  • 데이터 세트 필터링. ParDo를 사용하여 PCollection의 각 요소를 고려하고, 해당 요소를 새 컬렉션에 출력할 것인지 아니면 삭제할 것인지를 결정할 수 있습니다.
  • 데이터 세트에 있는 각 요소의 유형에 대한 형식 지정 또는 변환. ParDo를 사용하여 PCollection에 있는 요소의 형식을 지정할 수 있습니다. 예를 들어 키-값 쌍의 형식을 인쇄 가능한 문자열로 지정할 수 있습니다.
  • 데이터 세트에서 각 요소의 부분 추출. ParDo를 사용하여 PCollection에서 각 요소의 일부만 추출할 수 있습니다. 이 작업은 BigQuery 테이블 행에서 개별 필드를 추출하는 데 특히 유용할 수 있습니다.
  • 데이터 세트에서 각 요소의 계산 수행. ParDo를 사용하여 PCollection의 모든 요소 또는 특정 요소에서 간단하거나 복잡한 계산을 수행할 수 있습니다.

ParDo는 파이프라인에서 일반적인 중간 단계이기도 합니다. 예를 들어 ParDo를 사용하여 PCollection의 각 요소에 키를 할당하여 키-값 쌍을 만들 수 있습니다. 나중에 GroupByKey 변환을 사용하여 쌍을 그룹화할 수 있습니다.

ParDo 변환 적용

ParDo를 사용하려면 변환할 PCollection에 적용하고 반환 값을 적절한 유형의 PCollection으로 저장하세요.

ParDo에 제공하는 인수는 DoFn이라는 Dataflow SDK에서 제공하는 특정 유형의 하위 클래스여야 합니다. DoFn에 대한 자세한 내용은 이 섹션의 뒷부분에 있는 처리 논리 생성 및 지정을 참조하세요.

다음 코드 예시는 문자열의 PCollection에 적용된 기본 ParDo를 보여줍니다. 이는 각 문자열의 길이를 계산하기 위한 DoFn 기반 함수를 전달하며 문자열 길이를 정수 PCollection에 출력합니다.

자바

  // The input PCollection of Strings.
  PCollection<String> words = ...;

  // The DoFn to perform on each element in the input PCollection.
  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

  // Apply a ParDo to the PCollection "words" to compute lengths for each word.
  PCollection<Integer> wordLengths = words.apply(
      ParDo
      .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                              // we define above.

이 예시에서 코드는 입력 컬렉션("words"라고 함)에 apply를 호출합니다. ParDoPTransform 인수입니다. .of 작업에서 각 요소에 대해 수행할 DoFn을 지정합니다. 이 경우에는 ComputeWordLengthFn()입니다.

처리 논리 생성 및 지정

ParDo에 제공하는 처리 논리는 파이프라인을 생성하기 위해 사용 중인 Dataflow SDK에서 필요로 하는 특정 유형이어야 합니다.

자바

SDK 클래스 DoFn의 하위 클래스를 빌드해야 합니다.

제공하는 함수는 여러 Google Compute Engine 인스턴스에서 독립적으로 호출됩니다.

또한 DoFn은 호출 사이에 지속적인 상태에 의존해서는 안 됩니다. Cloud Platform에 있는 처리 함수의 특정 인스턴스가 해당 함수의 다른 인스턴스에서 상태 정보에 액세스하지 못할 수도 있습니다.

참고: Dataflow SDK는 변경이 불가능한 영구 데이터를 사용자 코드의 각 호출에 부차 입력으로 전달하는 데 사용할 수 있는 ParDo의 변형을 제공합니다.

자바

DoFn은 입력 PCollection에서 한 번에서 한 요소씩 처리합니다. DoFn의 하위 클래스를 생성할 때 입력 요소의 유형과 출력 요소의 유형을 type 매개변수로 지정합니다. 다음 코드 샘플은 입력 String을 적용하고 출력 Integer를 생성하는 이전 예시에서 ComputeWordLengthFn() 함수를 정의하는 방법을 보여줍니다.

  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

DoFn의 하위 클래스는 입력 요소를 실제로 사용하기 위해 코드를 제공하는 요소 처리 메소드, processElement를 재정의해야 합니다. 다음 코드 샘플은 완전한 ComputeWordLengthFn()을 보여줍니다.

  static class ComputeWordLengthFn extends DoFn<String, Integer> {
    @Override
    public void processElement(ProcessContext c) {
      String word = c.element();
      c.output(word.length());
    }
  }

입력 컬렉션에서 요소를 수동으로 추출할 필요는 없습니다. Dataflow 자바 SDK가 각 요소를 추출하고 DoFn 하위 클래스에 전달합니다. processElement를 재정의할 때, 재정의 메소드가 ProcessContext 유형의 객체를 적용해야 처리할 요소에 액세스할 수 있습니다. ProcessContext.element() 메소드를 사용하여 DoFn에 전달되는 요소에 액세스합니다.

PCollection에 있는 요소가 키-값 쌍인 경우에는 ProcessContext.element().getKey()를 사용하여 키에 액세스하고, ProcessContext.element().getValue()를 사용하여 값에 액세스할 수 있습니다.

자바

자바용 Dataflow SDK는 출력 요소를 결과 PCollection에 수집하는 것을 자동으로 처리합니다. ProcessContext 객체를 사용하여 processElement의 결과 요소를 출력 컬렉션에 출력합니다. 결과 컬렉션의 요소를 출력하려면 ProcessContext.output() 메소드를 사용하세요.

간단한 DoFn

Dataflow SDK는 DoFn 구현을 간단하게 제공할 수 있는 언어별 방법을 제공합니다.

자바

종종 ParDo에 간단한 DoFn 인수를 익명 내부 클래스 인스턴스로 생성할 수 있습니다. DoFn이 몇 줄 되지 않는 경우에는 줄 안에 지정하는 것이 더 깔끔할 수 있습니다. 다음 코드 샘플은 ParDoComputeWordLengthFn 함수와 함께 익명 DoFn으로 적용하는 방법을 보여줍니다.

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a ParDo with an anonymous DoFn to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    ParDo
      .named("ComputeWordLengths")            // the transform name
      .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().length());
        }
      }));

요소당 정확히 하나의 출력을 생성하기 위해 입력의 각 요소에 함수를 적용하는 위와 같은 변환에서는 더 높은 레벨의 MapElements 변환을 사용할 수 있습니다. 이 방법은 자바 8에서 특히 간결하며, MapElements가 람다 함수를 적용하기 때문입니다.

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a MapElements with an anonymous lambda function to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    MapElements.via((String word) -> word.length())
        .withOutputType(new TypeDescriptor<Integer>() {});

마찬가지로, 자바 8 람다 함수를 Filter, FlatMapElements, Partition 변환과 함께 사용할 수 있습니다. 이러한 변환에 대한 자세한 내용은 Dataflow SDK에 미리 작성된 변환을 참조하세요.

변환 이름

변환 이름은 Dataflow 모니터링 인터페이스에서 파이프라인을 볼 때 실행 그래프에 나타납니다. 그래프에서 인식하기 위해 변환에 명시적인 이름을 지정하는 것이 특히 중요합니다.

자바

.named 작업은 파이프라인의 이 단계에서 변환 이름을 지정합니다. 변환 이름은 Dataflow 모니터링 인터페이스에서 파이프라인을 볼 때 실행 그래프에 나타납니다. 모니터링 인터페이스에서 단계에 대해 읽기 쉬운 이름을 볼 수 있도록 익명 DoFn 인스턴스를 ParDo와 함께 사용할 때 명시적 이름을 지정하는 것이 특히 중요합니다.

부차 입력

주 입력 PCollection 외에도 부차 입력의 형태로 추가적인 입력을 ParDo 변환에 제공할 수 있습니다. 부차 입력은 DoFn이 입력 PCollection에서 요소를 처리할 때마다 액세스할 수 있는 추가적인 입력입니다. 부차 입력을 지정하면 각 요소를 처리하는 동안 ParDo 변환의 DoFn 내에서 읽을 수 있는 일부 다른 데이터 뷰를 생성합니다.

부차 입력은 입력 PCollection의 각 요소 처리 시 ParDo가 추가 데이터를 입력해야 하지만 런타임에서 추가 데이터를 결정해야 하는 경우(그리고 하드 코딩되지 않은 경우)에 유용합니다. 이러한 값은 입력 데이터에 의해 결정되거나 파이프라인의 다른 브랜치에 달려 있습니다. 예를 들어 파이프라인이 실행되는 동안에 원격 서비스로부터 값을 가져와서 부차 입력으로 사용할 수 있습니다. 또는 파이프라인의 개별 브랜치에서 계산된 값을 사용하여 다른 브랜치의 ParDo에 부차 입력으로 추가할 수 있습니다.

부차 입력 표현

자바

부차 입력은 항상 PCollectionView 유형입니다. PCollectionViewPCollectionParDo에 부착 입력으로 전달할 수 있는 단일 항목으로 표현하기 위한 방법입니다. PCollection을 다음 유형 중 하나로 표현하는 PCollectionView를 만들 수 있습니다.

보기 유형 용도
View.asSingleton PCollection을 개별 값으로 표현합니다. 일반적으로 Combine.globally를 사용하여 PCollection을 결합한 후에 이 보기를 사용합니다. 부차 입력이 단일 계산 값일 때 이 보기를 사용합니다. 일반적으로 Combine.globally(...).asSingletonView()를 사용하여 싱글톤 보기를 생성해야 합니다.
View.asList PCollectionList로 표현합니다. 부차 입력이 개별 값의 컬렉션일 때 이 보기를 사용합니다.
View.asMap PCollectionMap으로 표현합니다. 부차 입력이 키-값 쌍(PCollection<K, V>)으로 구성되어 있고 각 키에 대한 값이 하나일 때 이 보기를 사용합니다.
View.asMultimap PCollectionMultiMap으로 표현합니다. 부차 입력이 키-값 쌍(PCollection<K, V>)으로 구성되어 있고 각 키에 대한 값이 여러 개일 때 이 보기를 사용합니다.

참고: 다른 파이프라인 데이터와 마찬가지로 PCollectionView는 생성되면 변경될 수 없습니다.

ParDo에 부차 입력 전달

자바

.withSideInputs를 호출하여 ParDo 변환에 부차 입력을 전달합니다. DoFn 내부에서 DoFn.ProcessContext.sideInput 메소드를 사용하여 부차 입력에 액세스합니다.

다음 코드 예시는 PCollection<Integer>로부터 싱글톤 부차 입력을 생성하여 후속 ParDo에 전달합니다.

이 예시에는 개별 단어의 컬렉션을 나타내는 words라는 PCollection<String> 그리고 단어 길이를 나타내는 PCollection<Integer>가 있습니다. 후자를 사용하여 최대 단어 길이 컷오프를 싱글톤 값으로 계산한 후에 컷오프를 기준으로 words를 필터링하는 ParDo에 해당 계산 값을 부차 입력으로 전달할 수 있습니다.

  // The input PCollection to ParDo.
  PCollection<String> words = ...;

  // A PCollection of word lengths that we'll combine into a single value.
  PCollection<Integer> wordLengths = ...; // Singleton PCollection

  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
  final PCollectionView<Integer> maxWordLengthCutOffView =
     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());

  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
    PCollection<String> wordsBelowCutOff =
    words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                      .of(new DoFn<String, String>() {
        public void processElement(ProcessContext c) {
          String word = c.element();
          // In our DoFn, access the side input.
          int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
          if (word.length() <= lengthCutOff) {
            c.output(word);
          }
    }}));
}

부차 입력 및 윈도우

무한이고 단일 값(또는 단일 컬렉션 클래스)으로 압축될 수 없는 윈도우된 PCollectionPCollectionView를 만드는 경우, PCollectionView윈도우별 단일 개체를 나타냅니다. 즉, PCollectionView는 윈도우별로 하나의 싱글톤, 윈도우별로 하나의 목록 등을 나타냅니다.

Dataflow는 주 입력 요소의 윈도우를 사용하여 부차 입력 요소에 대해 적절한 윈도우를 찾습니다. Dataflow는 주 입력 요소의 윈도우를 부차 입력의 윈도우 세트에 반영한 후에 결과 윈도우의 부차 입력을 사용합니다. 주 입력과 부차 입력의 윈도우가 동일한 경우에는 정확한 윈도우가 반영됩니다. 하지만 두 입력의 윈도우가 다른 경우에는 Dataflow가 가장 적절한 부차 입력 윈도우를 선택합니다.

자바

예를 들어 주 입력이 1분이라는 고정 시간 윈도우로 설정되어 있고 부차 입력이 1시간이라는 고정 시간 윈도우로 설정된 경우, Dataflow는 주 입력 윈도우를 부차 입력 윈도우 집합에 반영하고 1시간 길이의 부차 입력 윈도우에서 부차 입력 값을 선택합니다.

부차 입력에서 여러 개의 트리거가 실행되는 경우, Dataflow는 가장 최근에 실행된 트리거의 값을 사용합니다. 이는 부차 입력을 단일 전역 윈도우와 함께 사용하고 트리거를 지정하는 경우에 특히 유용합니다.

부차 출력

ParDo는 항상 apply의 반환 값으로 주 출력 PCollection을 생성하지만, ParDo가 추가적인 출력 PCollection을 생성하도록 할 수도 있습니다. 여러 번 출력되도록 선택한 경우에는 ParDo가 모든 출력 PCollection(주 출력 포함)을 함께 번들링하여 반환합니다. 예를 들어, 자바에서는 출력 PCollection이 유형에 구애를 받지 않는 PCollectionTuple로 번들링됩니다.

부차 출력을 위한 태그

자바

부차 출력 PCollection에 요소를 내보내려면 ParDo가 생성하는 각 컬렉션을 식별하기 위한 TupleTag 객체를 생성해야 합니다. 예를 들어 ParDo가 세 개의 출력 PCollection(주 출력과 두 개의 부차 출력)을 생성하는 경우 세 개의 연관된 TupleTag를 생성해야 합니다.

다음 코드 예시는 주 출력과 두 개의 부차 출력이 있는 ParDo에 대한 TupleTag를 생성하는 방법을 보여줍니다.

  // Input PCollection to our ParDo.
  PCollection<String> words = ...;

  // The ParDo will filter words whose length is below a cutoff and add them to
  // the main output PCollection<String>.
  // If a word is above the cutoff, the ParDo will add the word length to a side output
  // PCollection<Integer>.
  // If a word starts with the string "MARKER", the ParDo will add that word to a different
  // side output PCollection<String>.
  final int wordLengthCutOff = 10;

  // Create the TupleTags for the main and side outputs.
  // Main output.
  final TupleTag<String> wordsBelowCutOffTag =
      new TupleTag<String>(){};
  // Word lengths side output.
  final TupleTag<Integer> wordLengthsAboveCutOffTag =
      new TupleTag<Integer>(){};
  // "MARKER" words side output.
  final TupleTag<String> markedWordsTag =
      new TupleTag<String>(){};

ParDo에 출력 태그 전달

ParDo 출력에 TupleTag를 지정한 후에는 .withOutputTags를 호출하여 해당 태그를 ParDo에 전달해야 합니다. TupleTagList에서 먼저 주 출력의 태그를 전달한 후에 부차 출력의 태그를 전달합니다.

이전 예시를 바탕으로, 세 개의 TupleTag(주 출력에 대해 한 개, 부차 출력에 대해 두 개)를 ParDo에 전달하는 방법은 다음과 같습니다.

  PCollectionTuple results =
      words.apply(
          ParDo
          // Specify the tag for the main output, wordsBelowCutoffTag.
          .withOutputTags(wordsBelowCutOffTag,
          // Specify the tags for the two side outputs as a TupleTagList.
                          TupleTagList.of(wordLengthsAboveCutOffTag)
                                      .and(markedWordsTag))
          .of(new DoFn<String, String>() {
            // DoFn continues here.
            ...
          }

모든 출력(주 출력 PCollection 포함)이 PCollectionTuple라는 반환된 results.에 번들링됩니다.

DoFn에서 부차 출력에 내보내기

자바

ParDoDoFn 내부에서 ProcessContext.sideOutput 메소드를 사용하여 부차 출력에 요소를 내보낼 수 있습니다. ProcessContext.sideOutput을 호출할 때 대상 부차 출력 컬렉션에 대해 적절한 TupleTag를 전달해야 합니다.

이전 예시에서 주 출력 및 부차 출력에 내보내는 DoFn은 다음과 같습니다.

  .of(new DoFn<String, String>() {
     public void processElement(ProcessContext c) {
       String word = c.element();
       if (word.length() <= wordLengthCutOff) {
         // Emit this short word to the main output.
         c.output(word);
       } else {
         // Emit this long word's length to a side output.
         c.sideOutput(wordLengthsAboveCutOffTag, word.length());
       }
       if (word.startsWith("MARKER")) {
         // Emit this word to a different side output.
         c.sideOutput(markedWordsTag, word);
       }
     }}));

ParDo 후에는 결과로 생성되는 주 출력 및 부차 출력 PCollection을 반환된 PCollectionTuple에서 추출해야 합니다. 튜플에서 개별 PCollection을 추출하는 방법을 보여주는 몇 가지 예에 대해서는 PCollectionTuple 섹션을 참조하세요.

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.