트리거

PCollection에서 데이터를 수집하고 한정된 윈도우에서 데이터를 그룹화하는 경우 Dataflow는 여러 방법을 사용해 각 윈도우의 집계 결과를 내보낼 시점을 결정해야 합니다. 시간차와 데이터 지연을 고려해 Dataflow는 트리거라는 메커니즘을 사용하여 '충분한' 데이터가 윈도우에 수집되는 시점을 결정합니다. 해당 윈도우의 집계된 결과를 내보낸 후에 각 시점을 이라고 합니다.

Dataflow의 트리거 시스템은 다양한 방법을 통해 시스템의 데이터 처리 필요에 따라 특정 윈도우의 집계된 결과를 내보낼 시점을 결정합니다. 예를 들어 프롬프트가 필요한 시스템이나 시간에 민감한 업데이트는 N초마다 윈도우를 내보내는 트리거를 이용할 수 있습니다. 이 트리거는 데이터 완전성보다 신속함에 중점을 둡니다. 정확한 결과 시점보다 데이터 완전성을 중시하는 시스템은 윈도우가 끝나기 전에 일정 수의 데이터 레코드가 누적될 때까지 대기하는 데이터 기반 트리거를 사용할 수 있습니다.

트리거는 특히 파이프라인의 두 가지 조건을 처리할 때 유용합니다.

  • 트리거를 이용해 지연 데이터 인스턴스를 처리할 수 있습니다.
  • 트리거를 이용해 특정 윈도우의 데이터가 모두 도착하기 전에 초기 결과를 내보낼 수 있습니다.

참고: 윈도우 함수에 단일 전역 윈도우를 사용하는 제한되지 않은 PCollection 트리거를 설정할 수 있습니다. 파이프라인을 통해 제한되지 않은 데이터 집합(예: 현재 시간에 제공되고 N초 또는 N개 요소마다 업데이트되는 모든 데이터의 누적 평균)의 정기 업데이트를 제공하려는 경우에 이 설정이 유용할 수 있습니다.

트리거 유형

Dataflow는 PCollection으로 설정할 수 있는 사전 빌드된 여러 트리거를 제공합니다. 세 가지 유형의 주요 트리거가 있습니다.

  • 시간 기반 트리거. 이 트리거는 시간을 기준으로 작동합니다. 이벤트 시간(각 데이터 요소의 타임스탬프에서 나타내는 시간)이나 처리 시간(파이프라인의 모든 특정 단계에서 데이터 요소가 처리되는 시간)을 기준으로 합니다.
  • 데이터 기반 트리거. 이 트리거는 데이터가 각 윈도우에 도착한 시점에 데이터를 검사하고 지정한 데이터 조건이 충족되면 발생하는 방식으로 작동합니다. 예를 들어 윈도우가 일정 수의 데이터 요소를 수신한 시점에 윈도우에서 결과를 내보내도록 트리거를 설정할 수 있습니다.
  • 복합 트리거. 이 트리거는 여러 시간 기반 트리거 또는 데이터 기반 트리거를 논리적인 방식으로 결합한 것입니다. 모든 트리거의 조건이 충족되었거나(논리적 AND) 하나의 트리거 조건이 충족되는 경우(논리적 OR), 복합 트리거가 발생하도록 설정할 수 있습니다.

시간 기반 트리거

Dataflow의 시간 기반 트리거는 AfterWatermarkAfterProcessingTime을 포함합니다. 이 트리거는 이벤트 시간이나 처리 시간을 기준으로 하며, 해당 시간 기준에 맞춰 타이머를 설정합니다.

AfterWatermark

AfterWatermark 트리거는 이벤트 시간에 작동합니다. AfterWatermark 트리거는 워터마크가 윈도우의 끝을 통과한 후 윈도우 콘텐츠를 내보냅니다. 이때 데이터 요소에 첨부된 타임스탬프를 기준으로 합니다. 워터마크는 전역적 진행 상황 측정항목으로, 특정 파이프라인 시점에서 Dataflow의 입력 완전성을 판단하는 개념입니다.

