GroupByKey 및 조인

GroupByKey 코어 변환은 키-값 쌍의 컬렉션을 처리하는 데 사용되는 병렬 축소 작업입니다. 컬렉션에 키가 같지만 값이 다른 여러 쌍이 포함된 멀티 맵을 나타내는 키-값 쌍의 입력 PCollection을 가지는 GroupByKey사용합니다 . GroupByKey 변환을 사용하면 동일한 키를 공유하는 멀티 맵의 모든 값을 모을 수 있습니다.

GroupByKey은 Map/Shuffle/Reduce 스타일 알고리즘의 Shuffle 단계와 유사합니다. GroupByKey를 사용하여 고유 키와 연관된 모든 값을 수집합니다.

GroupByKey는 공통점이있는 데이터를 집계하는 좋은 방법입니다. 예를 들어, 동일한 우편 번호의 고객 주문을 그룹화하려고 할 수 있습니다. 이때 '키'는 각 주문의 우편 번호이고 '값'은 주문의 나머지 부분입니다. 아니면 사용자 ID 또는 발생한 시간 별로 모든 사용자 쿼리를 그룹화할 수 있습니다.

또한 이러한 컬렉션이 공통 키를 공유하는 경우(값 유형이 서로 다르더라도), 키-값 쌍의 여러 컬렉션을 조인할 수 있습니다. 조인을 수행하는 데는 두 가지 방법이 있습니다. 한 가지 방법은 공통 키를 공유하는 여러 컬렉션에서 모든 유형의 값을 그룹화할 수 있는 CoGroupByKey 변환을 사용하는 것입니다. 다른 방법은 한 개 이상의 부차 입력으로 ParDo를 사용하여 키-값 쌍의 여러 컬렉션을 조인하는 것입니다. 경우에 따라서는 두 번째 방법이 CoGroupByKey를 사용하는 것보다 효율적일 수 있습니다.

조인은 관련된 사항에 대한 정보를 제공하는 가능한 다양한 소스의 여러 데이터 조합이 있는 경우에 유용합니다. 예를 들어, 입찰 데이터가 있는 두 개의 파일이 있다고 가정합니다. 한 파일에는 입찰 ID, 입찰 데이터, 가격 데이터가 있습니다. 다른 파일에는 입찰 ID와 품목 설명이 있습니다. 입찰 ID를 공통 키로 사용하고 연결된 다른 데이터를 연결 값으로 사용하여 두 데이터 조합을 조인할 수 있습니다. 조인 후에는 각 입찰 ID와 연결된 모든 정보(입찰, 가격, 품목)를 포함하는 데이터 조합 한 개가 남습니다.

GroupByKey

간단한 예를 들어 GroupByKey 메커니즘을 살펴 보겠습니다. 데이터 세트는 텍스트 파일의 단어와 텍스트가 표시되는 선으로 구성됩니다. 동일한 단어(키)를 공유하는 모든 선 번호(값)를 그룹화하여 텍스트에서 특정 단어가 나타나는 모든 장소를 보려 합니다.

입력은 각 단어가 키이고 값이 단어가 나타나는 파일의 선 번호인 키-값 쌍의 PCollection입니다. 입력 컬렉션의 키-값 쌍 목록은 다음과 같습니다.

      cat, 1
      dog, 5
      and, 1
      jump, 3
      tree, 2
      cat, 5
      dog, 2
      and, 2
      cat, 9
      and, 6
      ...
    

GroupByKey 변환은 동일한 키를 가진 모든 값을 수집하고 고유 한 키와 해당 키에 연결된 모든 값의 집합으로 구성된 새 쌍을 입력 PCollection에 생성합니다. . 위의 키-값 쌍의 컬렉션에 GroupByKey을 적용하면 출력 컬렉션은 다음과 같습니다.

      cat, [1,5,9]
      dog, [5,2]
      and, [1,2,6]
      jump, [3]
      tree, [2]
      ...
    

따라서, GroupByKey는 여러 키를 개별 키 값으로 매핑한 지도 multi-map에서 컬렉션 값에 대한 고유 키의 지도 uni-map로의 변환을 나타냅니다.

