WordCount 예 파이프라인

아직 빠른 시작 단계를 수행하지 않았을 경우 계속하기 전에 수행하세요.

WordCount 예시는 텍스트를 읽고, 텍스트 줄을 개별 단어로 토큰화하고, 각 단어의 출현 빈도를 세는 처리 파이프라인을 설정하는 방법을 보여줍니다. Dataflow SDK는 서로를 기반으로 하는 연속적이고 자세한 4가지 WordCount 예시를 포함합니다. 모든 예시의 입력 텍스트는 셰익스피어 작품의 모음입니다.

각 WordCount 예시는 Dataflow SDK의 서로 다른 개념을 소개합니다.

  • 최소 WordCount는 Dataflow 파이프라인 작성에 관한 기본 원칙을 보여줍니다.
  • WordCount는 재사용 및 유지 보수가 가능한 파이프라인 작성의 가장 일반적인 모범 사례를 보여줍니다.
  • 디버깅 WordCount는 로깅 및 디버깅 방법을 보여줍니다.
  • 기간 설정 WordCount는 Dataflow의 프로그래밍 모델을 사용하여 제한된 데이터 세트와 제한되지 않은 데이터 세트를 처리하는 방법을 보여줍니다.

가장 간단한 예시인 최소 WordCount 이해부터 시작합니다. 파이프라인 작성의 기본 원칙에 익숙해졌다면 WordCount에서 Dataflow 프로그램을 작성하는 모범 사례를 알아봅니다. 그런 다음 디버깅 WordCount를 읽고 일반적인 로깅 및 디버깅 사례를 활용하는 방법을 파악합니다. 마지막으로 기간 설정 WordCount의 유한 데이터 세트 및 무한 데이터 세트에서 동일한 계산 패턴을 사용하는 방법을 배웁니다.

최소 WordCount

최소 WordCount는 Google Cloud Storage의 파일에서 텍스트 블록을 읽고, 변환을 적용하여 단어를 토큰화하고 단어 수를 세고, Cloud Storage 버킷의 출력 파일에 데이터를 작성하는 간단한 파이프라인을 보여줍니다. 이 예시에서는 입출력 파일의 위치를 하드 코딩하고 오류 검사는 수행하지 않으며, Dataflow 파이프라인 작성의 '기본'만을 보여주는 것을 목적으로 합니다. 이후 예시에서는 파이프라인의 입출력 소스를 매개변수화하고 그 외 다른 모범 사례를 살펴보겠습니다.

자바

주요 개념
  1. 파이프라인 생성
  2. 파이프라인에 변환 적용
    • 입력 읽기(이 예시에서는 텍스트 파일 읽기)
    • ParDo 변환 적용
    • SDK에서 제공된 변환 적용(이 예시에서는 Count)
    • 출력 작성(이 예시에서는 Google Cloud Storage에 작성)
  3. 파이프라인 실행

다음 섹션은 이러한 개념을 최소 WordCount 파이프라인에서 발췌한 관련 코드와 함께 자세하게 설명합니다.

파이프라인 생성

Cloud Dataflow 파이프라인을 작성하는 첫 번째 단계는 파이프라인 옵션 객체를 생성하는 것입니다. 이 객체를 사용하면 파이프라인을 실행하는 파이프라인 실행자, 프로젝트 ID, 파이프라인이 파일을 저장할 스테이징 위치(클라우드에서 jar 파일에 액세스하는 데 사용) 등 파이프라인에 다양한 옵션을 설정할 수 있습니다. 이 예시에서는 이러한 옵션을 프로그래매틱 방식으로 설정하지만 명령줄 인수를 통해 파이프라인 옵션을 설정하는 경우가 더 보편적입니다.

이 예시에서는 BlockingDataflowPipelineRunnerPipelineRunner로 설정하여 Google Cloud Dataflow 서비스를 통해 파이프라인을 클라우드에서 실행합니다. 클라우드에서 파이프라인을 실행할 때 다른 옵션을 설정할 수도 있습니다. 또는 이 옵션을 완전히 생략할 수도 있습니다. 이 경우 기본 실행자는 로컬에서 파이프라인을 실행합니다. 다음 두 가지 WordCount 예에서 이러한 예를 확인할 수 있으며, 실행 매개변수 지정에서 자세히 설명합니다.

자바

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET-YOUR-PROJECT-ID-HERE");
// The 'gs' URI means that this is a Google Cloud Storage path
options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");

