GroupByKey y Join

La transformación de núcleo GroupByKey es una operación de reducción paralela que se utiliza para procesar colecciones de pares clave-valor. Usa GroupByKey con una entrada PCollection de pares clave-valor que representa un multimapa, donde la colección contiene varios pares que tienen la misma clave, pero valores diferentes. La transformación GroupByKey te permite reunir todos los valores del multimapa que comparten la misma clave.

GroupByKey es análogo a la fase aleatoria de un algoritmo Map/Shuffle/Reduce-style. Usa GroupByKey para recopilar todos los valores asociados con una clave única.

GroupByKey es una buena forma de agregar datos que tienen algo en común. Por ejemplo, es posible que desees agrupar pedidos de clientes del mismo código postal (donde la “clave” sería el código postal de cada pedido individual y el “valor” sería el resto del pedido). O podrías desear agrupar todas las consultas de los usuarios por ID de usuario o por la hora del día en que se produjeron.

También puedes unir múltiples colecciones de pares clave-valor, cuando esas colecciones comparten una clave común (incluso si los tipos de valor son diferentes). Hay dos formas de realizar una unión. Una de ellas es con la transformación de CoGroupByKey, que te permite agrupar todos los valores (de cualquier tipo) en varias colecciones que comparten una clave común. La otra forma es unir múltiples colecciones de pares clave-valor con un ParDo con una o más entradas laterales. En algunos casos, esta segunda forma puede ser más eficaz que usar un CoGroupByKey.

Las uniones son útiles cuando tienes varios conjuntos de datos, tal vez de múltiples fuentes, que proporcionan información sobre elementos relacionados. Por ejemplo, supongamos que tienes dos archivos diferentes con datos de subasta: un archivo tiene el ID de subasta, los datos de oferta y los datos de precios; el otro archivo tiene el ID de la subasta y una descripción del artículo. Puedes unir esos dos conjuntos de datos con el ID de la subasta como una clave común y con los otros datos adjuntos como los valores asociados. Después de la unión, tienes un conjunto de datos que contiene toda la información (oferta, precio y artículo) asociada con cada ID de subasta.

GroupByKey

Examinemos la mecánica de GroupByKey con un caso de ejemplo simple, en el que nuestro conjunto de datos consiste en palabras de un archivo de texto y la línea en la que aparecen. Queremos agrupar todos los números de línea (valores) que comparten la misma palabra (clave), lo que nos permite ver todos los lugares en el texto donde aparece una palabra en particular.

Nuestra entrada es un PCollection de pares clave-valor donde cada palabra es una clave, y el valor es un número de línea en el archivo donde aparece la palabra. Aquí hay una lista de los pares clave-valor en nuestra colección de entrada:

      cat, 1
      dog, 5
      and, 1
      jump, 3
      tree, 2
      cat, 5
      dog, 2
      and, 2
      cat, 9
      and, 6
      ...
    

La transformación GroupByKey reúne todos los valores con la misma clave y crea un nuevo par que consiste en la clave única y una colección de todos los valores asociados con esa clave en la entrada PCollection. Si aplicamos GroupByKey en el conjunto de pares clave-valor anteriores, la colección de resultados se vería así:

      cat, [1,5,9]
      dog, [5,2]
      and, [1,2,6]
      jump, [3]
      tree, [2]
      ...
    

Por lo tanto, GroupByKey representa una transformación de un multi-map, que es un mapa de varias claves a valores individuales, a un uni-map, que es un mapa de claves únicas para colecciones de valores.

Java

Nota sobre la representación de pares clave / valor:
En el SDK de Java de Dataflow, representa un par clave-valor con un objeto de tipo KV<K, V>. KV es una clase especializada con claves de tipo K y valores de tipo V.

Un patrón de procesamiento común para las colecciones agrupadas por claves (el resultado de una transformación GroupByKey) es combinar los valores asociados con cada clave en un único valor combinado asociado con esa clave. Los SDK de Dataflow encapsulan todo el patrón (agrupación de claves y luego combinando los valores de cada clave) como la transformación Combine. Consulta Combinar colecciones y valores para obtener más información.

Aplica una transformación GroupByKey

Debe aplicar un GroupByKey a una entrada PCollection de pares clave-valor. GroupByKey muestra un nuevo PCollection de pares clave-valor; en la colección nueva, las claves son únicas y cada elemento de valor asociado es una transmisión de valor que contiene uno o más valores asociados con la clave.

Java

El siguiente código de ejemplo muestra cómo aplicar GroupByKey a un PCollection de KV<K, V> objetos, donde cada par de elementos representa una clave de tipo K y un único valor de tipo V.

