GroupByKey y Join

La transformación de núcleo GroupByKey es una operación de reducción paralela que se usa para procesar colecciones de pares clave-valor. Usarás GroupByKey con una entrada PCollection de pares clave-valor que representa un multimapa, donde la colección contiene varios pares que tienen la misma clave, pero valores diferentes. La transformación GroupByKey te permite recopilar todos los valores del multimapa que comparten la misma clave.

GroupByKey es similar a la fase Shuffle de un algoritmo de estilo Map/Shuffle/Reduce. Usarás GroupByKey para recopilar todos los valores asociados con una clave única.

GroupByKey es una buena manera de agregar datos que tienen algo en común. Por ejemplo, es posible que desees agrupar pedidos de clientes del mismo código postal (donde la “clave” sería el código postal de cada pedido individual y el “valor” sería el resto del pedido). O podrías desear agrupar todas las consultas de los usuarios por ID de usuario o por la hora del día en que se produjeron.

También puedes unir varias colecciones de pares clave-valor, cuando esas colecciones comparten una clave común (incluso si los tipos de valor son diferentes). Hay dos formas de realizar una unión. Una de ellas es con la transformación CoGroupByKey, que te permite agrupar todos los valores (de cualquier tipo) en varias colecciones que comparten una clave común. La otra forma es unir varias colecciones de pares clave-valor con ParDo con una o más entradas adicionales. En algunos casos, esta segunda forma puede ser más eficaz que usar CoGroupByKey.

Las uniones son útiles cuando tienes varios conjuntos de datos, tal vez de varias fuentes, que proporcionan información sobre elementos relacionados. Por ejemplo, supongamos que tienes dos archivos diferentes con datos de subasta: un archivo tiene el ID de subasta, los datos de oferta y los datos de precios; el otro archivo tiene el ID de la subasta y una descripción del artículo. Puedes unir esos dos conjuntos de datos con el ID de la subasta como una clave común y con los otros datos adjuntos como los valores asociados. Después de la unión, tienes un conjunto de datos que contiene toda la información (oferta, precio y artículo) asociada con cada ID de subasta.

GroupByKey

Examinemos la mecánica de GroupByKey con un caso de ejemplo simple, en el que nuestro conjunto de datos consta de palabras de un archivo de texto y la línea en las que aparecen. Queremos agrupar todos los números de línea (valores) que comparten la misma palabra (clave), lo que nos permite ver todas las instancias en el texto donde aparece una palabra en particular.

Nuestra entrada es una PCollection de pares clave-valor en el que cada palabra es una clave, y el valor es un número de línea en el archivo donde aparece la palabra. A continuación, hay una lista de los pares clave-valor en nuestra colección de entrada:

  cat, 1
  dog, 5
  and, 1
  jump, 3
  tree, 2
  cat, 5
  dog, 2
  and, 2
  cat, 9
  and, 6
  ...

La transformación GroupByKey recopila todos los valores con la misma clave y crea un par nuevo que consta de la clave única y una colección de todos los valores asociados con esa clave en la entrada PCollection. Si tuviéramos que aplicar GroupByKey en la colección de pares clave-valor anterior, la colección de salida se vería así:

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

Por lo tanto, GroupByKey representa una transformación de un multimapa, que es un mapa de varias claves a valores individuales, a un unimapa, que es un mapa de claves únicas para colecciones de valores.

Java

Nota sobre la representación de pares clave-valor:
En el SDK de Dataflow para Java, representas un par clave-valor con un objeto de tipo KV<K, V>. KV es una clase especializada con claves de tipo K y valores de tipo V.

Un patrón de procesamiento común para las colecciones agrupadas por claves (el resultado de una transformación GroupByKey) es combinar los valores asociados con cada clave en un único valor combinado asociado con esa clave. Los SDK de Dataflow encapsulan este patrón completo (agrupación de claves y, luego, combina los valores para cada clave) como la transformación Combine. Consulta Cómo combinar grupos y valores para obtener más información.

Aplica una transformación GroupByKey

Debes aplicar un GroupByKey a una entrada PCollection de pares clave-valor. GroupByKey muestra una PCollection nueva de pares clave-valor. En la colección nueva, las claves son únicas y cada elemento de valor asociado es en realidad un flujo de valor que contiene uno o más valores asociados con la clave.

Java

El siguiente código de ejemplo muestra cómo aplicar GroupByKey a una PCollection de objetos KV<K, V>, donde cada par de elementos representa una clave de tipo K y un valor único de tipo V.