다음 단계는 방금 구성한 옵션으로 Pipeline 객체를 생성하는 것입니다. Pipeline 객체는 이 파이프라인과 연관되어 실행되는 변환 그래프를 작성합니다.

자바

Pipeline p = Pipeline.create(options);

파이프라인 객체와 작동 방식에 대한 자세한 내용은 파이프라인을 참조하세요.

파이프라인 변환 적용

최소 WordCount 파이프라인에는 파이프라인으로 데이터를 읽고, 데이터를 조작 또는 변환하고, 결과물을 작성하는 여러 변환이 포함되어 있습니다. 각 변환은 파이프라인에서 수행하는 작업을 나타냅니다.

각 변환은 데이터 등의 입력을 취해 출력 데이터를 생성합니다. 입력 데이터 및 출력 데이터는 SDK 클래스 PCollection으로 나타납니다. PCollection은 Dataflow SDK가 제공하는 특수한 클래스로 유한 데이터 세트를 포함하여 거의 모든 크기의 데이터 세트를 나타내기 위해 사용할 수 있습니다.

그림 1은 파이프라인의 데이터 흐름을 보여줍니다.

파이프라인은 TextIO.Read 변환을 사용하여 입력 데이터 파일에 저장된 데이터에서 PCollection을 생성합니다. CountWords 변환은 원시 텍스트 PCollection에서 단어 개수 PCollection을 생성합니다. TextIO.Write는 형식을 지정한 단어 개수를 출력 데이터 파일로 작성합니다.
그림 1: 파이프라인 데이터 흐름

최소 WordCount 파이프라인에는 다섯 가지 변환이 포함되어 있습니다.

  1. 텍스트 파일 Read 변환이 Pipeline 객체 자체에 적용되고 출력으로 PCollection을 생성합니다. 출력 PCollection의 각 요소는 입력 파일의 텍스트 한 줄을 나타냅니다.
  2. 자바

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
    
  3. 텍스트 줄을 개별 단어로 토큰화하는 각 요소에서 DoFn(익명 클래스로 인라인으로 정의됨)을 호출하는 ParDo 변환입니다. 이 변환의 입력은 이전 TextIO.Read 변환에서 생성된 텍스트 줄의 PCollection입니다. ParDo 변환은 각 요소가 텍스트 내의 개별 단어를 나타내는 새로운 PCollection을 출력합니다.
  4. 자바

    이 예에서 수행한 .named() 작업을 사용하면 Dataflow 모니터링 인터페이스에 표시되는 변환 이름을 변환에 지정할 수 있습니다. Dataflow 서비스가 파이프라인을 실행할 때 모니터링 인터페이스는 각 ParDo 변환이 실행되는 시점을 나타냅니다.

      .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
           for (String word : c.element().split("[^a-zA-Z']+")) {
             if (!word.isEmpty()) {
               c.output(word);
             }
           }
         }
      }))
    
  5. SDK에서 제공하는 Count 변환은 모든 유형의 PCollection을 가져와 키-값 쌍의 PCollection을 반환하는 일반 변환입니다. 각 키는 입력 컬렉션의 고유 요소를 나타내며, 각 값은 입력 컬렉션에서 해당 키가 등장하는 횟수를 나타냅니다.

    이 파이프라인에서 Count의 입력은 기존 ParDo에서 생성된 개별 단어의 PCollection이고, 출력은 각 키가 텍스트의 고유 단어를 나타내고 각 단어의 등장 횟수가 관련 값으로 나타난 키-값 쌍의 PCollection입니다.
  6. 자바

      .apply(Count.<String>perElement())
    
  7. 다음은 각 고유 단어와 등장 횟수를 나타내는 키-값 쌍을 출력 파일에 작성하기 적합한 인쇄 가능한 문자열로 형식을 지정하는 변환입니다.
  8. 자바

    MapElements는 간단한 ParDo를 캡슐화하는 상위 수준 복합 변환입니다. 입력 PCollection의 각 요소에 대해 MapElements는 출력 요소 하나만 생성하는 함수를 적용합니다. 이 MapElements는 형식을 지정하는 SimpleFunction(익명 클래스로 인라인으로 정의됨)을 호출합니다. 입력으로서 이 MapElementsCount로 생성한 키-값 쌍의 PCollection을 가지고 인쇄 가능한 문자열의 새로운 PCollection을 생성합니다.

      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         @Override
         public String apply(KV<String, Long> element) {
           return element.getKey() + ": " + element.getValue();
         }
      }))
    
  9. 텍스트 파일 Write입니다. 이 변환은 형식이 지정된 String의 최종 PCollection을 입력으로 취하고 각 요소를 출력 텍스트 파일로 작성합니다. 입력 PCollection의 각 요소는 결과 출력 파일의 한 줄을 나타냅니다.
  10. 자바

      .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
    

    Write 변환은 PDone 유형의 사소한 결과 값을 생성하며, 이 경우에서 이 값은 무시됩니다.

