윈도우

Dataflow SDK는 윈도우라는 개념을 사용하여 PCollection을 개별 요소의 타임스탬프에 맞게 세분화합니다. GroupByKey, Combine과 같은 여러 요소를 집계하는 Dataflow 변환은 암시적으로 개별 윈도우 기반으로 작동합니다. 즉, 전체 컬렉션 크기가 제한이 없거나 무한한 경우에도 각 PCollection을 여러 개의 유한한 윈도우의 연속으로 처리합니다.

Dataflow SDK는 트리거라는 관련 개념을 사용해 제한되지 않은 데이터가 도착할 때 유한한 각 윈도우를 '닫는' 시점을 결정합니다. 트리거를 사용하면 PCollection에 대한 윈도우 전략을 구체화해 늦게 도착하는 데이터를 처리하거나 초기 결과를 제공할 수 있습니다. 자세한 내용은 트리거를 참조하세요.

윈도우 기초

윈도우는 지속적으로 업데이트되는, 크기를 알 수 없거나 크기 제한이 없는 데이터 세트(예: 스트리밍 데이터)를 나타내는 제한되지 않은 PCollection에서 매우 유용합니다. GroupByKey, Combine과 같은 일부 Dataflow 변환은 공통 키에 따라 여러 요소를 그룹화합니다. 보통 그룹화 작업은 전체 데이터 세트에서 동일한 키를 보유한 모든 요소를 그룹화합니다. 제한되지 않은 데이터 세트에서는 새로운 요소가 계속해서 추가되기 때문에 요소를 모두 수집하는 것은 불가능합니다.

Dataflow 모델에서 PCollection이 논리적 윈도우로 세분화될 수 있습니다. PCollection의 각 요소는 PCollection윈도우 함수에 따라 윈도우 하나 이상에 할당되고, 개별 윈도우는 유한한 수의 요소를 포함합니다. 그리고 그룹화 변환은 각 PCollection의 요소를 개별 윈도우 기반으로 고려합니다. 예를 들어, GroupByKey는 암시적으로 키와 윈도우에 따라 PCollection의 요소를 그룹화합니다. Dataflow는 동일한 윈도우 내의 데이터 그룹화하며, 다른 윈도우의 데이터는 그룹화하지 않습니다.

윈도우 제약조건

PCollection에 대해 윈도우 함수를 설정하고 해당 PCollection그룹화 변환을 다음에 적용하면 요소의 윈도우가 사용됩니다. Dataflow는 필요에 따라 실제 윈도우 그룹화를 수행하며, Window 변환을 사용해 윈도우 함수를 설정한 경우 각 요소는 윈도우에 할당됩니다. 하지만 GroupByKey 또는 Combine으로 PCollection을 그룹화하기 전까지 윈도우는 고려되지 않습니다. 이로 인해 파이프라인에 다양한 영향을 미칠 수 있습니다.

아래 그림 1의 파이프라인 예시를 살펴보세요.

윈도우, ParDo, GroupByKey를 순서대로 적용하는 파이프라인
그림 1: 윈도우를 적용하는 파이프라인

위의 파이프라인에서는 PubsubIO를 사용하여 키-값 쌍 집합을 읽어 제한되지 않는 PCollection을 만든 후 Window 변환을 사용해 해당 컬렉션에 윈도우 함수를 적용했습니다. 그리고 ParDo를 컬렉션에 적용한 후 GroupByKey를 사용하여 해당 ParDo의 결과를 그룹화했습니다. GroupByKey에 윈도우가 필요해지기 전에는 윈도우가 실제로 사용되지 않으므로, 윈도우 함수는 ParDo 변환에 영향을 미치지 않습니다.

하지만 후속 변환은 GroupByKey의 결과에 적용되어 키와 윈도우에 따라 데이터를 그룹화합니다.

제한된 PCollection에서 윈도우 사용하기

