컬렉션 및 값 결합

파이프라인을 사용하는 동안 데이터에서 값 컬렉션을 결합하거나 기타 방법으로 병합해야 하는 경우가 종종 있습니다. 예를 들어 특정 달의 주문으로 구성되고 각 항목은 달러 값이 포함된 판매 데이터 컬렉션이 있다고 가정합니다. 파이프라인에서 모든 주문값을 그 달에 대한 총 달러 주문 금액, 가장 큰 주문 또는 주문당 평균 달러 금액을 나타내는 단일 값으로 결합하고자 할 수 있습니다. 이러한 종류의 데이터를 얻기 위해 컬렉션에서 값을 결합합니다.

Dataflow SDK에는 파이프라인의 PCollection 객체에 있는 값을 결합하거나 키 그룹 값을 결합하는 데 사용할 수 있는 다수의 연산자가 포함되어 있습니다.

Combine 코어 변환은 PCollection에 있는 요소 또는 값을 결합하는 데 사용할 수 있는 다양한 일반 메소드를 캡슐화합니다. Combine에는 전체 PCollection에서 작동하는 변형 그리고 키/값 쌍의 PCollection에 있는 개별 값 스트림을 결합하는 일부 변형도 있습니다. 또한 Combine에는 합계, 최솟값, 최댓값, 평균값 등 특정한 숫자 결합 연산을 위한 하위 클래스도 있습니다.

Combine 변환을 적용할 때는 요소 또는 값을 결합하기 위한 실제 논리를 포함하는 함수를 제공해야 합니다. 자세한 내용은 이 섹션의 뒷부분에 있는 결합 함수 생성 및 지정을 참조하세요.

이러한 측면에서 Combine 변환은 각 요소에 제공한 처리 함수에 있는 논리를 적용하는 ParDo 변환과 유사합니다.

PCollection을 단일 값으로 결합

특정 PCollection에 있는 모든 요소를 단일 값으로 결합하고, 파이프라인에서 한 요소를 포함하고 있는 새로운 PCollection으로 나타낼 수 있습니다. PCollection의 요소를 결합하려면 전역 결합 변환을 사용합니다.

윈도우를 사용하여 입력 PCollection을 나눈 경우에는 Combine이 다르게 동작합니다. 그럴 경우에는 전역 결합이 윈도우별로 하나의 요소를 반환합니다.

전역 결합은 각 요소 값을 결합하기 위해 제공되는 결합 함수를 사용합니다. 자세한 내용은 이 섹션의 뒷부분에 있는 결합 함수 생성 및 지정을 참조하세요.

Dataflow SDK는 합계, 최소값, 최대값, 평균값 등 일반적인 숫자 결합 연산을 위해 몇 가지 사전 빌드된 결합 함수를 제공합니다. 결합 함수를 직접 만드는 대신 이러한 함수를 전역 결합과 함께 사용할 수 있습니다. 자세한 내용은 포함된 통계적 결합 연산 사용을 참조하세요.

자바

다음 코드 예시는 Combine.globally 변환을 apply하여 Integer 유형의 PCollection에 대한 단일 합계 값을 생성하는 방법을 보여줍니다.

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));

이 예에서 Sum.SumIntegerFn()은 변환이 입력 PCollection에 있는 요소를 결합하는 데 사용하는 CombineFn입니다. sum이라는 결과 PCollection에는 입력 PCollection에 있는 모든 요소의 합계만 포함됩니다.

전역 윈도우

입력 PCollection에서 기본 전역 윈도우를 사용하는 경우, Dataflow의 기본 동작은 한 항목을 포함한 PCollection을 반환하는 것입니다. 해당 항목의 값은 Combine 적용 시 지정한 결함 함수의 누산기에서 가져옵니다. 예를 들어 Dataflow의 Sum 결합 함수는 0 값(빈 입력의 합계)을 반환하는 반면, Min 결합 함수는 최대값 또는 무한값을 반환합니다.

입력이 비어 있을 경우 Combine이 대신 빈 PCollection을 반환하도록 하려면 다음 코드 예시와 같이 Combine 변환을 적용할 때 .withoutDefaults를 지정하세요.

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());

비전역 윈도우

PCollection에서 비전역 윈도우 함수를 사용하는 경우, Dataflow는 기본 동작을 제공하지 않습니다. Combine 적용 시 다음 옵션 중 하나를 지정해야 합니다.

  • 입력 PCollection에서 비어 있는 윈도우가 출력 컬렉션에서도 비어 있는 경우 .withoutDefaults를 지정합니다.
  • 출력이 PCollectionView로 즉시 변환되는 .asSingletonView를 지정합니다. 그러면 부차 입력으로 사용되면 각각의 빈 윈도우에 대한 기본값이 제공됩니다. 일반적으로 파이프라인의 Combine 결과를 파이프라인의 부차 입력으로 나중에 사용할 경우에만 이 옵션을 사용하면 됩니다.

