아직 빠른 시작 단계를 수행하지 않았을 경우 계속하기 전에 수행하세요.
WordCount 예시는 텍스트를 읽고, 텍스트 줄을 개별 단어로 토큰화하고, 각 단어의 출현 빈도를 세는 처리 파이프라인을 설정하는 방법을 보여줍니다. Dataflow SDK는 서로를 기반으로 하는 연속적이고 자세한 4가지 WordCount 예시를 포함합니다. 모든 예시의 입력 텍스트는 셰익스피어 작품의 모음입니다.
각 WordCount 예시는 Dataflow SDK의 서로 다른 개념을 소개합니다.
- 최소 WordCount는 Dataflow 파이프라인 작성에 관한 기본 원칙을 보여줍니다.
- WordCount는 재사용 및 유지 보수가 가능한 파이프라인 작성의 가장 일반적인 모범 사례를 보여줍니다.
- 디버깅 WordCount는 로깅 및 디버깅 방법을 보여줍니다.
- 기간 설정 WordCount는 Dataflow의 프로그래밍 모델을 사용하여 제한된 데이터 세트와 제한되지 않은 데이터 세트를 처리하는 방법을 보여줍니다.
가장 간단한 예시인 최소 WordCount 이해부터 시작합니다. 파이프라인 작성의 기본 원칙에 익숙해졌다면 WordCount에서 Dataflow 프로그램을 작성하는 모범 사례를 알아봅니다. 그런 다음 디버깅 WordCount를 읽고 일반적인 로깅 및 디버깅 사례를 활용하는 방법을 파악합니다. 마지막으로 기간 설정 WordCount의 유한 데이터 세트 및 무한 데이터 세트에서 동일한 계산 패턴을 사용하는 방법을 배웁니다.
최소 WordCount
최소 WordCount는 Google Cloud Storage의 파일에서 텍스트 블록을 읽고, 변환을 적용하여 단어를 토큰화하고 단어 수를 세고, Cloud Storage 버킷의 출력 파일에 데이터를 작성하는 간단한 파이프라인을 보여줍니다. 이 예시에서는 입출력 파일의 위치를 하드 코딩하고 오류 검사는 수행하지 않으며, Dataflow 파이프라인 작성의 '기본'만을 보여주는 것을 목적으로 합니다. 이후 예시에서는 파이프라인의 입출력 소스를 매개변수화하고 그 외 다른 모범 사례를 살펴보겠습니다.
자바
- 파이프라인 생성
- 파이프라인에 변환 적용
- 입력 읽기(이 예시에서는 텍스트 파일 읽기)
ParDo
변환 적용- SDK에서 제공된 변환 적용(이 예시에서는
Count
) - 출력 작성(이 예시에서는 Google Cloud Storage에 작성)
- 파이프라인 실행
다음 섹션은 이러한 개념을 최소 WordCount 파이프라인에서 발췌한 관련 코드와 함께 자세하게 설명합니다.
파이프라인 생성
Cloud Dataflow 파이프라인을 작성하는 첫 번째 단계는 파이프라인 옵션 객체를 생성하는 것입니다. 이 객체를 사용하면 파이프라인을 실행하는 파이프라인 실행자, 프로젝트 ID, 파이프라인이 파일을 저장할 스테이징 위치(클라우드에서 jar 파일에 액세스하는 데 사용) 등 파이프라인에 다양한 옵션을 설정할 수 있습니다. 이 예시에서는 이러한 옵션을 프로그래매틱 방식으로 설정하지만 명령줄 인수를 통해 파이프라인 옵션을 설정하는 경우가 더 보편적입니다.
이 예시에서는 BlockingDataflowPipelineRunner
를 PipelineRunner
로 설정하여 Google Cloud Dataflow 서비스를 통해 파이프라인을 클라우드에서 실행합니다. 클라우드에서 파이프라인을 실행할 때 다른 옵션을 설정할 수도 있습니다. 또는 이 옵션을 완전히 생략할 수도 있습니다. 이 경우 기본 실행자는 로컬에서 파이프라인을 실행합니다. 다음 두 가지 WordCount 예에서 이러한 예를 확인할 수 있으며, 실행 매개변수 지정에서 자세히 설명합니다.
자바
DataflowPipelineOptions options = PipelineOptionsFactory.create() .as(DataflowPipelineOptions.class); options.setRunner(BlockingDataflowPipelineRunner.class); options.setProject("SET-YOUR-PROJECT-ID-HERE"); // The 'gs' URI means that this is a Google Cloud Storage path options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");
다음 단계는 방금 구성한 옵션으로 Pipeline
객체를 생성하는 것입니다. Pipeline
객체는 이 파이프라인과 연관되어 실행되는 변환 그래프를 작성합니다.
자바
Pipeline p = Pipeline.create(options);
파이프라인 객체와 작동 방식에 대한 자세한 내용은 파이프라인을 참조하세요.
파이프라인 변환 적용
최소 WordCount 파이프라인에는 파이프라인으로 데이터를 읽고, 데이터를 조작 또는 변환하고, 결과물을 작성하는 여러 변환이 포함되어 있습니다. 각 변환은 파이프라인에서 수행하는 작업을 나타냅니다.
각 변환은 데이터 등의 입력을 취해 출력 데이터를 생성합니다.
입력 데이터 및 출력 데이터는 SDK 클래스 PCollection
으로 나타납니다.
PCollection은 Dataflow SDK가 제공하는 특수한 클래스로 유한 데이터 세트를 포함하여 거의 모든 크기의 데이터 세트를 나타내기 위해 사용할 수 있습니다.
그림 1은 파이프라인의 데이터 흐름을 보여줍니다.