제한된 PCollection의 크기가 고정된 데이터 세트에서 윈도우를 사용할 수 있습니다. 하지만 윈도우는 PCollection의 각 요소에 첨부된 암시적 타임스탬프만 고려하며, 고정된 데이터 세트(예: TextIOBigQueryIO)를 생성하는 데이터 소스는 매 요소에 동일한 타임스탬프를 할당합니다. 따라서 모든 요소는 기본적으로 단일 전역 윈도우에 포함됩니다. 모든 요소가 동일한 윈도우에 할당되면 파이프라인이 기본 MapReduce 배치 스타일로 실행됩니다.

고정된 데이터 세트에 윈도우를 사용하려면 각 요소에 자체 타임스탬프를 할당하면 됩니다. 요소에 타임스탬프를 할당하려면 새 타임스탬프가 있는 각 요소를 출력하는 DoFnParDo 변환을 사용합니다.

제한된 PCollection에 윈도우를 사용하면 파이프라인이 데이터를 처리하는 방식에 영향을 미칠 수 있습니다. 예를 들어 다음 파이프라인을 살펴보겠습니다.

제한된 컬렉션에 GroupByKey 후 ParDo를 적용하는 파이프라인
그림 2: 제한된 컬렉션에서 윈도우가 없는 GroupByKey 및 ParDo

위의 파이프라인에서는 TextIO를 사용하여 키-값 쌍 집합을 읽어 제한된 PCollection을 만들었습니다. 그런 다음 GroupByKey를 사용하여 컬렉션을 그룹화하고 그룹화된 PCollectionParDo 변환을 적용했습니다. 이 예에서 GroupByKey는 고유 키 컬렉션을 만들고 ParDo는 각 키에 정확히 한 번씩 적용됩니다.

이제 동일한 파이프라인에 윈도우 함수를 사용하는 예시를 살펴보겠습니다.

제한된 컬렉션에 윈도우를 적용하고 GroupByKey, ParDo를 순서대로 적용하는 파이프라인
그림 3: 제한된 컬렉션에서 윈도우가 있는 GroupByKey 및 ParDo

위와 동일하게 파이프라인에서는 키/값 쌍의 제한된 PCollection을 생성합니다. 그런 다음 해당 PCollection에 대해 윈도우 함수를 설정합니다. 이제 GroupByKey 변환은 키와 윈도우에 따라 PCollection의 요소를 그룹화합니다. 후속 ParDo 변환은 각 키에 여러 번 적용되며 각 윈도우에는 한 번씩 적용됩니다.

윈도우 함수

Dataflow SDK를 사용하면 여러 유형의 윈도우를 정의하여 PCollection의 요소를 나눌 수 있습니다. SDK는 다음을 비롯한 여러 윈도우 함수를 제공합니다.

  • 고정 시간 윈도우
  • 슬라이딩 시간 윈도우
  • 세션별 윈도우
  • 단일 전역 윈도우

각 요소는 사용하는 윈도우 함수에 따라 논리적으로 하나 이상의 윈도우에 속할 수 있습니다. 예를 들어 슬라이딩 시간 윈도우는 단일 요소가 여러 윈도우에 할당되는 중첩 윈도우를 생성합니다.

고정 시간 윈도우

고정 시간 윈도우는 가장 단순한 형태의 윈도우입니다. 예를 들어, 지속적으로 업데이트되고 타임스탬프가 적용된 특정 PCollection에서 각 윈도우는 5분간의 요소를 캡처할 수 있습니다.

고정 시간 윈도우는 처리할 데이터 번들을 정의하는 데이터 스트림의 시간 간격을 나타냅니다. 5분 간격으로 작동하는 윈도우를 살펴보겠습니다. 타임스탬프 값이 0:00:00과 0:04:59 사이인 제한되지 않은 PCollection의 모든 요소는 첫 번째 윈도우에 속합니다. 타임스탬프 값이 0:05:00과 0:09:59 사이인 요소는 두 번째 윈도우에 속합니다. 이런 방식으로 계속됩니다.

고정 시간 윈도우를 보여주는 다이어그램
그림 4: 30초 크기의 고정 시간 윈도우

슬라이딩 시간 윈도우

