컬렉션 및 값 결합

파이프라인을 사용하는 동안 데이터에서 값 컬렉션을 결합하거나 기타 방법으로 병합해야 하는 경우가 종종 있습니다. 예를 들어 특정 달의 주문으로 구성되고 각 항목은 달러 값이 포함된 판매 데이터 컬렉션이 있다고 가정합니다. 파이프라인에서 모든 주문값을 그 달에 대한 총 달러 주문 금액, 가장 큰 주문 또는 주문당 평균 달러 금액을 나타내는 단일 값으로 결합하고자 할 수 있습니다. 이러한 종류의 데이터를 얻기 위해 컬렉션에서 값을 결합합니다.

Dataflow SDK에는 파이프라인의 PCollection 객체의 값을 결합하거나 키-그룹화된 값을 결합하는 데 사용할 수 있는 여러 작업이 포함됩니다.

Combine 코어 변환은 PCollection에서 요소 또는 값을 결합하는 데 사용할 수 있는 다양한 일반 메서드를 캡슐화합니다. Combine에는 전체 PCollection에서 작동하는 변형 그리고 키-값 쌍의 PCollection에 있는 개별 값 스트림을 결합하는 일부 변형도 있습니다. 또한 Combine에는 합계, 최솟값, 최댓값, 평균값 등 특정한 숫자 결합 연산을 위한 서브클래스도 있습니다.

Combine 변환 적용 시 요소 또는 값을 결합하기 위한 실제 논리를 포함하는 함수를 제공해야 합니다. 자세한 내용은 이 섹션의 뒷부분에 있는 결합 함수 생성 및 지정을 참조하세요.

이러한 의미에서 Combine 변환은 각 요소에 제공하는 처리 함수에서 논리를 적용하는 ParDo 변환과 유사합니다.

PCollection을 단일 값으로 결합

지정된 PCollection의 모든 요소를 단일 요소로 결합하여 하나의 요소를 포함하는 새로운 PCollection으로 파이프라인에 표현할 수 있습니다. PCollection의 요소를 결합하려면 전역 결합 변환을 사용합니다.

윈도우를 사용하여 입력 PCollection을 나눈 경우에는 Combine이 다르게 동작합니다. 그럴 경우에는 전역 결합이 윈도우별로 하나의 요소를 반환합니다.

전역 결합은 각 요소 값을 결합하기 위해 제공되는 결합 함수를 사용합니다. 자세한 내용은 이 섹션의 뒷부분에 있는 결합 함수 생성 및 지정을 참조하세요.

Dataflow SDK는 합계, 최솟값, 최댓값, 평균값 등 일반적인 숫자 결합 연산을 위해 몇 가지 사전 빌드된 결합 함수를 제공합니다. 결합 함수를 직접 만드는 대신 이러한 함수를 전역 결합과 함께 사용할 수 있습니다. 자세한 내용은 포함된 통계적 결합 연산 사용을 참조하세요.

자바

다음 예시 코드는 Combine.globally 변환을 apply하여 Integer 유형의 PCollection의 단일 합계값을 생성합니다.

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));

예시에서, Sum.SumIntegerFn()은 변환이 입력 PCollection의 요소를 결합하는 데 사용하는 CombineFn입니다. sum이라는 결과 PCollection에는 입력 PCollection에 있는 모든 요소의 합계만 포함됩니다.

전역 윈도우

입력 PCollection에서 기본 전역 윈도우를 사용하는 경우, Dataflow의 기본 동작은 한 항목을 포함한 PCollection을 반환하는 것입니다. 해당 항목의 값은 Combine 적용 시 지정한 결함 함수의 누산기에서 가져옵니다. 예를 들어 Dataflow의 Sum 결합 함수는 0 값(빈 입력의 합계)을 반환하는 반면, Min 결합 함수는 최댓값 또는 무한값을 반환합니다.

Combine을 원하는 경우 다음 코드 예시와 같이 입력이 비어있으면 빈 PCollection을 반환하고 Combine 변환 적용 시 .withoutDefaults를 지정합니다.

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());

비전역 윈도우

PCollection에서 비전역 윈도우 함수를 사용하는 경우, Dataflow는 기본 동작을 제공하지 않습니다. Combine 적용 시 다음 옵션 중 하나를 반드시 지정해야 합니다.

  • 입력 PCollection에서 비어 있는 윈도우가 출력 컬렉션에서도 비어 있는 경우 .withoutDefaults를 지정합니다.
  • 출력이 PCollectionView로 즉시 변환되는 .asSingletonView를 지정합니다. 그러면 부차 입력으로 사용되면 각각의 빈 윈도우에 대한 기본값이 제공됩니다. 파이프라인의 Combine 결과가 파이프라인에서 나중에 부차 입력으로 사용되는 경우 일반적으로 이 옵션을 사용해야 합니다.