자바

키-값 쌍을 나타내는 방법에 대한 참고 사항:
Dataflow 자바 SDK에서 KV<K, V>유형의 객체를 가진 키-값 쌍을 나타냅니다. KVK 유형의 키와 V 유형의 값을 갖는 특수 클래스입니다.

키 그룹화 된 컬렉션(GroupByKey 변환의 결과)의 일반적인 처리 패턴은 각 키에 연결된 값을 해당 키와 연결된 단일 병합 값으로 결합하는 것입니다. Dataflow SDK는 이 전체 패턴을 Combine 변환으로 캡슐화(키 그룹화 한 다음 각 키의 값을 결합)합니다. 자세한 내용은 컬렉션과 값 결합을 참조하세요.

GroupByKey 변환 적용

GroupByKey을 키-값 쌍의 입력 PCollection에 적용해야합니다. GroupByKey는 키-값 쌍의 새로운 PCollection을 반환합니다. 새 컬렉션에서 키는 고유하고 각 연결된 값 요소는 실제로 키와 연결된 하나 이상의 값을 포함하는 값 스트림입니다.

자바

다음 예제 코드는 GroupByKey을 각 요소 쌍이 K 유형의 키 및 V 유형의 단일 값을 나타내는 KV<K, V>객체의 PCollection에 적용하는 방법을 보여줍니다.

GroupByKey의 반환 값은 각 요소 쌍이 키와 자바 Iterable 값 모음을 나타내는 KV<K, Iterable<V>>유형의 새로운 PCollection 입니다.

      // A PCollection of key/value pairs: words and line numbers.
      PCollection<KV<String, Integer>> wordsAndLines = ...;

      // Apply a GroupByKey transform to the PCollection "wordsAndLines".
      PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
        GroupByKey.<String, Integer>create());
    
자바 8은 GroupByKey.create의 매개 변수 유형을 추론할 수 있지만 이전 버전의 자바에서는 명시적으로 지정해야합니다.

기간 설정 작업과 GroupByKey

자바

GroupByKey은 입력 PCollection가 단일 전역 기간이 아닌 여러 기간으로 분할될 때 약간 다르게 동작합니다.

GroupByKey 변환은 축소를 수행할 때 각 요소가 속한 기간도 고려합니다. 기본적으로 각 키-값 쌍의 타임스탬프에 의해 결정되는 기간은 보조 키로 작동합니다. GroupByKey는 따라서 키와 기간으로 기간과 함께 그룹화됩니다. 모든 요소가 단일 전역 기간에 속한 경우, GroupByKey위에서 설명한 단순한 시맨틱스로 변합니다.

요소의 기간 설정은 그룹화를 위한 보조 키로 작동하지만 잠재적으로 더 강력할 수 있습니다. 요소는 한 개 이상의 기간 설정에 속할 수 있으며 겹치는 기간 설정은 병합될 수 있습니다. 이를 통해 보다 복잡한 그룹을 생성할 수 있습니다.

앞의 예에 기간 설정 작업을 적용해 보겠습니다.

      cat, 1 (window 0)
      dog, 5 (window 0)
      and, 1 (window 0)

      jump, 3 (window 1)
      tree, 2 (window 1)
      cat, 5  (window 1)

      dog, 2 (window 2)
      and, 2 (window 2)
      cat, 9 (window 2)
      and, 6 (window 2)
      ...
    

GroupByKey는 동일 키 및 기간를 사용하여 모든 요소를 수집하고 다음과 같은 출력 컬렉션을 생성합니다.

      cat, [1] (window 0)
      dog, [5] (window 0)
      and, [1] (window 0)

      jump, [3] (window 1)
      tree, [2] (window 1)
      cat, [5]  (window 1)

      dog, [2]   (window 2)
      and, [2,6] (window 2)
      cat, [9]   (window 2)
    

이제 기간 설정이 출력 그룹에 영향을 줍니다. 다른 기간 설정 에 있는 키-값 쌍은 함께 그룹화되지 않습니다.