최소 WordCount 파이프라인에는 다섯 가지 변환이 포함되어 있습니다.
- 텍스트 파일 Read 변환이
Pipeline
객체 자체에 적용되고 출력으로PCollection
을 생성합니다. 출력PCollection
의 각 요소는 입력 파일의 텍스트 한 줄을 나타냅니다. - 텍스트 줄을 개별 단어로 토큰화하는 각 요소에서
DoFn
(익명 클래스로 인라인으로 정의됨)을 호출하는 ParDo 변환입니다. 이 변환의 입력은 이전TextIO.Read
변환에서 생성된 텍스트 줄의PCollection
입니다.ParDo
변환은 각 요소가 텍스트 내의 개별 단어를 나타내는 새로운PCollection
을 출력합니다. - SDK에서 제공하는
Count
변환은 모든 유형의PCollection
을 가져와 키-값 쌍의PCollection
을 반환하는 일반 변환입니다. 각 키는 입력 컬렉션의 고유 요소를 나타내며, 각 값은 입력 컬렉션에서 해당 키가 등장하는 횟수를 나타냅니다.
이 파이프라인에서Count
의 입력은 기존ParDo
에서 생성된 개별 단어의PCollection
이고, 출력은 각 키가 텍스트의 고유 단어를 나타내고 각 단어의 등장 횟수가 관련 값으로 나타난 키-값 쌍의PCollection
입니다. - 다음은 각 고유 단어와 등장 횟수를 나타내는 키-값 쌍을 출력 파일에 작성하기 적합한 인쇄 가능한 문자열로 형식을 지정하는 변환입니다.
- 텍스트 파일 Write입니다. 이 변환은 형식이 지정된
String
의 최종PCollection
을 입력으로 취하고 각 요소를 출력 텍스트 파일로 작성합니다. 입력PCollection
의 각 요소는 결과 출력 파일의 한 줄을 나타냅니다.
자바
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
자바
이 예에서 수행한 .named()
작업을 사용하면 Dataflow 모니터링 인터페이스에 표시되는 변환 이름을 변환에 지정할 수 있습니다. Dataflow 서비스가 파이프라인을 실행할 때 모니터링 인터페이스는 각 ParDo
변환이 실행되는 시점을 나타냅니다.
.apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() { @Override public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { if (!word.isEmpty()) { c.output(word); } } } }))
자바
.apply(Count.<String>perElement())
자바
MapElements
는 간단한 ParDo를 캡슐화하는 상위 수준 복합 변환입니다. 입력 PCollection
의 각 요소에 대해 MapElements
는 출력 요소 하나만 생성하는 함수를 적용합니다. 이 MapElements
는 형식을 지정하는 SimpleFunction
(익명 클래스로 인라인으로 정의됨)을 호출합니다. 입력으로서 이 MapElements
는 Count
로 생성한 키-값 쌍의 PCollection
을 가지고 인쇄 가능한 문자열의 새로운 PCollection
을 생성합니다.
.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> element) { return element.getKey() + ": " + element.getValue(); } }))
자바
.apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
Write
변환은 PDone
유형의 사소한 결과 값을 생성하며, 이 경우에서 이 값은 무시됩니다.
파이프라인 실행
run
메소드를 호출하여 파이프라인을 실행합니다. 그러면 파이프라인 생성 시 지정한 파이프라인 실행자로 실행될 파이프라인이 전송됩니다.
자바
p.run();
WordCount 예시
이 WordCount 예시에서는 파이프라인의 읽기, 작성, 유지를 간편하게 하는 권장 프로그래밍 사례를 알아봅니다. 이러한 사례는 반드시 필요하지는 않지만 파이프라인을 보다 유연하게 실행하고, 파이프라인 테스트를 지원하고, 파이프라인 코드를 재사용하는 데 도움을 줍니다.
이 섹션에서는 파이프라인 작성의 기본 개념을 잘 이해하고 있다고 가정하겠습니다. 아직 기본 개념을 숙지하지 못한 경우 이전 섹션인 최소 WordCount를 읽어보세요.
자바
- 명시적
DoFn
으로ParDo
적용 - 복합 변환 생성
- 매개변수화 가능한
PipelineOptions
사용
다음 섹션에서는 이러한 핵심 개념을 자세히 살펴보고 파이프라인 코드를 세부 섹션으로 나누어 설명합니다.
명시적 DoFn 지정
ParDo
변환 사용 시 입력 PCollection
의 각 요소에 적용되는 처리 작업을 지정해야 합니다. 이 처리 작업은 SDK 클래스 DoFn
의 하위 클래스입니다. 이전 섹션(최소 WordCount)의 예시 파이프라인은 각 ParDo
의 DoFn
하위 클래스를 인라인으로 익명 내부 클래스 인스턴스로 생성합니다.
하지만 DoFn
을 전역 수준에서 정의하면 단위 테스트가 간편해지고 ParDo
코드를 더 쉽게 읽을 수 있습니다.
이전 예시(최소 WordCount)에서 언급했듯이 파이프라인을 실행할 때 Dataflow 모니터링 인터페이스는 각 ParDo
변환이 실행되는 시점을 나타냅니다. Dataflow 서비스는 ParDo
변환의 변환 이름을 전달한 DoFn
이름에서 자동으로 생성합니다. 예를 들어 FormatAsTextFn()
이 적용되는 ParDo
는 모니터링 인터페이스에 ParDo(FormatAsText)
로 나타납니다.
자바
이 예시에서 DoFn
은 정적 클래스로 정의됩니다.
/** A DoFn that converts a Word and Count into a printable string. */ static class FormatAsTextFn extends DoFn<KV<String, Long>, String> { ... @Override public void processElement(ProcessContext c) { ... } } public static void main(String[] args) throws IOException { Pipeline p = ... // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform. p.apply(...) .apply(...) .apply(ParDo.of(new FormatAsTextFn())) ... }
ParDo
변환에 DoFn
하위 클래스를 만들고 지정하는 방법에 대한 자세한 내용은 ParDo를 사용한 병렬 처리를 참조하세요.
복합 변환 생성
여러 변환 또는 ParDo
단계로 구성된 처리 작업이 있을 경우, 이를 PTransform
의 하위 클래스로 생성할 수 있습니다. PTransform
하위 클래스를 만들면 복잡하고 재사용 가능한 변환을 만들고, 파이프라인의 구조를 더 명확하고 모듈화되도록 할 수 있으며, 손쉽게 단위 테스트를 수행할 수 있습니다.
PTransform
하위 클래스를 사용하여 파이프라인의 논리적 구조를 명시하면 파이프라인 모니터링도 간편해집니다. Dataflow 서비스가 파이프라인의 최종 최적화 구조를 작성할 때 Dataflow 모니터링 인터페이스는 작성한 변환을 사용하여 파이프라인의 구조를 더욱 정확하게 반영합니다.
자바
이 예시에서는 변환 두 개가 PTransform
하위 클래스 CountWords
로 캡슐화됩니다. CountWords
에는 ExtractWordsFn
과 SDK에서 제공한 Count
변환을 실행하는 ParDo
가 포함되어 있습니다.
CountWords
가 정의되면 최종 입력 및 출력을 지정합니다. 입력은 추출 작업의 PCollection<String>
이고, 출력은 개수 계산 작업에서 생성한 PCollection<KV<String, Long>>
입니다.
public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } public static void main(String[] args) throws IOException { Pipeline p = ... p.apply(...) .apply(new CountWords()) ... }
매개변수화 가능한 PipelineOptions
사용
이전 예시인 최소 WordCount에서는 파이프라인을 생성할 때 다양한 실행 옵션을 설정했습니다. 이 예시에서는 PipelineOptions
를 확장하여 자체적인 커스텀 구성 옵션을 정의하겠습니다.
명령줄 파서에서 처리할 자체 인수를 추가하고 해당 인수의 기본값을 지정할 수 있습니다. 그런 다음 파이프라인 코드의 옵션 값에 액세스할 수 있습니다.
최소 WordCount 예시에서는 파이프라인 옵션을 하드 코딩했습니다. 하지만 명령줄 인수 파싱을 통해 PipelineOptions
를 구성하는 것이 가장 일반적입니다.
자바
public static interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); ... } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); ... }
디버깅 WordCount 예시
디버깅 WordCount 예시에서는 파이프라인 코드를 검사하는 모범 사례를 살펴보겠습니다. Dataflow 모니터링 인터페이스와 애그리게이터를 사용하여 실행 중인 파이프라인에 대한 가시성을 추가로 확보할 수 있습니다.
자바
또한 SDK의 DataflowAssert를 사용하여 파이프라인의 서로 다른 단계에서 변환의 출력을 테스트할 수 있습니다.
자바
- Dataflow 모니터링 인터페이스에서 로그 확인
- Dataflow 작업자 로그 수준 제어
Aggregators
만들기DataflowAssert
를 통한 파이프라인 테스트
다음 섹션에서는 이러한 핵심 개념을 자세히 살펴보고 파이프라인 코드를 세부 섹션으로 나누어 설명합니다.
Dataflow 모니터링 인터페이스에서 로그 확인
Google Cloud Logging은 모든 Dataflow 작업자의 로그를 Google Cloud Platform Console의 단일 위치에 집계합니다. 그런 다음 Dataflow 모니터링 인터페이스를 사용하여 Dataflow에서 생성한 모든 Compute Engine 인스턴스의 로그를 확인하고 Dataflow 작업을 완료할 수 있습니다. 파이프라인의 DoFn
인스턴스에 파이프라인이 실행될 때 모니터링 인터페이스에 표시되는 로깅 구문을 추가할 수 있습니다.
자바
다음 SLF4J 로거는 로거 이름으로 정규화된 FilterTextFn
클래스 이름을 사용합니다. 이 로거에서 출력한 모든 로그 구문은 이 이름으로 참조되며 적절한 로그 수준 설정에 따라 Dataflow 모니터링 인터페이스에 표시됩니다.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DebuggingWordCount { public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); ... public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI // only if the log level is set to "DEBUG" or lower. LOG.debug("Matched: " + c.element().getKey()); ... } else { // Log at the "TRACE" level each element that is not matched. Different log levels // can be used to control the verbosity of logging providing an effective mechanism // to filter less important information. LOG.trace("Did not match: " + c.element().getKey()); ... } } } }
Dataflow 작업자 로그 수준 제어
자바
사용자 코드를 실행하는 Dataflow 작업자는 기본적으로 INFO 로그 수준 이상에서 Cloud Logging에 로깅되도록 구성됩니다. 다음 내용을 지정하여 특정 로깅 네임스페이스의 로그 수준을 재정의할 수 있습니다.
--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}
예를 들어 파이프라인을 실행할 때 Dataflow 서비스로 다음 내용을 지정한다고 가정합니다.
--workerLogLevelOverrides={"com.example":"DEBUG"}
Dataflow 서비스를 사용하여 이 파이프라인을 실행하면 모니터링 인터페이스에 기본 INFO 수준 이상의 로그 외에 com.example
패키지의 DEBUG 수준 이상의 로그만 포함됩니다.
또한 다음 내용을 지정하여 기본 Dataflow 작업자 로깅 구성을 재정의할 수 있습니다.
--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>
예를 들어 파이프라인을 실행할 때 Dataflow 서비스로 다음 내용을 지정한다고 가정합니다.
--defaultWorkerLogLevel=DEBUG
Dataflow 서비스를 사용하여 이 파이프라인을 실행하면 모니터링 인터페이스는 DEBUG 수준 이상의 로그를 모두 포함합니다. 기본 작업자 로그 수준을 TRACE 또는 DEBUG로 변경하면 로그 정보의 양이 크게 증가합니다.
자세한 내용은 Cloud Dataflow에서 로깅을 참조하세요.
애그리게이터 생성
커스텀 애그리게이터는 파이프라인이 실행될 때 값을 추적할 수 있습니다. 이러한 값은 Dataflow 서비스에서 파이프라인을 실행할 때 Dataflow 모니터링 인터페이스에 표시됩니다.
애그리게이터는 시스템에서 애그리게이터를 생성하는 ParDo
변환을 실행하거나 초기 값이 변경될 때까지 표시되지 않을 수 있습니다. 애그리게이터는 모니터링 인터페이스의 작업 요약 하단에서 확인할 수 있습니다.
아래의 커스텀 애그리게이터는 일치하는 단어와 일치하지 않는 단어의 수를 추적합니다.
자바
public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private final Aggregator<Long, Long> matchedWords = createAggregator("matchedWords", new Sum.SumLongFn()); private final Aggregator<Long, Long> unmatchedWords = createAggregator("umatchedWords", new Sum.SumLongFn()); @Override public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { ... matchedWords.addValue(1L); ... } else { ... unmatchedWords.addValue(1L); } } } }
배치 및 스트리밍 파이프라인의 애그리게이터
배치 파이프라인의 애그리게이터는 일관성을 제공합니다. 이 애그리게이터는 성공적인 번들에서는 딱 한 번 커밋되고 실패한 번들에서는 커밋되지 않습니다.
스트리밍 파이프라인에서 애그리게이터는 보다 관대한 시맨틱스를 제공합니다. 성공적인 번들은 최고 효율에 기여하고 실패한 번들은 최종 값에 반영될 수 있습니다.
DataflowAssert
를 통한 파이프라인 테스트
자바
DataflowAssert는 파이프라인 수준 테스트를 작성하여 PCollections
콘텐츠의 유효성을 검사할 때 사용할 수 있는 Hamcrest 컬렉션 일치자 스타일의 편리한 PTransforms
집합입니다. DataflowAssert
는 작은 데이터 세트를 단위 테스트할 때 사용하는 것이 가장 좋지만 여기서는 교육용 도구로 사용하겠습니다.
다음 사례에서는 필터링한 단어 세트가 예측 개수와 일치하는지 확인합니다. DataflowAssert
는 출력을 제공하지 않으며 파이프라인이 성공적으로 완료되면 예측이 적중했다는 것을 의미합니다. 파이프라인 테스트 방법에 대해 자세히 알아보고 DebuggingWordCountTest에서 예 단위 테스트를 참조하세요.
public static void main(String[] args) { ... List<KV<String, Long>> expectedResults = Arrays.asList( KV.of("Flourish", 3L), KV.of("stomach", 1L)); DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults); ... }
WindowedWordCount
자바
이 예시 WindowedWordCount
에서는 이전 예시처럼 텍스트의 단어 수를 계산하지만 여러 고급 개념을 도입합니다. WindowedWordCount
의 입력은 이전 예시와 같은 고정 데이터 세트 또는 제한되지 않은 데이터 스트림입니다.
Dataflow SDK는 제한된 유형과 제한되지 않은 유형의 입력을 모두 처리하는 단일 파이프라인을 생성할 수 있다는 점에서 편리합니다. 입력이 제한되지 않을 경우 파이프라인의 모든 PCollections
도 제한되지 않습니다. 제한된 입력의 경우 PCollection도 제한됩니다.
이 섹션을 읽기 전에 파이프라인 작성의 기본 원칙을 잘 이해하고 숙지하도록 하세요.
새로운 개념- 제한되지 않은 입력 및 제한된 입력 읽기
- 데이터에 타임스탬프 추가
- 기간 설정
- 제한되지 않은 출력 및 제한된 출력 작성
다음 섹션에서는 이러한 핵심 개념을 자세히 살펴보고 파이프라인 코드를 세부 섹션으로 나누어 설명합니다.
제한되지 않은 입력 및 제한된 입력 읽기
WindowedWordCount
입력은 제한된 입력 또는 제한되지 않은 입력일 수 있습니다. 입력에 고정된 개수의 요소가 있을 경우 '제한된' 데이터 세트로 간주됩니다. 입력이 지속적으로 업데이트될 경우 '제한되지 않은' 데이터 세트로 간주됩니다. 입력 유형에 대한 자세한 내용은 제한된 PCollection과 제한되지 않은 PCollection을 참조하세요.
이 예에서는 입력의 제한 여부를 선택할 수 있습니다. 모든 예시의 입력은 셰익스피어 작품의 모음이라는 것을 기억해 봅시다. 셰익스피어 작품은 유한한 제한된 입력입니다. 하지만 이 예시에서는 새로운 개념을 설명하기 위해 입력에 셰익스피어 작품의 재연을 사용합니다.
이 예에서 입력이 제한되지 않을 경우, Google Cloud Pub/Sub 주제에서 입력을 읽습니다. 이 경우, 파이프라인에 적용되는 Read
변환은 PubSubIO.Read
입니다. 그렇지 않을 경우 입력은 Google Cloud Storage에서 읽습니다.
public static void main(String[] args) throws IOException { ... PCollection<String> input; if (options.isUnbounded()) { LOG.info("Reading from PubSub."); // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg. input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic())); } else { // Else, this is a bounded pipeline. Read from the Google Cloud Storage file. input = pipeline.apply(TextIO.Read.from(options.getInputFile())) ... } ... }
데이터에 타임스탬프 추가
PCollection
의 각 요소에는 연결된 타임스탬프가 있습니다. 각 요소의 타임스탬프는 PCollection
을 생성하는 소스에서 지정됩니다. 이 예시에서는 파이프라인 입력으로 제한되지 않은 입력을 선택한 경우 타임스탬프를 Pub/Sub 데이터 소스에서 가져옵니다. 제한된 입력을 선택할 경우 ParDo
에서 호출한 AddTimestampsFn
이라는 이름의 DoFn
메소드가 PCollection
의 각 요소에 타임스탬프를 설정합니다.
public static void main(String[] args) throws IOException { ... input = pipeline .apply(...) // Add an element timestamp, using an artificial time. .apply(ParDo.of(new AddTimestampFn())); }
다음은 ParDo
에서 호출한 DoFn
인 AddTimestampsFn
의 코드입니다. 이 코드는 요소 자체에서 지정한 타임스탬프의 데이터 요소를 설정합니다. 예를 들어 요소가 로그 줄일 경우 이 ParDo
는 로그 문자열에서 시간을 파싱하여 요소의 타임스탬프로 설정합니다. 셰익스피어 작품에는 자체 타임스탬프가 없으므로 여기에서는 개념을 설명하기 위해 타임스탬프를 임의로 만들었습니다. 입력 텍스트의 각 줄에는 2시간이라는 기간 내에 무작위로 관련 타임스탬프가 부여됩니다.
static class AddTimestampFn extends DoFn<String, String> { private static final Duration RAND_RANGE = Duration.standardHours(2); private final Instant minTimestamp; AddTimestampFn() { this.minTimestamp = new Instant(System.currentTimeMillis()); } @Override public void processElement(ProcessContext c) { // Generate a timestamp that falls somewhere in the past 2 hours. long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); Instant randomTimestamp = minTimestamp.plus(randMillis); // Set the data element with that timestamp. c.outputWithTimestamp(c.element(), new Instant(randomTimestamp)); } }
타임스탬프에 대한 자세한 내용은 PCollection 요소 타임스탬프를 참조하세요.
기간 설정
Dataflow SDK는 윈도우라는 개념을 사용하여 PCollection
을 개별 요소의 타임스탬프에 맞게 세분화합니다.
여러 요소를 집계하는 Dataflow 변환은 전체 컬렉션의 크기가 무한한 경우(제한되지 않음)에도 각 PCollection
을 여러 개의 유한한 기간의 연속으로 처리합니다.
WindowingWordCount
예시는 고정 기간 설정을 적용합니다. 여기서 각 기간은 고정된 시간 간격을 나타냅니다. 이 예시에서 고정 기간 크기는 기본적으로 1분입니다(명령줄 옵션으로 변경 가능). 그런 다음 파이프라인은 CountWords
변환을 적용합니다.
PCollection<KV<String, Long>> wordCounts = input .apply(Window.<String>into( FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))) .apply(new WordCount.CountWords());
제한되지 않은 출력 및 제한된 출력 작성
이 예시에서 입력이 제한된 입력 또는 제한되지 않은 입력일 수 있듯이 출력 PCollection
에서도 마찬가지로 적절한 출력 싱크를 선택해야 합니다. 일부 출력 싱크는 제한된 출력 또는 제한되지 않은 출력 중 하나만을 지원합니다. 예를 들어 텍스트 파일은 제한된 데이터만을 수신할 수 있는 싱크입니다. BigQuery 출력 소스는 제한된 입력과 제한되지 않은 입력을 모두 지원합니다.
이 예시에서는 결과를 BigQuery 테이블로 스트리밍합니다. 그러면 결과가 BigQuery 테이블에 대해 형식이 지정되고 BigQueryIO.Write
를 통해 BigQuery에 작성됩니다.
wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));