키 그룹 컬렉션에서 값 결합

GroupByKey 변환을 사용하는 방법 등으로 키 그룹 컬렉션을 만든 후에는 각 키와 연관된 값 컬렉션을 병합된 단일 값으로 결합하는 것이 일반적인 패턴입니다.

GroupByKey의 이전 예에 이어서 groupedWords라는 키 그룹 PCollection은 다음과 같습니다.

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

위의 PCollection에서는 각 요소에 문자열 키(예: "cat")가 있고 해당 값으로는 반복 가능한 정수가 사용됩니다(첫 번째 요소는 [1, 5, 9]를 포함하고 있음). 여기에서 각 정수는 원래 텍스트에서 키가 나타난 줄 번호를 나타냅니다. 파이프라인의 다음 처리 단계에서 값을 개별적으로 판단하는 대신 값을 결합하는 경우, 반복 가능한 정수를 각 키에 쌍으로 연결할 병합된 단일 값으로 결합할 수 있습니다.

값을 결합하는 방법의 실제 논리는 사용자에게 달려 있습니다. 예를 들어 키 및 정수 값의 키 그룹 컬렉션이 제공되는 경우에는 각 키와 연관된 정수 컬렉션을 합산하도록 선택할 수 있습니다(값이 발생 횟수를 나타내는 경우 등에 유용함). 현재의 예에서는 값이 줄 번호를 나타내기 때문에 처음 나타난 곳을 나타내는 최소값을 사용하는 것이 좋습니다.

자바

  PCollection<KV<String, Integer>> occurrences = ...;
  PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());
  PCollection<KV<String, Integer>> firstOccurrences = pc.apply(
    Combine.groupedValues(new Min.MinIntegerFn()));

Combine PerKey 사용

Dataflow SDK에서 GroupByKey를 사용한 후에 값 컬렉션을 병합하는 이 전체 패턴을 Combine PerKey 변환이라고 합니다.

Combine PerKey는 패턴의 두 단계를 모두 수행합니다. GroupByKey와 마찬가지로 키-값 쌍의 PCollectionapply합니다. 그러면 Combine PerKey 변환이 각 키와 연관된 모든 값의 세트에 제공된 결합 함수를 적용합니다. 결합 연산은 새로운 키/값 쌍의 PCollection을 생성하고, 여기에는 고유 키 그리고 각 키에 대해 병합된 단일 값이 포함됩니다.

Combine PerKey는 연산 과정의 일환으로 GroupByKey 변환을 수행합니다. 따라서 윈도우를 사용하여 PCollection을 나누면 GroupByKey처럼 동작합니다. 즉, Combine PerKey는 키 및 윈도우별로 결합합니다.

ParDo를 생성하여 이러한 종류의 함수를 수행할 수 있지만(즉, 컬렉션을 키 그룹화한 후에 값을 결합하는 방식), 보다 구조화된 CombineFn 변환의 의미 체계는 Cloud Dataflow 서비스가 결합 연산을 더 효율적으로 실행할 수 있도록 해줍니다.

Combine PerKey에 제공되는 결합 함수는 연관된 감소 함수이거나 CombineFn의 하위 클래스여야 합니다. CombineFn 생성 방법에 대한 자세한 내용은 이 섹션의 뒷부분에 있는 결합 함수 생성 및 지정을 참조하세요.

자바

Combine.perKey를 사용할 때는 입력 PCollection에 있는 키 및 값의 유형에 따라 type 매개변수를 전달하고, 출력 PCollection에 있는 병합된 값이 입력 PCollection에 있는 값과 다른 유형일 경우 추가적인 type 매개변수를 전달합니다.

각 값 컬렉션에 적용할 결합 논리를 포함하는 결합 함수도 Combine.perKey에 전달해야 합니다. 일반적으로 줄 밖에서 결합 함수를 정의합니다.

다음 코드 예시는 Combine.perKey 변환을 apply하는 방법을 보여줍니다. 이 예시에서는 입력 PCollection이 키별로 그룹화되고, 각 키와 연관된 Double 값의 컬렉션이 단일 합계 값으로 결합됩니다.

  PCollection<KV<String, Double>> salesRecords = ...;
  PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
      new Sum.SumDoubleFn()));

Combine.perKey가 세 개의 type 매개변수를 적용합니다. 키 유형에는 String, 값에는 Double을 적용합니다. 이는 각 키의 값 컬렉션이 Double 유형이기 때문이고, 결과 결합 값도 Double 유형이 됩니다.