El valor que se muestra para GroupByKey es una PCollection nueva de tipo KV<K, Iterable<V>>, donde cada par de elementos representa una clave y una colección de valores como Iterable de Java.

  // A PCollection of key/value pairs: words and line numbers.
  PCollection<KV<String, Integer>> wordsAndLines = ...;

  // Apply a GroupByKey transform to the PCollection "wordsAndLines".
  PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
    GroupByKey.<String, Integer>create());
Java 8 puede inferir los tipos de parámetros de GroupByKey.create, pero es posible que deban especificarse de forma explícita en versiones anteriores de Java.

GroupByKey con sistema de ventanas

Java

GroupByKey se comporta de manera diferente cuando la entrada PCollection se divide en varias ventanas, en lugar de una sola ventana general.

La transformación GroupByKey también considera la ventana a la que pertenece cada elemento cuando se realiza la reducción. Las ventanas (según lo determinado por la marca de tiempo de cada par clave-valor) actúan, básicamente, como una clave secundaria. Entonces, GroupByKey con un sistema de ventanas agrupa tanto por clave como por ventana. Cuando todos los elementos son parte de una sola ventana general, GroupByKey se degenera a la semántica simple descrita anteriormente.

Si bien las ventanas de un elemento actúan como una clave secundaria para agrupar, es posible que esto sea más potente. Los elementos pueden pertenecer a más de una ventana y las ventanas superpuestas se pueden combinar. Esto te permite crear agrupaciones más complejas.

Apliquemos un sistema de ventanas a nuestro ejemplo anterior:

  cat, 1 (window 0)
  dog, 5 (window 0)
  and, 1 (window 0)

  jump, 3 (window 1)
  tree, 2 (window 1)
  cat, 5  (window 1)

  dog, 2 (window 2)
  and, 2 (window 2)
  cat, 9 (window 2)
  and, 6 (window 2)
  ...

GroupByKey recopila todos los elementos con la misma clave y ventana, lo que genera una colección de salida como la siguiente:

  cat, [1] (window 0)
  dog, [5] (window 0)
  and, [1] (window 0)

  jump, [3] (window 1)
  tree, [2] (window 1)
  cat, [5]  (window 1)

  dog, [2]   (window 2)
  and, [2,6] (window 2)
  cat, [9]   (window 2)

Ten en cuenta que la ventana ahora afecta a los grupos de salida; los pares clave-valor en diferentes ventanas no se agrupan juntos.

Uniones con CoGroupByKey

La transformación CoGroupByKey realiza una unión relacional de dos o más conjuntos de datos. CoGroupByKey agrupa los valores de varias PCollection de pares clave-valor, donde cada PCollection de la entrada tiene el mismo tipo de clave.

Java

Por seguridad de tipo, Dataflow requiere que pases cada PCollection como parte de una KeyedPCollectionTuple. Deberás declarar una TupleTag para cada PCollection de entrada que desees pasar a CoGroupByKey.

CoGroupByKey agrupa el resultado unido en un objeto CoGbkResult.

Usemos un ejemplo simple para demostrar la mecánica de una CoGroupByKey. Tenemos dos colecciones de entradas para agrupar en una KeyedPCollectionTuple. La primera es una PCollection<K, V1>, por lo que asignaremos una TupleTag<V1> llamada tag1. Los pares clave-valor en esta PCollection son los siguientes:

  key1 ↦ v1
  key2 ↦ v2
  key2 ↦ v3

La segunda es una PCollection<K, V2>, por lo que asignaremos una TupleTag<V2> llamada tag2. Esta colección contiene los siguientes pares clave-valor:

  key1 ↦ x1
  key1 ↦ x2
  key3 ↦ x3

La colección CoGbkResult resultante contiene todos los datos asociados con cada clave única de cualquiera de las colecciones de entrada. El tipo de datos que se muestra es PCollection<KV<K, CoGbkResult>>, con el siguiente contenido:

  key1 -> {
    tag1 ↦ [v1]
    tag2 ↦ [x1, x2]
  }
  key2 -> {
    tag1 ↦ [v2, v3]
    tag2 ↦ []
  }
  key3 -> {
    tag1 ↦ []
    tag2 ↦ [x3]
  }

Después de aplicar CoGroupByKey, puedes buscar los datos de cada colección con el TupleTag adecuado.

Aplica CoGroupByKey

Java

CoGroupByKey acepta una tupla de PCollection con clave (PCollection<KV<K, V>>) como entrada. Como salida, CoGroupByKey muestra un tipo especial llamado CoGbkResults, que agrupa los valores de todas las entradas PCollection según sus claves comunes. Deberás indexar CoGbkResults con el mecanismo TupleTag para varias colecciones. Puedes acceder a una colección específica en el objeto CoGbkResults si usas la TupleTag que proporcionaste con la colección inicial.