슬라이딩 시간 윈도우도 데이터 번들을 정의하는 데이터 스트림의 시간 간격을 사용하지만 슬라이딩 시간 윈도우를 사용하면 윈도우가 겹칩니다. 각 윈도우는 5분간의 데이터를 캡처할 수 있지만 새 윈도우는 10초마다 시작합니다. 슬라이딩 윈도우가 시작하는 간격을 주기라고 합니다. 따라서 위의 예의 윈도우는 크기가 5분이고 주기가 10초가 됩니다.

여러 윈도우가 겹치기 때문에 데이터 세트의 많은 요소는 하나 이상의 윈도우에 속합니다. 이러한 윈도우는 데이터의 누적 평균을 구하는 데 유용합니다. 슬라이딩 시간 윈도우를 사용하면 이 예시의 경우 10초마다 업데이트되는 지난 5분간의 데이터의 누적 평균을 계산할 수 있습니다.

슬라이딩 시간 윈도우를 보여주는 다이어그램
그림 5: 윈도우 크기가 1분이고 윈도우 주기가 30초인 슬라이딩 시간 윈도우

세션 윈도우

세션 윈도우 함수는 데이터가 집중된 영역 주변의 윈도우를 정의합니다. 세션 윈도우는 시간과 관련하여 불규칙적으로 분포된 데이터에 유용합니다. 예를 들어 사용자 마우스 활동을 나타내는 데이터 스트림은 일부에 클릭이 크게 집중되어 있어 유휴 시간의 주기가 길 수 있습니다. 세션 윈도우는 크게 집중된 데이터를 별도의 윈도우로 그룹화하고 데이터 스트림의 유휴 섹션을 필터링합니다.

세션 윈도우는 개별 키 기반으로 적용됩니다. 따라서 세션으로 그룹화하면 키가 동일한 데이터 고려합니다. 이로 인해 데이터 컬렉션의 각 키는 크기가 다른 분리된 윈도우로 그룹화됩니다.

가장 단순한 유형의 세션 윈도우는 최소 갭 기간을 지정합니다. 최소 지연 시간 기준값 이전에 도착하는 모든 데이터는 동일한 윈도우로 그룹화됩니다. 데이터가 지정된 최소 갭 기간이 지난 후에 도착하면 새로운 윈도우가 시작됩니다.

세션 윈도우를 보여주는 다이어그램
그림 5: 최소 갭 기간이 있는 세션 윈도우. 각 데이터 키가 데이터 분포에 따라 다른 윈도우에 있음

단일 전역 윈도우

기본적으로 PCollection의 모든 데이터는 단일 전역 윈도우에 할당됩니다. 데이터 세트의 크기가 고정된 경우, PCollection에 대한 전역 윈도우가 기본값으로 유지됩니다. PCollection의 요소가 모두 단일 전역 윈도우에 속한 경우, 파이프라인은 일괄 처리 작업(MapReduce 기반 처리에서와 동일)과 매우 유사한 작업을 실행합니다.

기타 윈도우 함수

Dataflow SDK는 고정, 슬라이딩, 세션, 전역 윈도우 외에도 캘린더 기반 윈도우 등 다양한 윈도우 함수를 제공합니다.

자바

자바용 Dataflow SDK에서 이용할 수 있는 전체 윈도우 함수 목록은 com.google.cloud.dataflow.sdk.transforms.windowing 패키지를 참조하세요.

PCollection의 윈도우 함수 설정

Window 변환을 적용하여 PCollection에 대해 윈도우 함수를 설정할 수 있습니다. Window 변환을 적용할 때는 WindowFn를 제공해야 합니다. WindowFn은 고정 또는 슬라이딩 시간 윈도우와 같이 PCollection이 후속 그룹화 변환에 사용할 윈도우 함수를 결정합니다.

Dataflow SDK가 기본 윈도우 함수에 대해 사전 정의된 WindownFn을 제공하거나 사용자가 이후에 자체 WindowFn을 정의할 수 있습니다.

기술적으로는 모든 변환과 동일하게 Window는 입력 PCollection을 적용하고 하나 이상의 유한한 논리 윈도우에 할당된 각 요소에 새 PCollection을 출력합니다.

고정 시간 윈도우 설정

다음 코드 예시는 Window를 적용해 PCollection을 1분 길이의 고정된 윈도우로 나누는 방법을 보여줍니다.