AfterWatermark 트리거는 워터마크가 윈도우 끝을 통과한 시점에 실행합니다. Dataflow는 기본 트리거를 사용하여 시스템이 지정된 시간 기반 윈도우에 모든 데이터가 있다고 예측하면 결과를 내보냅니다.

AfterProcessingTime

AfterProcessingTime 트리거는 처리 시간에 작동합니다. AfterProcessingTime 트리거는 윈도우 시작과 같은 시간 기준 이후 일정 처리 시간이 지난 다음에 윈도우를 내보냅니다. 처리 시간은 데이터 요소의 타임스탬프가 아닌 시스템 시계로 결정합니다.

AfterProcessingTime 트리거는 특히 단일 전역 윈도우처럼 시간대가 긴 윈도우에서 초기 결과를 트리거하는 데 유용합니다.

데이터 기반 트리거

Dataflow에서 현재 데이터 기반 트리거는 AfterPane.elementCountAtLeast 트리거 하나만 제공합니다. 이 트리거는 연속적인 요소 수를 기반으로 작동합니다. 현재 창이 요소를 N개 이상 수집하면 트리거가 발생합니다.

특히 단일 전역 윈도우인 경우 모든 데이터가 누적되기 전에 윈도우에서 초기 결과를 내보내도록 하려면 AfterPane.elementCountAtLeast()를 이용하는 것이 좋습니다.

기본 트리거

PCollection의 기본 트리거는 이벤트 시간을 기반으로 하며 시스템 워터마크(Dataflow에서 모든 데이터가 '수집되어야 한다고' 판단하는 시점)가 윈도우의 끝을 지나가면 윈도우 결과를 내보냅니다. 기본 트리거 구성은 정확히 한 번 내보내며, 지연 데이터는 삭제됩니다. 이는 기본 윈도우와 트리거 구성이 허용하는 지연값이 0이기 때문입니다. 이 동작의 수정에 대한 자세한 내용은 지연 데이터 처리를 참조하세요.

워터마크는 데이터 소스에 따라 다르며, 예측값인 경우도 있습니다. 시스템에서 할당한 타임스탬프가 있는 Pub/Sub 등 대부분의 경우에 워터마크는 파이프라인이 처리하는 데이터의 정확한 범위를 제공합니다.

지연 데이터 처리

파이프라인에서 지연 데이터(워터마크가 윈도우 끝을 지난 후에 도착한 데이터)를 처리해야 하는 경우, 윈도우와 트리거 구성을 설정할 때 허용 지연값을 적용할 수 있습니다. 이렇게 하면 트리거가 지연 데이터에 반응할 수 있으며, 기본 트리거 구성에서 지연 데이터가 도착할 때마다 바로 새 결과를 내보냅니다.