파이프라인 실행

run 메소드를 호출하여 파이프라인을 실행합니다. 그러면 파이프라인 생성 시 지정한 파이프라인 실행자로 실행될 파이프라인이 전송됩니다.

자바

p.run();

WordCount 예시

WordCount 예시에서는 파이프라인의 읽기, 작성, 유지를 간편하게 하는 권장 프로그래밍 사례를 알아봅니다. 이러한 사례는 반드시 필요하지는 않지만 파이프라인을 보다 유연하게 실행하고, 파이프라인 테스트를 지원하고, 파이프라인 코드를 재사용하는 데 도움을 줍니다.

이 섹션에서는 파이프라인 작성의 기본 개념을 잘 이해하고 있다고 가정하겠습니다. 아직 기본 개념을 숙지하지 못한 경우 이전 섹션인 최소 WordCount를 읽어보세요.

자바

새로운 개념
  1. 명시적 DoFn으로 ParDo 적용
  2. 복합 변환 생성
  3. 매개변수화 가능한 PipelineOptions 사용

다음 섹션에서는 이러한 핵심 개념을 자세히 살펴보고 파이프라인 코드를 세부 섹션으로 나누어 설명합니다.

명시적 DoFn 지정

ParDo 변환 사용 시 입력 PCollection의 각 요소에 적용되는 처리 작업을 지정해야 합니다. 이 처리 작업은 SDK 클래스 DoFn의 하위 클래스입니다. 이전 섹션(최소 WordCount)의 예시 파이프라인은 각 ParDoDoFn 하위 클래스를 인라인으로 익명 내부 클래스 인스턴스로 생성합니다.

하지만 DoFn을 전역 수준에서 정의하면 단위 테스트가 간편해지고 ParDo 코드를 더 쉽게 읽을 수 있습니다.

이전 예시(최소 WordCount)에서 언급했듯이 파이프라인을 실행할 때 Dataflow 모니터링 인터페이스는 각 ParDo 변환이 실행되는 시점을 나타냅니다. Dataflow 서비스는 ParDo 변환의 변환 이름을 전달한 DoFn 이름에서 자동으로 생성합니다. 예를 들어 FormatAsTextFn()이 적용되는 ParDo는 모니터링 인터페이스에 ParDo(FormatAsText)로 나타납니다.

자바

이 예시에서 DoFn은 정적 클래스로 정의됩니다.

/** A DoFn that converts a Word and Count into a printable string. */
static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  ...

  @Override
  public void processElement(ProcessContext c) {
    ...
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform.
  p.apply(...)
   .apply(...)
   .apply(ParDo.of(new FormatAsTextFn()))
   ...
}

ParDo 변환에 DoFn 하위 클래스를 만들고 지정하는 방법에 대한 자세한 내용은 ParDo를 사용한 병렬 처리를 참조하세요.

복합 변환 생성

여러 변환 또는 ParDo 단계로 구성된 처리 작업이 있을 경우, 이를 PTransform의 하위 클래스로 생성할 수 있습니다. PTransform 하위 클래스를 만들면 복잡하고 재사용 가능한 변환을 만들고, 파이프라인의 구조를 더 명확하고 모듈화되도록 할 수 있으며, 손쉽게 단위 테스트를 수행할 수 있습니다.

PTransform 하위 클래스를 사용하여 파이프라인의 논리적 구조를 명시하면 파이프라인 모니터링도 간편해집니다. Dataflow 서비스가 파이프라인의 최종 최적화 구조를 작성할 때 Dataflow 모니터링 인터페이스는 작성한 변환을 사용하여 파이프라인의 구조를 더욱 정확하게 반영합니다.

자바

이 예시에서는 변환 두 개가 PTransform 하위 클래스 CountWords로 캡슐화됩니다. CountWords에는 ExtractWordsFn과 SDK에서 제공한 Count 변환을 실행하는 ParDo가 포함되어 있습니다.

CountWords가 정의되면 최종 입력 및 출력을 지정합니다. 입력은 추출 작업의 PCollection<String>이고, 출력은 개수 계산 작업에서 생성한 PCollection<KV<String, Long>>입니다.

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}