자바

  PCollection<String> items = ...;
  PCollection<String> fixed_windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

슬라이딩 시간 윈도우 설정

다음 코드 예시는 Window를 적용해 PCollection을 슬라이딩 시간 윈도우로 나누는 방법을 보여줍니다. 각 윈도우의 길이는 30분이며, 5초마다 새 윈도우가 시작됩니다.

자바

  PCollection<String> items = ...;
  PCollection<String> sliding_windowed_items = items.apply(
    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

세션 윈도우 설정

다음 코드 예시는 Window를 적용해 PCollection을 세션 윈도우로 나누는 방법을 보여줍니다. 이때 윈도우의 각 세션은 10분 이상의 시간 간격으로 구분해야 합니다.

자바

  PCollection<String> items = ...;
  PCollection<String> session_windowed_items = items.apply(
    Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));

세션은 키별로 다릅니다. 컬렉션의 각 키에는 데이터 분포에 따라 자체 세션 그룹화가 있습니다.

단일 전역 윈도우 설정

PCollection이 제한되어 있는 경우(크기가 고정됨) 단일 전역 윈도우에 모든 요소를 할당할 수 있습니다. 다음 코드 예시는 PCollection에 대해 단일 전역 윈도우를 설정하는 방법을 보여줍니다.

PCollection에 대해 단일 전역 윈도우를 설정하려면 Window 변환을 적용할 때 new GlobalWindows()WindowFn으로 전달합니다. 다음 코드 예시는 Window를 적용해 PCollection을 단일 전역 윈도우에 할당하는 방법을 보여줍니다.

자바

  PCollection<String> items = ...;
  PCollection<String> batch_items = items.apply(
    Window.<String>into(new GlobalWindows()));

시간차, 데이터 지연, 지연 데이터

모든 데이터 처리 시스템에서는 데이터 이벤트가 발생하는 시간('이벤트 시간', 데이터 요소 자체의 타임스탬프에서 결정)과 파이프라인의 각 단계에서 실제 데이터 요소가 처리되는 시간('처리 시간', 요소를 처리하는 시스템의 시계에서 결정) 사이에 일정 수준의 지연이 발생합니다.

완벽한 시스템에서 각 데이터 요소의 이벤트 시간과 처리 시간은 동일하거나 적어도 델타가 일관적입니다. 하지만 실제 컴퓨터 시스템에서는 데이터 생성과 전달에 여러 시간적인 제한이 적용됩니다. 고객 주문이나 로그 파일을 생성하는 웹 프런트 엔드의 분산 컬렉션과 같은 대규모 시스템 혹은 분산 시스템에서 데이터 이벤트가 웹상의 다양한 위치에 생성된 순서와 동일하게 파이프라인에 나타나지 않을 수도 있습니다.

예를 들어 5분 길이의 고정 시간 윈도우를 사용하는 PCollection이 있다고 가정해 보겠습니다. 각 윈도우에서 Dataflow는 특정 윈도우 범위(예: 첫 윈도우에서 0:00와 4:59 사이)의 이벤트 시간 타임스탬프가 있는 데이터를 모두 수집해야 합니다. 범위 밖의 타임스탬프가 있는 데이터(5:00 이후의 데이터)는 다른 윈도우에 속합니다.

하지만 데이터가 항상 정확한 시간 순서대로 파이프라인에 도착하거나 항상 예측 가능한 간격으로 도착한다고 보장할 수 없습니다. Dataflow는 시스템이 특정 윈도우의 모든 데이터가 파이프라인에 도착했다고 예측할 수 있는 시점인 워터마크를 추적합니다. 워터마크 이후에 도착하는 타임스탬프가 있는 데이터는 지연 데이터로 간주합니다.

본문의 예시에서 데이터 타임스탬프와 데이터가 파이프라인에 나타나는 시간(처리 시간) 사이에 약 30초의 지연이 있다고 가정하는 간단한 워터마크가 있다고 가정합니다. 그리고 Dataflow는 첫 윈도우를 5:30에 닫습니다. 데이터 레코드가 5:34에 도착했지만 0:00-4:59 윈도우(예: 3:38)에 들어가는 타임스탬프가 있다면 해당 레코드는 지연 데이터입니다.