다음과 같이 윈도우와 트리거를 설정할 때 .withAllowedLateness()를 사용하여 허용 지연값을 설정할 수 있습니다.

  PCollection<String> pc = ...;
  pc.apply(Window<String>.into(FixedWindows.of(1, TimeUnit.MINUTES))
                               .triggering(AfterProcessingTime.pastFirstElementInPane()
                                                              .plusDelayOf(Duration.standardMinutes(1)))
                               .withAllowedLateness(Duration.standardMinutes(30));

이 허용 지연값은 원래 PCollection에 변환을 적용한 결과로 파생된 모든 PCollection에 적용됩니다. 나중에 파이프라인에서 허용 지연값을 변경하려는 경우, Window.withAllowedLateness()를 다시 명시적으로 적용하면 됩니다.

트리거 설정

Window 변환을 사용해 PCollection에 대한 윈도우 함수를 설정하는 경우 트리거를 지정할 수도 있습니다.

다음과 같이 Window.into() 변환의 결과에 .triggering() 메소드를 호출해 PCollection에 대한 트리거를 설정할 수 있습니다.

자바

  PCollection<String> pc = ...;
  pc.apply(Window<String>.into(FixedWindows.of(1, TimeUnit.MINUTES))
                               .triggering(AfterProcessingTime.pastFirstElementInPane()
                                                              .plusDelayOf(Duration.standardMinutes(1)))
                               .discardingFiredPanes());

위의 코드 샘플은 PCollection에 대한 트리거를 설정합니다. 트리거는 시간 기반으로, 윈도우의 첫 요소를 처리하고 일 분 후에 각 윈도우를 내보냅니다. 코드 샘플의 마지막 줄인 .discardingFiredPanes()는 윈도우의 누적 모드입니다.

윈도우 누적 모드

트리거를 지정할 때 윈도우의 누적 모드도 설정해야 합니다. 트리거가 발생하면 윈도우의 현재 콘텐츠를 창으로 내보냅니다. 트리거가 여러 번 발생하면 누적 모드는 트리거가 발생할 때 시스템이 윈도우 창을 누적하거나 삭제할지 결정합니다.

윈도우에 트리거가 발생할 때 생성되는 창을 누적하도록 설정하려면, 트리거를 설정할 때 .accumulatingFiredPanes()를 호출합니다. 윈도우가 발생한 창을 삭제하도록 설정하려면 .discardingFiredPanes()를 호출합니다.

PCollection을 고정 시간 기간 설정 및 데이터 기반 트리거와 함께 사용하는 예를 살펴보겠습니다. 예를 들어, 각 윈도우의 누적 평균이 10분이지만 UI 평균의 현재 값을 10분 간격보다 더 자주 표시하려는 경우에 사용할 수 있습니다. 다음과 같은 조건을 가정해 보겠습니다.

  • PCollection의 고정 시간 윈도우는 10분입니다.
  • PCollection에는 3개 요소가 도착할 때마다 발생하는 반복 트리거가 있습니다.

다음 그림은 데이터 이벤트가 PCollection에 도착하고 윈도우에 할당됐을 때의 데이터 이벤트를 보여줍니다.

고정 시간 윈도우에서 PCollection의 키별 데이터 다이어그램.
그림 1: 고정 시간 윈도우에서 PCollection의 데이터 이벤트

참고: 다이어그램을 단순하게 보여주기 위해 파이프라인의 이벤트가 모두 순서대로 도착했다고 가정했습니다.

여기에서는 간단하게 Key X에 연결된 값만 고려하겠습니다.

누적 모드

트리거가 .accumulatingFiredPanes로 설정된 경우, 트리거가 발생할 때마다 다음 값을 내보냅니다(트리거는 요소 3개가 도착할 때마다 발생).

  Key X:
    First trigger firing:  [5, 8, 3]
    Second trigger firing: [5, 8, 3, 15, 19, 23]
    Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]

삭제 모드

트리거가 .discardingFiredPanes로 설정된 경우, 트리거가 발생할 때마다 다음 값을 내보냅니다.

  Key X:
    First trigger firing:  [5, 8, 3]
    Second trigger firing: [15, 19, 23]
    Third trigger firing:  [9, 13, 10]

누적 효과와 삭제 효과 비교

이제 파이프라인에 키별 계산을 추가하겠습니다. 트리거가 발생할 때마다 파이프라인은 윈도우의 각 키에 연결된 모든 값의 산술 평균을 계산하는 Combine.perKey를 적용합니다.

다시 한번 Key X가 다음과 같다고 가정해 보겠습니다.

트리거가 .accumulatingFiredPanes로 설정된 경우:

  Key X:
    First trigger firing:  [5, 8, 3]
      Average after first trigger firing: 5.3
    Second trigger firing: [5, 8, 3, 15, 19, 23]
      Average after second trigger firing: 12.167
    Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
      Average after third trigger firing: 11.667