매개변수화 가능한 PipelineOptions 사용

이전 예시인 최소 WordCount에서는 파이프라인을 생성할 때 다양한 실행 옵션을 설정했습니다. 이 예시에서는 PipelineOptions를 확장하여 자체적인 커스텀 구성 옵션을 정의하겠습니다.

명령줄 파서에서 처리할 자체 인수를 추가하고 해당 인수의 기본값을 지정할 수 있습니다. 그런 다음 파이프라인 코드의 옵션 값에 액세스할 수 있습니다.

최소 WordCount 예시에서는 파이프라인 옵션을 하드 코딩했습니다. 하지만 명령줄 인수 파싱을 통해 PipelineOptions를 구성하는 것이 가장 일반적입니다.

자바

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}

디버깅 WordCount 예시

디버깅 WordCount 예시에서는 파이프라인 코드를 검사하는 모범 사례를 살펴보겠습니다. Dataflow 모니터링 인터페이스애그리게이터를 사용하여 실행 중인 파이프라인에 대한 가시성을 추가로 확보할 수 있습니다.

자바

또한 SDK의 DataflowAssert를 사용하여 파이프라인의 서로 다른 단계에서 변환의 출력을 테스트할 수 있습니다.

자바

새로운 개념
  1. Dataflow 모니터링 인터페이스에서 로그 확인
  2. Dataflow 작업자 로그 수준 제어
  3. Aggregators 만들기
  4. DataflowAssert를 통한 파이프라인 테스트

다음 섹션에서는 이러한 핵심 개념을 자세히 살펴보고 파이프라인 코드를 세부 섹션으로 나누어 설명합니다.

Dataflow 모니터링 인터페이스에서 로그 확인

Google Cloud Logging은 모든 Dataflow 작업자의 로그를 Google Cloud Platform Console의 단일 위치에 집계합니다. 그런 다음 Dataflow 모니터링 인터페이스를 사용하여 Dataflow에서 생성한 모든 Compute Engine 인스턴스의 로그를 확인하고 Dataflow 작업을 완료할 수 있습니다. 파이프라인의 DoFn 인스턴스에 파이프라인이 실행될 때 모니터링 인터페이스에 표시되는 로깅 구문을 추가할 수 있습니다.

자바

다음 SLF4J 로거는 로거 이름으로 정규화된 FilterTextFn 클래스 이름을 사용합니다. 이 로거에서 출력한 모든 로그 구문은 이 이름으로 참조되며 적절한 로그 수준 설정에 따라 Dataflow 모니터링 인터페이스에 표시됩니다.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebuggingWordCount {
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...
    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
    ...

    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        // Log at the "DEBUG" level each element that we match. When executing this pipeline
        // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI
        // only if the log level is set to "DEBUG" or lower.
        LOG.debug("Matched: " + c.element().getKey());
        ...
      } else {
        // Log at the "TRACE" level each element that is not matched. Different log levels
        // can be used to control the verbosity of logging providing an effective mechanism
        // to filter less important information.
        LOG.trace("Did not match: " + c.element().getKey());
        ...
      }
    }
  }
}

Dataflow 작업자 로그 수준 제어

자바

사용자 코드를 실행하는 Dataflow 작업자는 기본적으로 INFO 로그 수준 이상에서 Cloud Logging에 로깅되도록 구성됩니다. 다음 내용을 지정하여 특정 로깅 네임스페이스의 로그 수준을 재정의할 수 있습니다.

--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}

예를 들어 파이프라인을 실행할 때 Dataflow 서비스로 다음 내용을 지정한다고 가정합니다.

--workerLogLevelOverrides={"com.example":"DEBUG"}

Dataflow 서비스를 사용하여 이 파이프라인을 실행하면 모니터링 인터페이스에 기본 INFO 수준 이상의 로그 외에 com.example 패키지의 DEBUG 수준 이상의 로그만 포함됩니다.

또한 다음 내용을 지정하여 기본 Dataflow 작업자 로깅 구성을 재정의할 수 있습니다.

--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>

예를 들어 파이프라인을 실행할 때 Dataflow 서비스로 다음 내용을 지정한다고 가정합니다.

--defaultWorkerLogLevel=DEBUG

Dataflow 서비스를 사용하여 이 파이프라인을 실행하면 모니터링 인터페이스DEBUG 수준 이상의 로그를 모두 포함합니다. 기본 작업자 로그 수준을 TRACE 또는 DEBUG로 변경하면 로그 정보의 양이 크게 증가합니다.