다음 예시 코드는 결합된 값이 키마다 원래 값 컬렉션과 다른 유형인 위치에 Combine.perKey 변환을 apply하는 방법을 보여줍니다. 이 예시에서는 입력 PCollectionString 유형의 키와 Integer 유형의 값이 있고, 결합된 값은 키마다 모든 값의 평균값을 나타내는 Double입니다.

  PCollection<KV<String, Integer>> playerAccuracy = ...;
  PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
      new MeanInts())));

자바는 종종 CombineFn 유형에서 이러한 type 매개변수를 추론할 수 있습니다. 자바 8은 이전 버전의 자바보다 이러한 추론 능력이 더 뛰어납니다.

결합 함수 생성 및 지정

Combine 변환을 사용할 때는 변형에 관계없이, 여러 요소를 단일 값으로 결합하는 방법을 지정하는 몇 가지 처리 논리를 제공해야 합니다.

결합 함수를 설계할 때, 이 함수는 해당 키를 사용하는 모든 값에서 딱 한 번씩만 호출되는 것은 아니라는 점에 유의하세요. 입력 데이터(값 컬렉션 포함)는 여러 작업자 인스턴스에 분산될 수 있기 때문에 값 컬렉션의 하위 세트에 대한 부분 결합을 수행하기 위해 결합 함수가 여러 번 호출될 수도 있습니다. 일반적으로 결합은 트리 구조에서 반복적으로 적용될 수 있습니다. 트리 구조는 지정되어 있지 않기 때문에 결합 함수는 가환적이고 결합적이어야 합니다.

결합 함수를 생성할 때 몇 가지 옵션이 있습니다. 일반적으로, sum 같은 간단한 결합 연산은 값 세트를 같은 유형의 단일 값으로 결합하는 간단한 함수로 구현될 수 있습니다. 함수는 결합적이고 가환적이어야 합니다. 다시 말해, 결합 함수 f 및 값 집합 v1, ..., vn의 경우 논리적으로 다음과 같아야 합니다. f(v1, v2, ... , vn) = f(v1, v2, ... , f(vk, ... , vn)) = f(f(v1, v2), ... , f(vk, ... , vn)) = f(f(vk, ... , vn), ... , f(v2, v1)) 등등

좀더 복잡한 결합 연산의 경우에는 입력/출력 유형과 별개의 누적 유형이 있는 CombineFn의 하위 클래스를 만들어야 할 수도 있습니다. 마지막으로, Dataflow SDK는 Sum, Min, Max, Mean 등 다양한 통계적 결합을 처리하는 몇 가지 공통 함수를 제공합니다. 자세한 내용은 포함된 통계적 결합 연산 사용을 참조하세요.

간단한 함수를 사용하는 간단한 결합

자바

Integer 값 컬렉션을 단일 Integer로 합산하는 등의 간단한 결합 함수의 경우, SerializableFunction 인터페이스를 구현하는 간단한 함수를 생성할 수 있습니다. SerializableFunctionIterable<V>V 유형의 단일 값으로 변환해야 합니다.

다음 코드 예시는 Integer 값 컬렉션을 합산하기 위한 간단한 결합 함수를 보여줍니다. SumInts 함수가 SerializableFunction 인터페이스를 구현합니다.

  public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
    @Override
    public Integer apply(Iterable<Integer> input) {
      int sum = 0;
      for (int item : input) {
        sum += item;
      }
      return sum;
    }
  }

CombineFn을 사용한 고급 결합

좀 더 복잡한 결합 함수의 경우에는 SDK 클래스 CombineFn의 하위 클래스를 정의할 수 있습니다. 출력 유형을 변경하거나, 더 정교한 누산기가 필요하거나, 키를 고려해야 하는 사전 또는 사후 처리를 결합 함수가 추가로 수행해야 할 때는 CombineFn을 사용해야 합니다.

계산의 분산 특성을 고려하세요. 각 요소의 전체 키/값 컬렉션이 지정된 시점에 같은 Compute Engine 인스턴스에 존재하지 않을 수 있습니다. 모든 값을 고려하지 않고서는 제대로 수행할 수 없는 계산의 경우 CombineFn을 사용해야 합니다.

예를 들어 특정 키와 연관된 값 컬렉션의 평균값을 계산한다고 가정해보세요. 평균값을 계산하려면 결합 함수가 모든 값을 합산한 후에 결과 합계를 합산된 값의 개수로 나눠야 합니다. 하지만 여러 분산된 위치에서 나누기를 수행했어야 하는 경우에는 결과 평균값이 정확하지 않을 수 있습니다. CombineFn을 사용하면 Cloud Dataflow 서비스가 누적 합계와 표시된 요소 수를 모두 누적하고, 모든 요소가 누적될 때까지 최종 계산(이 경우에는 나누기)을 보류합니다.