CoGroupByKey로 조인

CoGroupByKey변환은 2개 이상 데이터 세트의 관계형 조인을 실행합니다. CoGroupByKey는 키-값 쌍의 여러 PCollection값을 그룹화하는데 입력의 각 PCollection은 키 유형이 동일합니다.

자바

유형 안전을 위해 Dataflow를 사용하려면 KeyedPCollectionTuple의 일부로 각 PCollection을 전달해야 합니다. CoGroupByKey로 전달하려는 PCollection입력마다 TupleTag을 선언해야 합니다.

CoGroupByKey는 조인된 출력을 CoGbkResult 객체로 함께 묶습니다.

간단한 예제를 사용하여 CoGroupByKey의 메커니즘을 보여 드리겠습니다. 두 개의 입력 컬렉션을 하나의 KeyedPCollectionTuple에 가집니다. 첫 번째는 PCollection<K, V1> 따라서 Google은 TupleTag<V1>호출된 tag1을 할당합니다. PCollection 키-값 쌍은 다음과 같습니다.

      key1 ↦ v1
      key2 ↦ v2
      key2 ↦ v3
    

두 번째는 PCollection<K, V2> 따라서 Google은 TupleTag<V2>에 호출된 tag2을 할당합니다. 이 컬렉션에는 다음이 포함됩니다.

      key1 ↦ x1
      key1 ↦ x2
      key3 ↦ x3
    

결과 CoGbkResult 컬렉션에는 입력 컬렉션 중 하나의 고유 키와 관련된 모든데이터가 포함됩니다. 반환된 데이터 유형은 다음 내용을 포함하는 PCollection<KV<K, CoGbkResult>>입니다.

      key1 -> {
        tag1 ↦ [v1]
        tag2 ↦ [x1, x2]
      }
      key2 -> {
        tag1 ↦ [v2, v3]
        tag2 ↦ []
      }
      key3 -> {
        tag1 ↦ []
        tag2 ↦ [x3]
      }
    

CoGroupByKey를 적용한 후 적절한 TupleTag를 사용하여 각 컬렉션의 데이터를 조회할 수 있습니다.

CoGroupByKey 적용

자바

CoGroupByKey은 입력으로 키가 있는 PCollection(PCollection<KV<K, V>>)의 튜플을 받습니다. 출력으로서의 CoGroupByKey는 일반 키로 모든 입력 PCollection값을 그룹화하는 특수 유형의 호출된 CoGbkResults을 반환합니다. 여러 컬렉션TupleTag 메커니즘을 사용하여 CoGbkResults의 색인을 생성합니다. 초기 컬렉션과 함께 제공한 TupleTag을 사용하여 CoGbkResults 객체의 특정 컬렉션에 액세스할 수 있습니다.

다음은 별도의 PCollection로 읽은 두 데이터 세트를 다른 소스에서 결합하는 예입니다.

      // Each data set is represented by key-value pairs in separate PCollections.
      // Both data sets share a common key type ("K").
      PCollection<KV<K, V1>> pc1 = ...;
      PCollection<KV<K, V2>> pc2 = ...;

      // Create tuple tags for the value types in each collection.
      final TupleTag<V1> tag1 = new TupleTag<V1>();
      final TupleTag<V2> tag2 = new TupleTag<V2>();

      // Merge collection values into a CoGbkResult collection.
      PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
        KeyedPCollectionTuple.of(tag1, pc1)
                             .and(tag2, pc2)
                             .apply(CoGroupByKey.<K>create());
    

결과 PCollection<KV<K, CoGbkResult>>에서 각 키(모두 K 유형)에는 서로 다른 CoGbkResult가 있습니다. 즉, CoGbkResultTupleTag<T>에서 Iterable<T>까지의 맵입니다.