Este es un ejemplo que une dos conjuntos de datos diferentes (tal vez de diferentes fuentes) que se leyeron en PCollection separados:

  // Each data set is represented by key-value pairs in separate PCollections.
  // Both data sets share a common key type ("K").
  PCollection<KV<K, V1>> pc1 = ...;
  PCollection<KV<K, V2>> pc2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> tag1 = new TupleTag<V1>();
  final TupleTag<V2> tag2 = new TupleTag<V2>();

  // Merge collection values into a CoGbkResult collection.
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(tag1, pc1)
                         .and(tag2, pc2)
                         .apply(CoGroupByKey.<K>create());

En la PCollection<KV<K, CoGbkResult>> resultante, cada clave (todas de tipo K) tendrá un CoGbkResult diferente. En otras palabras, CoGbkResult es un mapa de TupleTag<T> a Iterable<T>.

Aquí hay otro ejemplo del uso de CoGroupByKey, seguido de ParDo que consume el CoGbkResult resultante. ParDo podría, por ejemplo, formatear los datos para su posterior procesamiento:

  // Each BigQuery table of key-value pairs is read into separate PCollections.
  // Each shares a common key ("K").
  PCollection<KV<K, V1>> pt1 = ...;
  PCollection<KV<K, V2>> pt2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> t1 = new TupleTag<V1>();
  final TupleTag<V2> t2 = new TupleTag<V2>();

  //Merge collection values into a CoGbkResult collection
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, pt1)
                         .and(t2, pt2)
                         .apply(CoGroupByKey.<K>create());

  // Access results and do something.
  PCollection<T> finalResultCollection =
    coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, T>() {
        @Override
        public void processElement(ProcessContext c) {
          KV<K, CoGbkResult> e = c.element();
          // Get all collection 1 values
          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
          // Now get collection 2 values
          V2 pt2Val = e.getValue().getOnly(t2);
          ... Do Something ....
          c.output(...some T...);
        }
      }));

Usa CoGroupByKey con PCollections no delimitadas

Java

Cuando se utiliza CoGroupByKey para agrupar PCollection que tienen una estrategia de sistema de ventanas aplicada, todas las PCollection que desees agrupar deben usar la misma estrategia de sistema de ventanas y el mismo tamaño de ventana. Por ejemplo, todas las colecciones que estás combinando deben usar (hipotéticamente) ventanas fijas idénticas de 5 minutos o ventanas deslizantes de 4 minutos que comiencen cada 30 segundos.

Si tu canalización intenta usar CoGroupByKey para combinar PCollection con ventanas incompatibles, Dataflow generará un error IllegalStateException cuando se construya la canalización.

Uniones con ParDo y entradas adicionales

En lugar de usar CoGroupByKey, puedes aplicar ParDo con una o más entradas adicionales para realizar una unión. Una entrada adicional es una entrada complementaria, que no es la entrada principal, que puedes proporcionar a tu PCollection. Tu DoFn puede acceder a la entrada adicional cada vez que procesa un elemento en la entrada PCollection.

Realizar una unión de esta manera puede ser más eficaz que usar CoGroupByKey en algunos casos, como los siguientes:

  • Las PCollection que unes son desproporcionadas en tamaño y las PCollection más pequeñas podrían caber en la memoria.
  • Tienes una tabla grande a la que deseas unirte varias veces en diferentes lugares de la canalización. En lugar de hacer un CoGroupByKey varias veces, puedes crear una entrada adicional y pasarla a varios ParDo. Por ejemplo, te gustaría unir dos tablas para generar una tercera. Luego, te gustaría unir la primera tabla con la tercera y así sucesivamente.

Para realizar una unión de esta manera, aplica un ParDo a una de las PCollection y pasa las otras PCollection como entradas adicionales. En la siguiente muestra de código, se muestra cómo realizar esa unión.

Java

// Each BigQuery table of key-value pairs is read into separate PCollections.
PCollection<KV<K1, V1>> mainInput = ...
PCollection<KV<K2, V2>> sideInput = ...

// Create a view from your side input data
final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

// Access side input and perform join logic
PCollection<T> joinedData =
mainInput.apply("SideInputJoin", ParDo.withSideInputs(view).of(new DoFn<KV<K1, V1>, T>() {
  @Override
  public void processElement(ProcessContext c) {
    K2 sideInputKey = ... transform c.element().getKey() into K2 ...
    V2 sideInputValue = c.sideInput(view).get(sideInputKey);
    V1 mainInputValue = c.element().getValue();
    ... Do Something ...
    c.output(...some T...);
  }
}));