El valor de retorno de GroupByKey es un nuevo PCollection de tipo KV<K, Iterable<V>>, donde cada par de elementos representa una clave y un conjunto de valores como Java Iterable.

      // A PCollection of key/value pairs: words and line numbers.
      PCollection<KV<String, Integer>> wordsAndLines = ...;

      // Apply a GroupByKey transform to the PCollection "wordsAndLines".
      PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
        GroupByKey.<String, Integer>create());
    
Java 8 puede inferir los tipos de parámetros de GroupByKey.createGroupByKey.create
, pero es posible que deban especificarse de forma explícita en versiones anteriores de Java.

GroupByKey con sistema de ventanas

Java

GroupByKey se comporta de manera diferente cuando la entrada PCollection se divide en varias ventanas, en lugar de una sola ventana global.

La transformación GroupByKey también considera la ventana a la que pertenece cada elemento cuando realiza la reducción. Las ventanas (según lo determinado por la marca de tiempo de cada par clave-valor) actúa, en esencia, como una clave secundaria. GroupByKey con ventanas agrupa los grupos tanto por clave como por ventana. Cuando todos los elementos forman parte de una sola ventana global, GroupByKey degenera a la semántica simple descrita anteriormente.

Si bien las ventanas de un elemento actúan como una clave secundaria para agrupar, puede ser, en potencia, más potente. Los elementos pueden pertenecer a más de una ventana y las ventanas superpuestas se pueden combinar. Esto te permite crear agrupaciones más complejas.

Apliquemos un sistema de ventanas a nuestro ejemplo anterior:

      cat, 1 (window 0)
      dog, 5 (window 0)
      and, 1 (window 0)

      jump, 3 (window 1)
      tree, 2 (window 1)
      cat, 5  (window 1)

      dog, 2 (window 2)
      and, 2 (window 2)
      cat, 9 (window 2)
      and, 6 (window 2)
      ...
    

GroupByKey reúne todos los elementos con la misma clave y window, generando una colección de resultados como esta:

      cat, [1] (window 0)
      dog, [5] (window 0)
      and, [1] (window 0)

      jump, [3] (window 1)
      tree, [2] (window 1)
      cat, [5]  (window 1)

      dog, [2]   (window 2)
      and, [2,6] (window 2)
      cat, [9]   (window 2)
    

Ten en cuenta que la ventana ahora afecta a los grupos de salida; los pares clave-valor en diferentes ventanas no se agrupan juntos.

Se une a CoGroupByKey

La transformación CoGroupByKey realiza una combinación relacional de dos o más conjuntos de datos. CoGroupByKey agrupa los valores de varios PCollection s de pares clave-valor, donde cada PCollection de la entrada tiene el mismo tipo de clave.

Java

Por seguridad de tipo, Dataflow requiere que transfiera cada PCollection como parte de un KeyedPCollectionTuple. Deberá declarar una TupleTag para cada entrada PCollection a la que desee pasarle a CoGroupByKey.

CoGroupByKey agrupa el resultado unido en un objeto CoGbkResult.

Usemos un ejemplo simple para demostrar la mecánica de un CoGroupByKey. Tenemos dos conjuntos de entradas para agrupar en un KeyedPCollectionTuple. El primero es un PCollection<K, V1>, así que asignaremos un TupleTag<V1> llamado tag1. Los pares clave-valor en este PCollection son:

      key1 ↦ v1
      key2 ↦ v2
      key2 ↦ v3
    

El segundo es un PCollection<K, V2>, así que asignaremos un TupleTag<V2> llamado tag2. Esta colección contiene lo siguiente:

      key1 ↦ x1
      key1 ↦ x2
      key3 ↦ x3
    

La colección CoGbkResult resultante contiene todos los datos asociados con cada clave única de cualquiera de las colecciones de entrada. El tipo de datos devuelto es PCollection<KV<K, CoGbkResult>>, con el siguiente contenido:

      key1 -> {
        tag1 ↦ [v1]
        tag2 ↦ [x1, x2]
      }
      key2 -> {
        tag1 ↦ [v2, v3]
        tag2 ↦ []
      }
      key3 -> {
        tag1 ↦ []
        tag2 ↦ [x3]
      }
    

Después de aplicar CoGroupByKey, puedes buscar los datos de cada colección con el TupleTag adecuado.

Aplica CoGroupByKey

Java

CoGroupByKey acepta una tupla de PCollection s (PCollection<KV<K, V>>) con clave como entrada. Como salida, CoGroupByKey muestra un tipo especial llamado CoGbkResults, que agrupa los valores de todas las entradas PCollection s por sus claves comunes. Puedes indexar el CoGbkResults mediante el mecanismo TupleTag para varias colecciones. Puedes acceder a una colección específica en el objeto CoGbkResults mediante el TupleTag que proporcionaste con la colección inicial.