트리거가 .discardingFiredPanes로 설정된 경우:

  Key X:
    First trigger firing:  [5, 8, 3]
      Average after first trigger firing: 5.3
    Second trigger firing: [15, 19, 23]
      Average after second trigger firing: 19
    Third trigger firing:  [9, 13, 10]
      Average after third trigger firing: 10.667

산술 평균을 계산하는 Combine.perKey는 각 사례에 다른 결과를 생성합니다.

일반적으로 .accumulatingFiredPanes로 설정된 트리거는 이전에 트리거된 모든 요소를 포함하여 항상 지정된 윈도우에서 모든 데이터를 출력합니다. .discardingFiredPanes로 설정된 트리거는 트리거가 마지막으로 발생한 이후에 단계적 변경값을 출력합니다. 누적 모드는 요소를 결합하거나 업데이트하는 작업을 그룹화하기 전에 가장 적절하며, 다른 경우에는 삭제 모드를 이용합니다.

트리거 연속

GroupByKey 또는 Combine.perKey와 같은 집계 변환을 트리거를 지정한 PCollection에 적용하면 GroupByKey 또는 Combine.perKey가 새로운 PCollection 출력을 생성한다는 점에 유의하세요. 입력 컬렉션으로 설정한 트리거는 새로운 출력 컬렉션에 적용되지 않습니다.

대신 Dataflow SDK는 입력 컬렉션으로 지정한 트리거를 기반으로 PCollection 출력에 대한 비교 가능한 트리거를 생성합니다. 새로운 트리거는 PCollection 입력의 원래 트리거에서 지정한 비율과 유사한 비율로 가능한 한 빠르게 요소를 내보려고 시도합니다. Dataflow는 다음의 입력 트리거에 제공한 매개변수를 기준으로 새로운 트리거의 속성을 결정합니다.

  • AfterWatermark의 연속 트리거는 기본적으로 원래 트리거와 동일합니다. AfterWatermark 트리거가 일찍 또는 늦게 발생하도록 지정된 경우, 원래 트리거의 일찍 또는 늦은 발생이 연속됩니다.
  • AfterProcessingTime의 기본 연속 트리거가 병합된 요소의 동기화된 처리 시간 후에 연속적으로 발생하고 다른 추가 지연에 전파되지 않습니다. 예를 들어 AfterProcessingTime.pastFirstElementInPane().alignedTo(15 min).plusDelayOf(1 hour)와 같은 트리거를 가정해 보겠습니다. GroupByKey 후에 Dataflow가 출력 컬렉션에 공급한 트리거가 각 키에 할당된 동일한 시간으로 동기화되지만 1시간 지연은 유지하지 않습니다.
  • AfterCount의 기본 연속 트리거는 각 요소에서 발생합니다. 예를 들어 입력 컬렉션의 AfterCount(n)는 출력 컬렉션의 AfterCount(1)가 됩니다.

Dataflow가 GroupByKey 또는 Combine.perKeyPCollection 출력에 대해 생성한 트리거가 부족하다고 판단되면 해당 컬렉션의 새 트리거를 명시적으로 설정해야 합니다.

트리거 결합

Dataflow에서 여러 트리거를 결합해 복합 트리거를 형성할 수 있습니다. Dataflow의 복합 트리거 시스템을 사용해 논리적으로 여러 트리거를 결합할 수 있습니다. 또한 트리거를 지정해 반복적, 최대 한 번, 커스텀 조건에서 결과를 내보낼 수 있습니다.

복합 트리거 유형