키 그룹 컬렉션에서 값 결합

키 그룹 컬렉션을 만들면(예: GroupByKey 변환을 사용하여) 공동 패턴은 각 키와 연관된 값 컬렉션을 단일 병합 값으로 결합하는 것입니다.

GroupByKey의 이전 예시에 이어서 groupedWords라는 키 그룹 PCollection은 다음과 같습니다.

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

위의 PCollection에서 각 요소는 문자열 키(예: 'cat')와 첫 번째 요소의 값이 [1, 5, 9]인 정수의 Iterable을 가지며 각 정수는 키가 표시된 원래 텍스트의 행 번호를 나타냅니다. 파이프라인의 다음 처리 단계에서 값을 개별적으로 판단하는 대신 값을 결합하는 경우, 반복 가능한 정수를 각 키에 쌍으로 연결할 병합된 단일 값으로 결합할 수 있습니다.

값을 결합하는 방법의 실제 논리는 사용자에게 달려 있습니다. 예를 들어 키 및 정수 값의 키 그룹 컬렉션이 제공되는 경우에는 각 키와 연관된 정수 컬렉션을 합산하도록 선택할 수 있습니다(값이 발생 횟수를 나타내는 경우 등에 유용함). 현재의 예에서는 값이 줄 번호를 나타내기 때문에 처음 나타난 곳을 나타내는 최소값을 사용하는 것이 좋습니다.

자바

  PCollection<KV<String, Integer>> occurrences = ...;
  PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());
  PCollection<KV<String, Integer>> firstOccurrences = pc.apply(
    Combine.groupedValues(new Min.MinIntegerFn()));

Combine PerKey 사용

Dataflow SDK에서 GroupByKey를 사용한 후에 값 컬렉션을 병합하는 이 전체 패턴을 Combine PerKey 변환이라고 합니다.

Combine PerKey는 패턴의 두 단계를 모두 수행합니다. GroupByKey와 마찬가지로 키-값 쌍의 PCollectionapply합니다. 그러면 Combine PerKey 변환이 각 키와 연관된 모든 값의 세트에 제공된 결합 함수를 적용합니다. 결합 연산은 새로운 키-값 쌍의 PCollection을 생성하고, 여기에는 고유 키 그리고 각 키에 대해 병합된 단일 값이 포함됩니다.

Combine PerKey는 연산 과정의 일환으로 GroupByKey 변환을 수행합니다. 따라서 윈도우를 사용하여 PCollection을 나누면 GroupByKey처럼 동작합니다. 즉, Combine PerKey는 키 및 윈도우별로 결합합니다.

ParDo를 생성하여 이러한 기능(즉, 컬렉션을 키그룹화한 다음 값을 결합)을 수행 시 CombineFn 변환의 더욱 구조화된 시맨틱스를 사용하면 Cloud Dataflow 서비스가 결합 작업을 더 효율적으로 실행할 수 있습니다.

Combine PerKey에 제공되는 결합 함수는 연관된 감소 함수이거나 CombineFn의 서브클래스여야 합니다. CombineFn 생성 방법에 대한 자세한 내용은 이 섹션의 뒷부분에 있는 결합 함수 생성 및 지정을 참조하세요.

자바

Combine.perKey를 사용하는 경우, 입력 PCollection의 키 및 값 유형에 따라, 유형 매개변수와 출력 PCollection의 병합된 값이 입력 PCollection의 값과 다른 유형이면 추가 유형 매개 변수에 따라 유형 매개변수를 전달합니다.

또한 각 값 컬렉션에 적용할 조합 논리를 포함하는 combine 함수를 Combine.perKey에 전달해야 합니다. 일반적으로 줄 밖에서 결합 함수를 정의합니다.

다음 예시 코드는 Combine.perKey 변환을 apply하는 방법을 보여줍니다. 이 예시에서 입력 PCollection은 키로 그룹화되고 각 키와 연결된 Double 값의 컬렉션은 단일 합계 값으로 결합됩니다.

  PCollection<KV<String, Double>> salesRecords = ...;
  PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
      new Sum.SumDoubleFn()));

Combine.perKey에는 키 유형은 String, 값은 Double의 세 가지 유형 매개 변수가 사용됩니다. 이는 각 키의 값 컬렉션이 Double 유형이기 때문이고, 결과 결합 값도 Double 유형이 됩니다.

