GroupByKey 및 조인

GroupByKey 코어 변환은 키-값 쌍의 컬렉션을 처리하는 데 사용되는 동시 축소 작업입니다. 컬렉션에 동일한 키를 갖지만 값은 서로 다른 쌍이 여러 개 포함된 멀티맵을 나타내는 키-값 쌍의 PCollection 입력과 GroupByKey를 사용합니다. GroupByKey 변환을 사용하면 동일한 키를 공유하는 멀티맵의 모든 값을 모을 수 있습니다.

GroupByKey는 매핑/셔플/축소 스타일 알고리즘의 셔플 단계와 유사합니다. GroupByKey를 사용하여 고유 키와 연결된 모든 값을 수집합니다.

GroupByKey는 공통점을 가진 데이터를 집계하는 데 유용합니다. 예를 들어, 동일한 우편 번호의 고객 주문을 그룹화하려고 할 수 있습니다. 이때 '키'는 각 주문의 우편 번호이고 '값'은 주문의 나머지 부분입니다. 또는 사용자 ID 또는 발생한 시간에 따라 모든 사용자 쿼리를 그룹화할 수 있습니다.

또한 이러한 컬렉션이 공통 키를 공유하는 경우, 값 유형이 서로 다르더라도 키-값 쌍의 여러 컬렉션을 조인할 수 있습니다. 조인을 수행하는 데는 두 가지 방법이 있습니다. 한 가지 방법은 공통 키를 공유하는 여러 컬렉션에서 모든 유형의 모든 값을 그룹화할 수 있는 CoGroupByKey 변환을 사용하는 것입니다. 다른 방법은 한 개 이상의 부차 입력으로 ParDo를 사용하여 키-값 쌍의 여러 컬렉션을 조인하는 것입니다. 경우에 따라서 이 두 번째 방법이 CoGroupByKey를 사용하는 것보다 효율적일 수 있습니다.

조인은 관련된 사항에 대한 정보를 제공하는 여러 데이터세트(소스 여러 개가 있을 수 있음)가 있는 경우에 유용합니다. 예를 들어, 입찰 데이터가 있는 두 개의 파일이 있다고 가정합니다. 한 파일에는 입찰 ID, 입찰 데이터, 가격 데이터가 있습니다. 다른 파일에는 입찰 ID와 품목 설명이 있습니다. 입찰 ID를 공통 키로 사용하고 연결된 다른 데이터를 연결 값으로 사용하여 두 데이터세트를 조인할 수 있습니다. 조인 후에는 각 입찰 ID와 연결된 모든 정보(입찰, 가격, 품목)를 포함하는 데이터세트 한 개가 남습니다.

GroupByKey

GroupByKey의 작동 방식을 간단한 예를 통해 살펴보겠습니다. 여기서, 데이터세트는 텍스트 파일의 단어와 해당 단어가 나타나는 줄로 구성됩니다. 동일한 단어(키)를 공유하는 모든 줄 번호(값)를 그룹화하여 텍스트에서 특정 단어가 나타나는 모든 위치를 보려 합니다.

입력은 각 단어가 키이고 값은 파일에서 단어가 나타나는 줄 번호인 키-값 쌍 PCollection입니다. 입력 컬렉션의 키-값 쌍 목록은 다음과 같습니다.

  cat, 1
  dog, 5
  and, 1
  jump, 3
  tree, 2
  cat, 5
  dog, 2
  and, 2
  cat, 9
  and, 6
  ...

GroupByKey 변환은 키가 동일한 모든 값을 모으고 PCollection 입력에서 고유 키 및 해당 키와 연결된 모든 값의 컬렉션으로 구성된 새로운 쌍을 만듭니다. 위의 키-값 쌍 컬렉션에 GroupByKey를 적용하면 출력 컬렉션은 다음과 같습니다.

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

따라서 GroupByKey는 여러 키와 개별 값 간에 매핑된 멀티맵에서 고유 키와 값 컬렉션 간에 매핑된 유니맵으로의 변환을 나타냅니다.