Este es un ejemplo que une dos conjuntos de datos diferentes (tal vez de diferentes fuentes) que se leyeron en PCollection s por separado:

      // Each data set is represented by key-value pairs in separate PCollections.
      // Both data sets share a common key type ("K").
      PCollection<KV<K, V1>> pc1 = ...;
      PCollection<KV<K, V2>> pc2 = ...;

      // Create tuple tags for the value types in each collection.
      final TupleTag<V1> tag1 = new TupleTag<V1>();
      final TupleTag<V2> tag2 = new TupleTag<V2>();

      // Merge collection values into a CoGbkResult collection.
      PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
        KeyedPCollectionTuple.of(tag1, pc1)
                             .and(tag2, pc2)
                             .apply(CoGroupByKey.<K>create());
    

En la PCollection<KV<K, CoGbkResult>> resultante, cada clave (todas de tipo K) tendrá una CoGbkResult diferente. En otras palabras, CoGbkResult es un mapa del TupleTag<T> al Iterable<T>.

Este es otro ejemplo de uso de CoGroupByKey, seguido de un ParDo que consume el CoGbkResult resultante; el ParDo podría, por ejemplo, dar formato a los datos para su posterior procesamiento:

      // Each BigQuery table of key-value pairs is read into separate PCollections.
      // Each shares a common key ("K").
      PCollection<KV<K, V1>> pt1 = ...;
      PCollection<KV<K, V2>> pt2 = ...;

      // Create tuple tags for the value types in each collection.
      final TupleTag<V1> t1 = new TupleTag<V1>();
      final TupleTag<V2> t2 = new TupleTag<V2>();

      //Merge collection values into a CoGbkResult collection
      PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
        KeyedPCollectionTuple.of(t1, pt1)
                             .and(t2, pt2)
                             .apply(CoGroupByKey.<K>create());

      // Access results and do something.
      PCollection<T> finalResultCollection =
        coGbkResultCollection.apply(ParDo.of(
          new DoFn<KV<K, CoGbkResult>, T>() {
            @Override
            public void processElement(ProcessContext c) {
              KV<K, CoGbkResult> e = c.element();
              // Get all collection 1 values
              Iterable<V1> pt1Vals = e.getValue().getAll(t1);
              // Now get collection 2 values
              V2 pt2Val = e.getValue().getOnly(t2);
              ... Do Something ....
              c.output(...some T...);
            }
          }));
    

Usa CoGroupByKey con PCollections no delimitadas

Java

Cuando se utiliza CoGroupByKey para agrupar PCollection s que tienen una estrategia de ventanas aplicada, todas las PCollection que desee agrupar deben usar la misma estrategia de ventanas y el tamaño de la ventana. Por ejemplo, todas las colecciones que estás fusionando deben usar (hipotéticamente) ventanas fijas idénticas de 5 minutos o ventanas deslizantes de 4 minutos que comiencen cada 30 segundos.

Si tu canalización intenta usar CoGroupByKey para combinar PCollection s con ventanas incompatibles, Dataflow generará un error IllegalStateException cuando se construya tu canalización.

Uniones con ParDo y entradas laterales

En lugar de usar CoGroupByKey, puedes aplicar un ParDo con una o más entradas laterales para realizar una combinación. Una entrada lateral es una entrada adicional, además de la entrada principal, que puedes proporcionar a tu PCollection . Tu DoFn puede acceder a la entrada lateral cada vez que procesa un elemento en la entrada PCollection.

Realizar una combinación de esta manera puede ser más eficaz que usar CoGroupByKey en algunos casos, como los siguientes:

  • Los PCollection s que se unen tienen un tamaño desproporcionado y los PCollection s más pequeños pueden caber en la memoria.
  • Tienes una tabla grande a la que deseas unirte varias veces en diferentes lugares de la canalización. En lugar de hacer un CoGroupByKey varias veces, puedes crear una entrada lateral y pasarla a varios ParDo s. Por ejemplo, te gustaría unir dos tablas para generar una tercera. Luego, te gustaría unir la primera tabla con la tercera y así sucesivamente.

Para realizar un join de esta manera, aplica un ParDo a uno de los PCollections y pasa el otro PCollection(s) como entradas secundarias. En el siguiente código de ejemplo, se muestra cómo realizar esa unión.

Java

    // Each BigQuery table of key-value pairs is read into separate PCollections.
    PCollection<KV<K1, V1>> mainInput = ...
    PCollection<KV<K2, V2>> sideInput = ...

    // Create a view from your side input data
    final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

    // Access side input and perform join logic
    PCollection<T> joinedData =
    mainInput.apply("SideInputJoin", ParDo.withSideInputs(view).of(new DoFn<KV<K1, V1>, T>() {
      @Override
      public void processElement(ProcessContext c) {
        K2 sideInputKey = ... transform c.element().getKey() into K2 ...
        V2 sideInputValue = c.sideInput(view).get(sideInputKey);
        V1 mainInputValue = c.element().getValue();
        ... Do Something ...
        c.output(...some T...);
      }
    }));