Dataflow는 다음의 복합 트리거를 포함합니다.

  • AfterWatermark.pastEndOfWindow로 일찍 또는 늦은 발생을 추가할 수 있습니다.
  • Repeatedly.forever는 영구적으로 실행하는 트리거를 지정합니다. 트리거 조건을 충족할 때마다 윈도우는 결과를 내보낸 다음 재설정하고 처음부터 다시 시작합니다. Repeatedly.forever.orFinally를 결합해 반복 트리거를 중단하는 조건을 지정할 수 있습니다.
  • AfterEach.inOrder는 여러 트리거를 결합해 특정 시퀀스에서 트리거를 발생합니다. 시퀀스의 트리거가 윈도우를 내보낼 때마다 시퀀스는 다음 트리거를 진행합니다.
  • AfterFirst는 여러 트리거를 사용하고 해당 인수 트리거의 조건이 하나라도 처음 충족되는 시점에 결과를 내보냅니다. 이는 여러 트리거에 대한 OR 논리 연산자와 동일합니다.
  • AfterAll은 여러 트리거를 사용하고 해당 인수 트리거의 조건이 모두 충족되는 시점에 결과를 내보냅니다. 이는 여러 트리거에 대한 AND 논리 연산자와 동일합니다.
  • orFinally는 트리거가 마지막으로 한 번 발생하고 다시 발생하지 않도록 하는 최종 조건으로 작동할 수 있습니다.

AfterWatermark.pastEndOfWindow 구성

가장 유용한 복합 트리거의 일부는 시스템이 모든 데이터가 도착했다고 예측하는 시점(즉, 워터마크가 윈도우의 끝을 지나가는 시점)에 다음 중 하나 또는 둘 다를 결합해 한 번 발생합니다.

  • 부분 결과를 더 빠르게 처리하기 위해 워터마크가 윈도우 끝을 통과하기 전에 예측 발생
  • 늦게 도착하는 데이터를 처리하기 위해 워터마크가 윈도우 끝을 통과한 후에 발생하는 지연 발생

AfterWatermark.pastEndOfWindow를 사용해 이 패턴을 표현할 수 있습니다. 예를 들어 다음 트리거 코드 예시는 아래 조건에서 발생합니다.

  • 시스템이 모든 데이터가 도착했다고 예측(워터마크가 윈도우 끝을 지나가는 시점)
  • 10분 지연 후에 지연 데이터가 도착
  • 2일 후에 원하는 데이터가 더 이상 들어오지 않는다고 가정하고 트리거 실행을 중지

자바

  .apply(Window
      .triggering(AfterWatermark
           .pastEndOfWindow()
           .withLateFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))))
      .withAllowedLateness(Duration.standardDays(2)));

기타 복합 트리거

다른 유형의 복합 트리거를 만들 수도 있습니다. 다음 코드 예시는 창에 요소가 100개 이상이거나 1분이 지날 때마다 발생하는 간단한 복합 트리거를 보여줍니다.

자바

Repeatedly.forever(AfterFirst.of(
    AfterPane.elementCountAtLeast(100),
    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))

트리거 문법

다음 문법에서는 트리거를 결합하여 복합 트리거를 생성하는 다양한 방법을 설명합니다.

TRIGGER ::=
   ONCE_TRIGGER
   Repeatedly.forever(TRIGGER)
   TRIGGER.orFinally(ONCE_TRIGGER)
   AfterEach.inOrder(TRIGGER, TRIGGER, ...)

ONCE_TRIGGER ::=
  TIME_TRIGGER
  WATERMARK_TRIGGER
  AfterPane.elementCountAtLeast(Integer)
  AfterFirst.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)
  AfterAll.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)

TIME_TRIGGER ::=
  AfterProcessingTime.pastFirstElementInPane()
  TIME_TRIGGER.alignedTo(Duration)
  TIME_TRIGGER.alignedTo(Duration, Instant)
  TIME_TRIGGER.plusDelayOf(Duration)
  TIME_TRIGGER.mappedBy(Instant -> Instant)

WATERMARK_TRIGGER ::=
  AfterWatermark.pastEndOfWindow()
  WATERMARK_TRIGGER.withEarlyFirings(ONCE_TRIGGER)
  WATERMARK_TRIGGER.withLateFirings(ONCE_TRIGGER)

Default = Repeatedly.forever(AfterWatermark.pastEndOfWindow())
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.