자바

키-값 쌍 표시에 대한 참고 사항:
Dataflow 자바 SDK에서는 KV<K, V> 유형의 객체로 키-값 쌍이 나타납니다. KVK 유형의 키와 V 유형의 값을 갖는 특수 클래스입니다.

GroupByKey 변환을 통해 얻은 키 그룹 컬렉션의 일반적인 처리 패턴은 해당 키와 연결된 값을 병합된 해당 키와 연결된 병합된 단일 값으로 결합하는 것입니다. Dataflow SDK는 이러한 전체 패턴(키 그룹화 후 각 키값 결합)을 Combine 변환으로 캡슐화합니다. 자세한 내용은 컬렉션과 값 결합을 참조하세요.

GroupByKey 변환 적용

GroupByKey를 키-값 쌍 PCollection 입력에 적용해야 합니다. GroupByKey는 키-값 쌍의 새로운 PCollection을 반환합니다. 새 컬렉션에서 키는 고유하며 각 연결된 값 요소는 실제로 해당 키와 연결된 값이 한 개 이상 포함된 값 스트림입니다.

자바

다음 예 코드는 GroupByKeyKV<K, V> 객체의 PCollection에 적용하는 방법을 보여줍니다. 여기서, 각 요소 쌍은 K 유형의 키와 V 유형의 단일 값을 나타냅니다.

GroupByKey의 반환 값은 KV<K, Iterable<V>> 유형의 새로운 PCollection이며, 여기서 각 요소 쌍은 키와 값의 컬렉션을 자바 Iterable로 나타냅니다.

  // A PCollection of key/value pairs: words and line numbers.
  PCollection<KV<String, Integer>> wordsAndLines = ...;

  // Apply a GroupByKey transform to the PCollection "wordsAndLines".
  PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
    GroupByKey.<String, Integer>create());
자바 8은 GroupByKey.create 매개변수 유형을 추론할 수 있지만, 이전 버전의 자바에서는 이 유형을 명시적으로 지정해야 할 수 있습니다.

창 작업과 GroupByKey

자바

PCollection 입력을 단일 전역 윈도우가 아닌 여러 윈도우로 나누면 GroupByKey가 약간 다르게 동작합니다.

또한 GroupByKey 변환은 축소 수행 시 각 요소가 속한 창을 고려합니다. 기본적으로 각 키-값 쌍의 타임스탬프에 의해 결정되는 창은 보조 키로 작동합니다. GroupByKey는 윈도우를 통해 키와 윈도우 모두를 기준으로 그룹화됩니다. 모든 요소가 단일 전역 윈도우에 속한 경우에는 GroupByKey위에 설명된 단순한 시맨틱스로 퇴보합니다.

요소의 창은 그룹화를 위한 보조 키로 작동하지만 잠재적으로 더 강력할 수 있습니다. 요소는 한 개 이상의 창에 속할 수 있으며 겹치는 창은 병합될 수 있습니다. 이를 통해 보다 복잡한 그룹을 만들 수 있습니다.

앞의 예에 창 작업을 적용해 보겠습니다.

  cat, 1 (window 0)
  dog, 5 (window 0)
  and, 1 (window 0)

  jump, 3 (window 1)
  tree, 2 (window 1)
  cat, 5  (window 1)

  dog, 2 (window 2)
  and, 2 (window 2)
  cat, 9 (window 2)
  and, 6 (window 2)
  ...

GroupByKey는 동일한 키와 윈도우를 가진 모든 요소를 수집하여 다음과 같은 출력 컬렉션을 생성합니다.

  cat, [1] (window 0)
  dog, [5] (window 0)
  and, [1] (window 0)

  jump, [3] (window 1)
  tree, [2] (window 1)
  cat, [5]  (window 1)

  dog, [2]   (window 2)
  and, [2,6] (window 2)
  cat, [9]   (window 2)