다음은 CoGroupByKey , 다음으로 ParDo 그 결과 CoGbkResult를 사용하는 또 다른 예입니다. 예를 들어 ParDo는 나중에 처리할 데이터를 형성할 수 있습니다.

      // Each BigQuery table of key-value pairs is read into separate PCollections.
      // Each shares a common key ("K").
      PCollection<KV<K, V1>> pt1 = ...;
      PCollection<KV<K, V2>> pt2 = ...;

      // Create tuple tags for the value types in each collection.
      final TupleTag<V1> t1 = new TupleTag<V1>();
      final TupleTag<V2> t2 = new TupleTag<V2>();

      //Merge collection values into a CoGbkResult collection
      PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
        KeyedPCollectionTuple.of(t1, pt1)
                             .and(t2, pt2)
                             .apply(CoGroupByKey.<K>create());

      // Access results and do something.
      PCollection<T> finalResultCollection =
        coGbkResultCollection.apply(ParDo.of(
          new DoFn<KV<K, CoGbkResult>, T>() {
            @Override
            public void processElement(ProcessContext c) {
              KV<K, CoGbkResult> e = c.element();
              // Get all collection 1 values
              Iterable<V1> pt1Vals = e.getValue().getAll(t1);
              // Now get collection 2 values
              V2 pt2Val = e.getValue().getOnly(t2);
              ... Do Something ....
              c.output(...some T...);
            }
          }));
    

제한되지 않은 PCollection으로 CoGroupByKey 사용

자바

CoGroupByKey을 기간 전략이 적용된 그룹 PCollection에 사용할 경우, 그룹화할 모든 PCollection같은 기간 전략과 기간 설정 크기를 사용해야 합니다. 예를 들어, 병합 중인 모든 컬렉션은 가설적으로 동일 5분 고정 기간 설정 또는 30초마다 시작하는 4분 슬라이딩 기간 설정을 사용해야 합니다.

파이프 라인이 CoGroupByKey을 사용하여 PCollection를 호환되지 않는 기간과 병합하려고 시도하면 파이프라인이 생성될 때 IllegalStateException오류가 발생합니다.

ParDo 및 부차 입력과 조인

CoGroupByKey을 사용하는대신 ParDo를 하나 이상의 사이드 입력에 적용하여 조인을 수행합니다. 사이드 입력은 PCollection에 제공할 수 있는 기본 입력 이외의 추가 입력입니다. DoFn는 입력 PCollection에서 요소를 처리할 때마다 사이드 입력에 액세스 할 수 있습니다.

이 방법으로 조인을 수행하면 다음의 일부 경우에 CoGroupByKey를 사용하는 것보다 더 효율적일 수 있습니다.

  • 조인하는 PCollection는 크기가 맞지 않고 PCollection가 작으면 메모리에 맞을 수 있습니다.
  • 파이프라인의 여러 장소에 여러 번 조인하려는 대형 테이블이 있는 경우. CoGroupByKey을 여러 번 입력하는 대신 하나의 사이드 입력을 생성하여 여러 ParDo로 전달할 수 있습니다. 예를 들어, 두 테이블을 조인하여 세 번째 테이블을 생성하고자 합니다. 그런 다음 첫 번째 테이블을 세 번째 테이블에 조인하려 합니다.

이 방법으로 조인을 수행하려면 PCollection 중 하나에 ParDo를 적용하고 나머지 PCollection를 사이드 입력으로 전달합니다. 다음 코드 예시는 이러한 조인을 수행하는 방법을 보여줍니다.

자바

    // Each BigQuery table of key-value pairs is read into separate PCollections.
    PCollection<KV<K1, V1>> mainInput = ...
    PCollection<KV<K2, V2>> sideInput = ...

    // Create a view from your side input data
    final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

    // Access side input and perform join logic
    PCollection<T> joinedData =
    mainInput.apply("SideInputJoin", ParDo.withSideInputs(view).of(new DoFn<KV<K1, V1>, T>() {
      @Override
      public void processElement(ProcessContext c) {
        K2 sideInputKey = ... transform c.element().getKey() into K2 ...
        V2 sideInputValue = c.sideInput(view).get(sideInputKey);
        V1 mainInputValue = c.element().getValue();
        ... Do Something ...
        c.output(...some T...);
      }
    }));