다음 예시 코드는 변환 값이 키별 값의 원래 컬렉션과 다른 유형인 Combine.perKey 변환을 apply하는 방법을 보여줍니다. 이 경우 입력 PCollection의 키 유형은 String이고 값 유형은 Integer이며, 결합된 값은 키별 모든 값의 평균 값을 나타내는 Double입니다.

  PCollection<KV<String, Integer>> playerAccuracy = ...;
  PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
      new MeanInts())));

자바는 종종 CombineFn 유형에서 이러한 type 매개변수를 추론할 수 있습니다. 자바 8은 이전 버전의 자바보다 이러한 추론 능력이 더 뛰어납니다.

결합 함수 생성 및 지정

Combine 변환을 사용하는 경우 변이에 무관하게 여러 요소를 단일 값으로 결합하는 방법을 지정하는 일부 처리 논리를 제공해야 합니다.

결합 함수를 설계할 때, 이 함수는 해당 키를 사용하는 모든 값에서 딱 한 번씩만 호출되는 것은 아니라는 점에 유의하세요. 입력 데이터(값 컬렉션 포함)는 여러 작업자 인스턴스에 분산될 수 있기 때문에 값 컬렉션의 하위 세트에 대한 부분 결합을 수행하기 위해 결합 함수가 여러 번 호출될 수도 있습니다. 일반적으로 결합은 트리 구조에서 반복적으로 적용될 수 있습니다. 트리 구조는 지정되어 있지 않기 때문에 결합 함수는 가환적이고 결합적이어야 합니다.

결합 함수를 생성할 때 몇 가지 옵션이 있습니다. 일반적으로, sum 같은 간단한 결합 연산은 값 세트를 같은 유형의 단일 값으로 결합하는 간단한 함수로 구현될 수 있습니다. 함수는 결합적이고 가환적이어야 합니다. 즉, 결합 함수 f 및 값 집합 v 1 , ..., v n의 경우 논리적으로 다음과 같아야 합니다. f(v1, v2, ... , vn) = f(v1, v2, ... , f(vk, ... , vn)) = f(f(v1, v2), ... , f(vk, ... , vn)) = f(f(vk, ... , vn), ... , f(v2, v1))

좀 더 복잡한 결합 연산의 경우에는 입력/출력 유형과 별개의 누적 유형이 있는 CombineFn의 서브클래스를 만들어야 할 수도 있습니다. 마지막으로, Dataflow SDK는 Sum, Min, Max, Mean 등 다양한 통계적 결합을 처리하는 몇 가지 공통 함수를 제공합니다. 자세한 내용은 포함된 통계적 결합 연산 사용을 참조하세요.

간단한 함수를 사용하는 간단한 결합

자바

Integer 값의 컬렉션을 하나의 Integer로 합산하는 것과 같은 간단한 결합 함수의 경우 SerializableFunction 인터페이스를 실행하는 간단한 함수를 만들 수 있습니다. SerializableFunctionIterable<V>V 유형의 단일 값으로 변환해야 합니다.

다음 예시 코드는 Integer 값 컬렉션을 합산하는 단순한 결합 함수를 보여줍니다. 함수 SumIntsSerializableFunction 인터페이스를 구현합니다.

  public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
    @Override
    public Integer apply(Iterable<Integer> input) {
      int sum = 0;
      for (int item : input) {
        sum += item;
      }
      return sum;
    }
  }

CombineFn을 사용한 고급 결합

좀 더 복잡한 결합 함수의 경우에는 SDK 클래스 CombineFn의 서브클래스를 정의할 수 있습니다. 출력 유형을 변경하거나, 더 정교한 누산기가 필요하거나, 키를 고려해야 하는 사전 또는 사후 처리를 결합 함수가 추가로 수행해야 할 때는 CombineFn을 사용해야 합니다.

계산의 분산 특성을 고려하세요. 각 요소의 전체 키/값 컬렉션이 지정된 시점에 같은 Compute Engine 인스턴스에 존재하지 않을 수 있습니다. 모든 값을 고려하지 않으면 제대로 수행할 수 없는 계산에는 CombineFn을 사용해야 합니다.

예를 들어 특정 키와 연관된 값 컬렉션의 평균값을 계산한다고 가정해 보세요. 평균값을 계산하려면 결합 함수가 모든 값을 합산한 후에 결과 합계를 합산된 값의 개수로 나눠야 합니다. 하지만 여러 분산된 위치에서 나누기를 수행했어야 하는 경우에는 결과 평균값이 정확하지 않을 수 있습니다. CombineFn를 사용하면 Cloud Dataflow 서비스에서 누적 합계와 표시된 요소 수를 누적하고 모든 요소가 누적될 때까지 최종 계산(이 경우에는 나누기)을 저장합니다.

일반적인 결합 작업은 네 가지 작업으로 구성됩니다. CombineFn 서브클래스 생성 시 해당 메서드를 재정의하여 4개의 연산을 제공해야 합니다.

  • 누산기 생성
  • 입력 추가
  • 누산기 병합
  • 출력 추출