참고: 여기에서는 간단하게 지연 시간/시간차를 추정하는 아주 간단한 워터마크를 사용한다고 가정합니다. 실제로는 PCollection의 데이터 소스가 워터마크를 결정하며, 워터마크는 더 정확하거나 복잡할 수 있습니다.

시간차 및 지연 데이터 관리

PCollection의 윈도우 전략을 설정할 때 .withAllowedLateness 작업을 호출해 지연 데이터를 허용할 수 있습니다. 다음 코드 예시는 윈도우가 끝난 후 최대 2일까지 지연 데이터를 허용하는 윈도우 전략을 보여줍니다.

자바

  PCollection<String> items = ...;
  PCollection<String> fixed_windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
          .withAllowedLateness(Duration.standardDays(2)));

PCollection.withAllowedLateness를 설정하면 허용된 지연이 이후에 허용된 지연을 처음 적용하는 PCollection에서 파생된 모든 후속 PCollection에 전파됩니다. 나중에 파이프라인에서 허용된 지연을 변경하려면 Window.withAllowedLateness()를 다시 적용하여 명시적으로 수행해야 합니다.

또한 Dataflow의 Triggers API를 사용하여 PCollection에 대한 윈도우 전략을 구체화할 수 있습니다. 트리거를 사용하면 윈도우가 지연 요소를 내보내는 방법을 포함해 개별 윈도우가 결과를 집계하고 보고하는 정확한 방식을 결정할 수 있습니다.

참고: Dataflow의 기본 윈도우 및 트리거 전략은 지연 데이터를 삭제합니다. 파이프라인이 지연 데이터의 인스턴스를 처리하도록 하려면 이에 맞게 PCollection의 윈도우 전략을 설정하고 PCollection에 대해 트리거를 설정할 때 명시적으로 .withAllowedLateness를 설정해야 합니다.

PCollection의 요소에 타임스탬프 추가하기

설정한 타임스탬프가 있는 새 요소를 출력하는 ParDo 변환을 적용하여 PCollection의 요소에 새 타임스탬프를 할당할 수 있습니다. 타임스탬프 할당은 Dataflow의 윈도우 기능을 사용하고 싶지만 데이터 세트가 암시적인 타임스탬프가 없는 소스(예: TextIO의 파일)에서 추출된 경우에 유용할 수 있습니다.

데이터 세트에 타임스탬프 데이터가 포함되었지만 Dataflow 데이터 소스에서 타임스탬프를 생성하지 않은 경우에 따르기 좋은 패턴입니다. 파이프라인이 입력 파일에서 로그 레코드를 읽을 때 각 로그 레코드에 타임스탬프 필드가 포함된 경우를 한 가지 예로 들 수 있습니다. 파이프라인이 파일에서 레코드를 읽기 때문에 파일 소스는 타임스탬프를 자동으로 할당하지 않습니다. 각 레코드에서 타임스탬프 필드를 파싱하고 ParDo 변환을 사용해 PCollection의 각 요소에 타임스탬프를 연결할 수 있습니다.

자바

ParDo 변환은 기본 출력 컬렉션으로 요소를 출력하는 데 사용되는 일반 ProcessContext.output 대신 ProcessContext.outputWithTimestamp로 요소를 출력하는 DoFn을 사용해 타임스탬프를 할당해야 합니다. 다음 코드 예는 새 타임스탬프가 있는 요소를 출력하는 ParDoDoFn을 보여줍니다.

  PCollection<LogEntry> unstampedLogs = ...;
  PCollection<LogEntry> stampedLogs =
      unstampedLogs.apply(ParDo.of(new DoFn<LogEntry, LogEntry>() {
        public void processElement(ProcessContext c) {
          // Extract the timestamp from log entry we're currently processing.
          Instant logTimeStamp = extractTimeStampFromLogEntry(c.element());
          // Use outputWithTimestamp to emit the log entry with timestamp attached.
          c.outputWithTimestamp(c.element(), logTimeStamp);
        }
      }));
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.