일반적인 결합 연산은 네 가지 연산으로 구성됩니다. CombineFn의 하위 클래스를 생성할 때 해당 메소드를 재정의함으로써 네 가지 연산을 제공해야 합니다.

  • 누산기 생성
  • 입력 추가
  • 누산기 병합
  • 출력 추출

어떠한 이유로든 결합 함수가 키/값 컬렉션 쌍에 있는 키에 액세스해야 하는 경우에는 KeyedCombineFn을 대신 사용할 수 있습니다. KeyedCombineFn은 키를 추가적인 type 매개변수로 전달합니다.

누산기 생성은 새로운 "로컬" 누산기를 생성합니다. 평균값을 사용하는 이 예의 경우, 로컬 누산기는 값의 누적 합계(최종 평균 나누기의 분자 값)와 지금까지 합산된 값의 개수(분모 값)를 추적합니다. 분산 방식으로 몇 번이든 호출될 수 있습니다.

입력 추가는 누산기 값을 반환하는 입력 요소를 누산기에 추가합니다. 이 예에서는 합계를 업데이트하고 개수를 1씩 늘립니다. 동시에 호출될 수도 있습니다.

누산기 병합은 여러 누산기를 단일 누산기로 병합합니다. 이는 여러 누산기에 있는 데이터를 최종 계산 전에 결합하는 방법입니다. 평균값 계산의 경우에는 나누기의 분모와 분자를 나타내는 누산기가 함께 병합됩니다. 해당 출력에서 여러 차례 다시 호출될 수 있습니다.

출력 추출은 최종 계산을 수행합니다. 평균값 계산의 경우에는 모든 값의 결합된 합계를 합산한 값의 개수로 나누는 것입니다. 마지막 병합된 누산기에서 한 번 호출됩니다.

다음 코드 예시는 평균값을 계산하도록 CombineFn을 정의하는 방법을 보여줍니다.

자바

 public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public static class Accum {
     int sum = 0;
     int count = 0;
   }

   @Override
   public Accum createAccumulator() { return new Accum(); }

   @Override
   public Accum addInput(Accum accum, Integer input) {
       accum.sum += input;
       accum.count++;
       return accum;
   }

   @Override
   public Accum mergeAccumulators(Iterable<Accum> accums) {
     Accum merged = createAccumulator();
     for (Accum accum : accums) {
       merged.sum += accum.sum;
       merged.count += accum.count;
     }
     return merged;
   }

   @Override
   public Double extractOutput(Accum accum) {
     return ((double) accum.sum) / accum.count;
   }
 }
 PCollection<Integer> pc = ...;
 PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));

CombineFn의 하위 클래스를 만드는 데 필요한 유형 매개변수와 일반사항에 대한 자세한 내용은 CombineFn을 위한 자바용 API 참조 문서를 참조하세요.

누산기 유형 AccumT는 인코딩이 가능해야 합니다. 데이터 유형에 대한 기본 코더를 지정하는 방법은 데이터 인코딩을 참조하세요.

포함된 통계적 결합 연산 사용

자바

Dataflow SDK에는 기본적인 수학 결합 함수를 제공하는 여러 개의 클래스가 포함되어 있습니다. Combine.globally 또는 Combine.perKey를 사용할 때 이러한 클래스의 결합 함수를 결합 함수로 사용할 수 있습니다. 다음과 같은 함수가 여기에 해당합니다.

  • Sum: 모든 값을 함께 더하여 단일 합계 값을 생성합니다.
  • Min: 사용 가능한 모든 값 중에서 최소값만 유지합니다.
  • Max: 사용 가능한 모든 값 중에서 최대값만 유지합니다.
  • Mean: 사용 가능한 모든 값에서 단일 평균값을 계산합니다.

각 클래스는 Integer, Long, Double 유형의 결합 함수를 정의합니다(type 매개변수를 전달해야 하는 일반 누적 결합 함수를 정의하는 Mean 클래스는 제외).

예를 들어 Sum.SumIntegerFn을 사용하여 Integer 값 컬렉션을 합산할 수 있습니다. 이 예에서는 키별로 합산합니다.

  PCollection<KV<String, Integer>> playerScores = ...;
  PCollection<KV<String, Integer>> totalPointsPerPlayer =
    playerScores.apply(Combine.<String, Integer, Integer>perKey(
      new Sum.SumIntegerFn()));

마찬가지로, 아래와 같은 PCollection<Long>에서 Max.MaxLongFn을 사용하여 최대 Long 값을 계산할 수 있습니다.

  PCollection<Long> waitTimes = ...;
  PCollection<Long> longestWaitTime = waitTimes.apply(
    Combine.globally(new Max.MaxLongFn()));

Sum, Min, Max, Mean 클래스는 모두 com.google.cloud.dataflow.sdk.transforms 패키지에서 정의됩니다. 자세한 내용은 Cloud Dataflow 자바 API 참조를 참조하세요.

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.