이제 창이 출력 그룹에 영향을 줍니다. 다른 창에 있는 키-값 쌍은 함께 그룹화되지 않습니다.

CoGroupByKey로 조인

CoGroupByKey 변환은 데이터세트 두 개 이상의 관계형 조인을 수행합니다. CoGroupByKey는 키-값 쌍의 여러 PCollection의 값을 그룹화하며, 이 때 입력에서의 각 PCollection은 동일한 키 유형을 갖습니다.

자바

유형의 안전을 위해 Dataflow에서는 각 PCollectionKeyedPCollectionTuple의 일부로 전달하도록 요구합니다. CoGroupByKey로 전달하려는 각 PCollection 입력에 대해 TupleTag를 선언해야 합니다.

CoGroupByKey는 조인된 출력을 CoGbkResult 객체로 묶습니다.

간단한 예를 통해 CoGroupByKey의 작동 방식을 알아보겠습니다. KeyedPCollectionTuple에 묶을 입력 컬렉션이 두 개 있습니다. 첫 번째는 PCollection<K, V1>이며 tag1라는 TupleTag<V1>을 할당합니다. 이 PCollection의 키-값 쌍은 다음과 같습니다.

  key1 &map; v1
  key2 &map; v2
  key2 &map; v3

두 번째는 PCollection<K, V2>이며 tag2라는 TupleTag<V2>를 할당합니다. 이 컬렉션에는 다음이 포함됩니다.

  key1 &map; x1
  key1 &map; x2
  key3 &map; x3

결과 CoGbkResult 컬렉션에는 입력 컬렉션 중 하나의 각각의 고유 키와 연결된 모든 데이터가 포함됩니다. 반환된 데이터 유형은 PCollection<KV<K, CoGbkResult>>이고, 내용은 다음과 같습니다.

  key1 -> {
    tag1 &map; [v1]
    tag2 &map; [x1, x2]
  }
  key2 -> {
    tag1 &map; [v2, v3]
    tag2 &map; []
  }
  key3 -> {
    tag1 &map; []
    tag2 &map; [x3]
  }

CoGroupByKey를 적용한 후 적절한 TupleTag를 사용하여 각 컬렉션에서 데이터를 조회할 수 있습니다.

CoGroupByKey 적용

자바

CoGroupByKey는 입력으로 키가 지정된 PCollection(PCollection<KV<K, V>>)의 튜플을 허용합니다. CoGroupByKey는 출력으로 CoGbkResults라는 특수 유형을 반환하며, 이 유형은 공통 키를 기준으로 모든 PCollection 입력의 값을 그룹화합니다. 여러 컬렉션에 대해 TupleTag 메커니즘을 사용하여 CoGbkResults의 색인을 생성합니다. 최초 컬렉션에서 제공한 TupleTag를 사용하여 CoGbkResults 객체의 특정 컬렉션에 액세스할 수 있습니다.

다음은 별도의 PCollection으로 읽혀진 서로 다른 두 개의 데이터세트(아마도 소스가 다름)를 조인하는 예입니다.

  // Each data set is represented by key-value pairs in separate PCollections.
  // Both data sets share a common key type ("K").
  PCollection<KV<K, V1>> pc1 = ...;
  PCollection<KV<K, V2>> pc2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> tag1 = new TupleTag<V1>();
  final TupleTag<V2> tag2 = new TupleTag<V2>();

  // Merge collection values into a CoGbkResult collection.
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(tag1, pc1)
                         .and(tag2, pc2)
                         .apply(CoGroupByKey.<K>create());

결과 PCollection<KV<K, CoGbkResult>>에서 각 키(모두 K 유형)에는 서로 다른 CoGbkResult가 있습니다. 즉, CoGbkResultTupleTag<T>에서 Iterable<T>로 매핑한 것입니다.