자세한 내용은 Cloud Dataflow에서 로깅을 참조하세요.

애그리게이터 생성

커스텀 애그리게이터는 파이프라인이 실행될 때 값을 추적할 수 있습니다. 이러한 값은 Dataflow 서비스에서 파이프라인을 실행할 때 Dataflow 모니터링 인터페이스에 표시됩니다.

애그리게이터는 시스템에서 애그리게이터를 생성하는 ParDo 변환을 실행하거나 초기 값이 변경될 때까지 표시되지 않을 수 있습니다. 애그리게이터는 모니터링 인터페이스의 작업 요약 하단에서 확인할 수 있습니다.

아래의 커스텀 애그리게이터는 일치하는 단어와 일치하지 않는 단어의 수를 추적합니다.

자바

public class DebuggingWordCount {
  /** A DoFn that filters for a specific key based upon a regular expression. */
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    private final Aggregator<Long, Long> matchedWords =
        createAggregator("matchedWords", new Sum.SumLongFn());
    private final Aggregator<Long, Long> unmatchedWords =
      createAggregator("umatchedWords", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        ...
        matchedWords.addValue(1L);
        ...
      } else {
        ...
        unmatchedWords.addValue(1L);
      }
    }
  }
}

배치 및 스트리밍 파이프라인의 애그리게이터

배치 파이프라인의 애그리게이터는 일관성을 제공합니다. 이 애그리게이터는 성공적인 번들에서는 딱 한 번 커밋되고 실패한 번들에서는 커밋되지 않습니다.

스트리밍 파이프라인에서 애그리게이터는 보다 관대한 시맨틱스를 제공합니다. 성공적인 번들은 최고 효율에 기여하고 실패한 번들은 최종 값에 반영될 수 있습니다.

DataflowAssert를 통한 파이프라인 테스트

자바

DataflowAssert는 파이프라인 수준 테스트를 작성하여 PCollections 콘텐츠의 유효성을 검사할 때 사용할 수 있는 Hamcrest 컬렉션 일치자 스타일의 편리한 PTransforms 집합입니다. DataflowAssert는 작은 데이터 세트를 단위 테스트할 때 사용하는 것이 가장 좋지만 여기서는 교육용 도구로 사용하겠습니다.

다음 사례에서는 필터링한 단어 세트가 예측 개수와 일치하는지 확인합니다. DataflowAssert는 출력을 제공하지 않으며 파이프라인이 성공적으로 완료되면 예측이 적중했다는 것을 의미합니다. 파이프라인 테스트 방법에 대해 자세히 알아보고 DebuggingWordCountTest에서 예 단위 테스트를 참조하세요.

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}

WindowedWordCount

자바

이 예시 WindowedWordCount에서는 이전 예시처럼 텍스트의 단어 수를 계산하지만 여러 고급 개념을 도입합니다. WindowedWordCount의 입력은 이전 예시와 같은 고정 데이터 세트 또는 제한되지 않은 데이터 스트림입니다.

Dataflow SDK는 제한된 유형과 제한되지 않은 유형의 입력을 모두 처리하는 단일 파이프라인을 생성할 수 있다는 점에서 편리합니다. 입력이 제한되지 않을 경우 파이프라인의 모든 PCollections도 제한되지 않습니다. 제한된 입력의 경우 PCollection도 제한됩니다.

이 섹션을 읽기 전에 파이프라인 작성의 기본 원칙을 잘 이해하고 숙지하도록 하세요.

새로운 개념
  1. 제한되지 않은 입력 및 제한된 입력 읽기
  2. 데이터에 타임스탬프 추가
  3. 기간 설정
  4. 제한되지 않은 출력 및 제한된 출력 작성

다음 섹션에서는 이러한 핵심 개념을 자세히 살펴보고 파이프라인 코드를 세부 섹션으로 나누어 설명합니다.

제한되지 않은 입력 및 제한된 입력 읽기

WindowedWordCount 입력은 제한된 입력 또는 제한되지 않은 입력일 수 있습니다. 입력에 고정된 개수의 요소가 있을 경우 '제한된' 데이터 세트로 간주됩니다. 입력이 지속적으로 업데이트될 경우 '제한되지 않은' 데이터 세트로 간주됩니다. 입력 유형에 대한 자세한 내용은 제한된 PCollection과 제한되지 않은 PCollection을 참조하세요.