어떤 이유로 결합 함수가 키/값 컬렉션 페어의 키에 대한 액세스가 필요한 경우 KeyedCombineFn을 사용할 수 있습니다. KeyedCombineFn은 키를 추가적인 type 매개변수로 전달합니다.

누산기 생성은 새로운 '로컬' 누산기를 생성합니다. 평균값을 사용하는 이 예의 경우, 로컬 누산기는 값의 누적 합계(최종 평균 나누기의 분자 값)와 지금까지 합산된 값의 개수(분모 값)를 추적합니다. 분산 방식으로 몇 번이든 호출될 수 있습니다.

입력 추가는 누산기 값을 반환하는 입력 요소를 누산기에 추가합니다. 이 예에서는 합계를 업데이트하고 개수를 1씩 늘립니다. 동시에 호출될 수도 있습니다.

누산기 병합은 여러 누산기를 단일 누산기로 병합합니다. 이는 여러 누산기에 있는 데이터를 최종 계산 전에 결합하는 방법입니다. 평균값 계산의 경우에는 나누기의 분모와 분자를 나타내는 누산기가 함께 병합됩니다. 해당 출력에서 여러 차례 다시 호출될 수 있습니다.

출력 추출은 최종 계산을 수행합니다. 평균값 계산의 경우에는 모든 값의 결합된 합계를 합산한 값의 개수로 나누는 것입니다. 마지막 병합된 누산기에서 한 번 호출됩니다.

다음 예시 코드는 CombineFn을 정의하여 평균을 계산하는 방법을 보여줍니다.

자바

 public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public static class Accum {
     int sum = 0;
     int count = 0;
   }

   @Override
   public Accum createAccumulator() { return new Accum(); }

   @Override
   public Accum addInput(Accum accum, Integer input) {
       accum.sum += input;
       accum.count++;
       return accum;
   }

   @Override
   public Accum mergeAccumulators(Iterable<Accum> accums) {
     Accum merged = createAccumulator();
     for (Accum accum : accums) {
       merged.sum += accum.sum;
       merged.count += accum.count;
     }
     return merged;
   }

   @Override
   public Double extractOutput(Accum accum) {
     return ((double) accum.sum) / accum.count;
   }
 }
 PCollection<Integer> pc = ...;
 PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));

CombineFn의 서브클래스를 만드는 데 필요한 유형 매개변수와 일반사항에 대한 자세한 내용은 CombineFn을 위한 자바용 API 참조 문서를 참조하세요.

누산기 유형 AccumT는 인코딩이 가능해야 합니다. 데이터 유형에 대한 기본 코더를 지정하는 방법은 데이터 인코딩을 참조하세요.

포함된 통계적 결합 연산 사용

자바

Dataflow SDK에는 기본적인 수학 결합 함수를 제공하는 여러 개의 클래스가 포함되어 있습니다. Combine.globally 또는 Combine.perKey를 사용하면 이러한 클래스의 결합 함수를 결합 함수로 사용할 수 있습니다. 다음과 같은 함수가 여기에 해당합니다.

  • Sum: 모든 값을 함께 더하여 단일 합계 값을 생성합니다.
  • Min: 사용 가능한 모든 값 중에서 최솟값만 유지합니다.
  • Max: 사용 가능한 모든 값 중에서 최댓값만 유지합니다.
  • Mean: 사용 가능한 모든 값에서 단일 평균값을 계산합니다.

각 클래스는 Integer, Long, Double 유형(Mean 클래스는 제외하고 일반 누적 결합 함수를 정의하고 유형 매개변수를 전달해야 함)에 대한 결합 함수를 정의합니다.

예를 들어 Sum.SumIntegerFn을 사용하여 Integer 값(본 예시에서는 키별 값) 컬렉션을 합산합니다.

  PCollection<KV<String, Integer>> playerScores = ...;
  PCollection<KV<String, Integer>> totalPointsPerPlayer =
    playerScores.apply(Combine.<String, Integer, Integer>perKey(
      new Sum.SumIntegerFn()));

마찬가지로 Max.MaxLongFn을 사용하여 PCollection<Long>에서 최대 Long 값을 계산할 수 있습니다.

  PCollection<Long> waitTimes = ...;
  PCollection<Long> longestWaitTime = waitTimes.apply(
    Combine.globally(new Max.MaxLongFn()));

Sum, Min, Max, Mean 클래스는 모두 패키지com.google.cloud.dataflow.sdk.transforms에 정의되어 있습니다. 자세한 내용은 Cloud Dataflow 자바 API 참조를 참조하세요.