다음은 CoGroupByKey를 사용하고 결과 CoGbkResult를 소비하는 ParDo가 뒤따르는 또 다른 예입니다. 예를 들어, ParDo는 후속 처리를 위해 데이터 형식을 지정할 수 있습니다.

  // Each BigQuery table of key-value pairs is read into separate PCollections.
  // Each shares a common key ("K").
  PCollection<KV<K, V1>> pt1 = ...;
  PCollection<KV<K, V2>> pt2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> t1 = new TupleTag<V1>();
  final TupleTag<V2> t2 = new TupleTag<V2>();

  //Merge collection values into a CoGbkResult collection
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, pt1)
                         .and(t2, pt2)
                         .apply(CoGroupByKey.<K>create());

  // Access results and do something.
  PCollection<T> finalResultCollection =
    coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, T>() {
        @Override
        public void processElement(ProcessContext c) {
          KV<K, CoGbkResult> e = c.element();
          // Get all collection 1 values
          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
          // Now get collection 2 values
          V2 pt2Val = e.getValue().getOnly(t2);
          ... Do Something ....
          c.output(...some T...);
        }
      }));

제한되지 않은 PCollection으로 CoGroupByKey 사용

자바

CoGroupByKey을 사용하여 윈도우 전략이 적용된 PCollection을 그룹화하는 경우, 그룹화할 모든 PCollection에서 동일한 윈도우 전략과 윈도우 크기 지정을 사용해야 합니다. 예를 들어, 병합 중인 모든 컬렉션은 가설적으로 동일한 5분 고정 창 또는 30초마다 시작하는 4분 슬라이딩 창을 사용해야 합니다.

파이프라인이 CoGroupByKey를 사용하여 창이 호환되지 않는 PCollection을 병합하려 하는 경우, 파이프라인이 생성되면 Dataflow에서 IllegalStateException 오류가 발생합니다.

ParDo 및 부차 입력과 조인

CoGroupByKey를 사용하는 대신 부차 입력 한 개 이상으로 ParDo를 적용하여 조인을 수행할 수 있습니다. 부차 입력은 PCollection에 제공할 수 있는 기본 입력 이외의 추가 입력입니다. DoFnPCollection 입력의 요소를 처리할 때마다 부차 입력에 액세스할 수 있습니다.

이런 방식으로 조인을 수행하면 다음과 같은 경우에 CoGroupByKey를 사용하는 것보다 더 효율적일 수 있습니다.

  • 조인하는 PCollection의 크기가 맞지 않으며 더 작은 PCollection이 메모리에 들어가는 경우
  • 파이프라인의 여러 위치에 여러 번 조인하려는 대형 테이블이 있는 경우. CoGroupByKey를 여러 번 수행하는 대신 부차 입력 한 개를 만들어 여러 ParDo에 전달할 수 있습니다. 예를 들어, 두 테이블을 조인하여 세 번째 테이블을 생성하려 합니다. 그런 다음 첫 번째 테이블을 세 번째 테이블에 조인하려 합니다.

이런 방식으로 조인을 수행하려면 ParDoPCollection 중 하나에 적용하고 다른 PCollection을 부차 입력으로 전달합니다. 다음 코드 샘플은 이러한 조인을 수행하는 방법을 보여줍니다.

자바

// Each BigQuery table of key-value pairs is read into separate PCollections.
PCollection<KV<K1, V1>> mainInput = ...
PCollection<KV<K2, V2>> sideInput = ...

// Create a view from your side input data
final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

// Access side input and perform join logic
PCollection<T> joinedData =
mainInput.apply("SideInputJoin", ParDo.withSideInputs(view).of(new DoFn<KV<K1, V1>, T>() {
  @Override
  public void processElement(ProcessContext c) {
    K2 sideInputKey = ... transform c.element().getKey() into K2 ...
    V2 sideInputValue = c.sideInput(view).get(sideInputKey);
    V1 mainInputValue = c.element().getValue();
    ... Do Something ...
    c.output(...some T...);
  }
}));
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.