이 예에서는 입력의 제한 여부를 선택할 수 있습니다. 모든 예시의 입력은 셰익스피어 작품의 모음이라는 것을 기억해 봅시다. 셰익스피어 작품은 유한한 제한된 입력입니다. 하지만 이 예시에서는 새로운 개념을 설명하기 위해 입력에 셰익스피어 작품의 재연을 사용합니다.

이 예에서 입력이 제한되지 않을 경우, Google Cloud Pub/Sub 주제에서 입력을 읽습니다. 이 경우, 파이프라인에 적용되는 Read 변환은 PubSubIO.Read입니다. 그렇지 않을 경우 입력은 Google Cloud Storage에서 읽습니다.

public static void main(String[] args) throws IOException {
    ...
    PCollection<String> input;
    if (options.isUnbounded()) {
      LOG.info("Reading from PubSub.");
      // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));

  } else {
      // Else, this is a bounded pipeline. Read from the Google Cloud Storage file.
      input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
      ...
    }
    ...
}

데이터에 타임스탬프 추가

PCollection의 각 요소에는 연결된 타임스탬프가 있습니다. 각 요소의 타임스탬프는 PCollection을 생성하는 소스에서 지정됩니다. 이 예시에서는 파이프라인 입력으로 제한되지 않은 입력을 선택한 경우 타임스탬프를 Pub/Sub 데이터 소스에서 가져옵니다. 제한된 입력을 선택할 경우 ParDo에서 호출한 AddTimestampsFn이라는 이름의 DoFn 메소드가 PCollection의 각 요소에 타임스탬프를 설정합니다.

public static void main(String[] args) throws IOException {
  ...
  input = pipeline
    .apply(...)
    // Add an element timestamp, using an artificial time.
    .apply(ParDo.of(new AddTimestampFn()));
}

다음은 ParDo에서 호출한 DoFnAddTimestampsFn의 코드입니다. 이 코드는 요소 자체에서 지정한 타임스탬프의 데이터 요소를 설정합니다. 예를 들어 요소가 로그 줄일 경우 이 ParDo는 로그 문자열에서 시간을 파싱하여 요소의 타임스탬프로 설정합니다. 셰익스피어 작품에는 자체 타임스탬프가 없으므로 여기에서는 개념을 설명하기 위해 타임스탬프를 임의로 만들었습니다. 입력 텍스트의 각 줄에는 2시간이라는 기간 내에 무작위로 관련 타임스탬프가 부여됩니다.

static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp;

  AddTimestampFn() {
    this.minTimestamp = new Instant(System.currentTimeMillis());
  }

  @Override
  public void processElement(ProcessContext c) {
    // Generate a timestamp that falls somewhere in the past 2 hours.
    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
        Instant randomTimestamp = minTimestamp.plus(randMillis);
    // Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}

타임스탬프에 대한 자세한 내용은 PCollection 요소 타임스탬프를 참조하세요.

기간 설정

Dataflow SDK는 윈도우라는 개념을 사용하여 PCollection을 개별 요소의 타임스탬프에 맞게 세분화합니다. 여러 요소를 집계하는 Dataflow 변환은 전체 컬렉션의 크기가 무한한 경우(제한되지 않음)에도 각 PCollection을 여러 개의 유한한 기간의 연속으로 처리합니다.

WindowingWordCount 예시는 고정 기간 설정을 적용합니다. 여기서 각 기간은 고정된 시간 간격을 나타냅니다. 이 예시에서 고정 기간 크기는 기본적으로 1분입니다(명령줄 옵션으로 변경 가능). 그런 다음 파이프라인은 CountWords 변환을 적용합니다.

PCollection<KV<String, Long>> wordCounts = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
   .apply(new WordCount.CountWords());

제한되지 않은 출력 및 제한된 출력 작성

이 예시에서 입력이 제한된 입력 또는 제한되지 않은 입력일 수 있듯이 출력 PCollection에서도 마찬가지로 적절한 출력 싱크를 선택해야 합니다. 일부 출력 싱크는 제한된 출력 또는 제한되지 않은 출력 중 하나만을 지원합니다. 예를 들어 텍스트 파일은 제한된 데이터만을 수신할 수 있는 싱크입니다. BigQuery 출력 소스는 제한된 입력과 제한되지 않은 입력을 모두 지원합니다.

이 예시에서는 결과를 BigQuery 테이블로 스트리밍합니다. 그러면 결과가 BigQuery 테이블에 대해 형식이 지정되고 BigQueryIO.Write를 통해 BigQuery에 작성됩니다.

wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
  .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.