GroupByKey y Join

La transformación del núcleo de GroupByKey es una operación de reducción usada para procesar colecciones de pares clave-valor. Usas GroupByKey con una entrada PCollection de pares clave-valor que representa un multimapa, en el que la colección contiene pares múltiples que tienen la misma clave, pero valores diferentes. La transformación GroupByKey te permite reunir todos los valores en el multimapa que comparten la misma clave.

GroupByKey es análogo a la fase aleatoria de un algoritmo de estilo mapa/aleatorio/reducir. Usas GroupByKey para recopilar todos los valores asociados con una clave única.

GroupByKey es una buena manera de agregar datos que tienen algo en común. Por ejemplo, es posible que desees agrupar pedidos de clientes del mismo código postal (donde la “clave” sería el código postal de cada pedido individual y el “valor” sería el resto del pedido). O podrías desear agrupar todas las consultas de los usuarios por ID de usuario o por la hora del día en que se produjeron.

También puedes unir múltiples colecciones de pares clave-valor, cuando esas colecciones comparten una clave común (incluso si los tipos de valor son diferentes). Hay dos formas de realizar una unión. Una de ellas es con la transformación de CoGroupByKey, que te permite agrupar todos los valores (de cualquier tipo) en varias colecciones que comparten una clave común. La otra forma es unir múltiples colecciones de pares clave-valor con un ParDo con una o más entradas laterales. En algunos casos, esta segunda forma puede ser más eficiente con un CoGroupByKey.

Las uniones son útiles cuando tienes varios conjuntos de datos, tal vez de múltiples fuentes, que proporcionan información sobre elementos relacionados. Por ejemplo, supongamos que tienes dos archivos diferentes con datos de subasta: un archivo tiene el ID de subasta, los datos de oferta y los datos de precios; el otro archivo tiene el ID de la subasta y una descripción del artículo. Puedes unir esos dos conjuntos de datos con el ID de la subasta como una clave común y con los otros datos adjuntos como los valores asociados. Después de la unión, tienes un conjunto de datos que contiene toda la información (oferta, precio y artículo) asociada con cada ID de subasta.

GroupByKey

Examinemos la mecánica de GroupByKey con un caso de ejemplo simple, en el que nuestro conjunto de datos consta de palabras de un archivo de texto y la línea en la que aparecen. Queremos agrupar todos los números de línea (valores) que comparten la misma palabra (clave), lo que nos permite ver todos los lugares en el texto donde aparece una palabra en particular.

Nuestra entrada es una PCollection de pares clave-valor donde cada palabra es una clave y el valor es un número de línea en el archivo donde aparece la palabra. Aquí hay una lista de los pares clave-valor en nuestra colección de entrada:

  cat, 1
  dog, 5
  and, 1
  jump, 3
  tree, 2
  cat, 5
  dog, 2
  and, 2
  cat, 9
  and, 6
  ...

La transformación GroupByKey reúne todos los valores con la misma clave y crea un nuevo par que consiste en la clave única y una colección de todos los valores que se asociaron con esa clave en la entrada PCollection. Si tuviéramos que aplicar GroupByKey en la colección de pares clave-valor anterior, la colección de salida se vería así:

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

Por lo tanto, GroupByKey representa una transformación de un multimapa, que es un mapa de claves múltiples a valores individuales, a un unimapa, que es un mapa de claves únicas a colecciones de valores.

Java

Nota sobre la representación de pares clave-valor:
En Dataflow Java SDK, representas un par clave-valor con un objeto de tipo KV<K, V>. KV es una clase especializada con claves de tipo K y valores de tipo V.

Un patrón de procesamiento común para las colecciones agrupadas por claves (el resultado de una transformación GroupByKey) es combinar los valores asociados con cada clave en un solo valor combinado asociado con esa clave. Los SDK de Dataflow encapsulan todo este patrón (agrupando claves y, luego, combinando los valores para cada clave) como transformación Combine. Consulta Combinar colecciones y valores para obtener más información.

Aplica una transformación GroupByKey

Debes aplicar un GroupByKey a una entrada PCollection de pares clave-valor. GroupByKey muestra una nueva PCollection de pares clave-valor. En ella, las claves son únicas y cada elemento de valor asociado es, en realidad, un flujo de valores que contiene uno o más valores asociados con la clave.

Java

En el siguiente código de ejemplo, se muestra cómo aplicar GroupByKey a una PCollection de KV<K, V> objetos, en la que cada par de elementos representa una clave de tipo K y un valor único de tipo V.

El valor de retorno de GroupByKey es una PCollection nueva de tipo KV<K, Iterable<V>>, en la que cada par de elementos representa una clave y una colección de valores como una Java Iterable.

  // A PCollection of key/value pairs: words and line numbers.
  PCollection<KV<String, Integer>> wordsAndLines = ...;

  // Apply a GroupByKey transform to the PCollection "wordsAndLines".
  PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
    GroupByKey.<String, Integer>create());
Java 8 puede inferir los tipos de parámetros de GroupByKey.create, pero es posible que deban especificarse de forma explícita en versiones anteriores de Java.

GroupByKey con sistema de ventanas

Java

GroupByKey se comporta de manera un poco diferente cuando la entrada PCollection se divide en varias ventanas, en lugar de una sola ventana global.

La transformación GroupByKey también considera la ventana a la que pertenece cada elemento cuando realizas la reducción. Las ventanas (según lo determinado por la marca de tiempo de cada par clave-valor) actúa, en esencia, como una clave secundaria. Por lo tanto, GroupByKey con sistema de ventanas se agrupa por clave y ventana. Cuando todos los elementos forman parte de una única ventana global, GroupByKey se degenera a la semántica simple descrita antes.

Si bien las ventanas de un elemento actúan como una clave secundaria para agrupar, puede ser, en potencia, más potente. Los elementos pueden pertenecer a más de una ventana y las ventanas superpuestas se pueden combinar. Esto te permite crear agrupaciones más complejas.

Apliquemos un sistema de ventanas a nuestro ejemplo anterior:

  cat, 1 (window 0)
  dog, 5 (window 0)
  and, 1 (window 0)

  jump, 3 (window 1)
  tree, 2 (window 1)
  cat, 5  (window 1)

  dog, 2 (window 2)
  and, 2 (window 2)
  cat, 9 (window 2)
  and, 6 (window 2)
  ...

GroupByKey reúne todos los elementos con la misma clave y ventana, lo que genera una colección de salida como esta:

  cat, [1] (window 0)
  dog, [5] (window 0)
  and, [1] (window 0)

  jump, [3] (window 1)
  tree, [2] (window 1)
  cat, [5]  (window 1)

  dog, [2]   (window 2)
  and, [2,6] (window 2)
  cat, [9]   (window 2)

Ten en cuenta que la ventana ahora afecta a los grupos de salida; los pares clave-valor en diferentes ventanas no se agrupan juntos.

Se une a CoGroupByKey

La transformación CoGroupByKey realiza una unión relacional de dos o más conjuntos de datos. CoGroupByKey agrupa los valores desde múltiples PCollections de pares clave-valor, en las que cada PCollection de la entrada tiene el mismo tipo de clave.

Java

Para seguridad de tipos, Dataflow requiere que pases cada PCollection como parte de KeyedPCollectionTuple. Deberás declarar una TupleTag para cada entrada PCollection que quieras pasar a CoGroupByKey.

CoGroupByKey agrupa la salida unida en un objeto CoGbkResult.

Usemos un ejemplo simple para demostrar la mecánica de un CoGroupByKey. Tenemos dos colecciones de entrada para agrupar en un KeyedPCollectionTuple. La primera es una PCollection<K, V1>, por lo que asignaremos una TupleTag<V1> llamada tag1. Los pares clave-valor en esta PCollection son:

  key1 &map; v1
  key2 &map; v2
  key2 &map; v3

La segunda es una PCollection<K, V2>, por lo que asignaremos TupleTag<V2> llamada tag2. Esta colección contiene lo siguiente:

  key1 &map; x1
  key1 &map; x2
  key3 &map; x3

La colección CoGbkResult contiene todos los datos asociados con cada clave única desde cualquiera de las colecciones de entrada. El tipo de datos que se muestra es PCollection<KV<K, CoGbkResult>>, con los siguientes contenidos:

  key1 -> {
    tag1 &map; [v1]
    tag2 &map; [x1, x2]
  }
  key2 -> {
    tag1 &map; [v2, v3]
    tag2 &map; []
  }
  key3 -> {
    tag1 &map; []
    tag2 &map; [x3]
  }

Después de aplicar CoGroupByKey, puedes buscar datos de cada colección con la TupleTag adecuada.

Aplica CoGroupByKey

Java

CoGroupByKey acepta una tupla de PCollections (PCollection<KV<K, V>>) con clave como entrada. Como salida, CoGroupByKey muestra un tipo especial llamado CoGbkResults, que agrupa valores de todas las PCollections de entrada por sus claves comunes. Indexas los CoGbkResults mediante el mecanismo TupleTag para múltiples colecciones. Puedes acceder a una colección específica en el objeto CoGbkResults mediante la TupleTag que proporcionaste con la colección inicial.

Aquí hay un ejemplo que une dos conjuntos de datos diferentes (quizás de diferentes fuentes) que se leyeron en PCollections separadas:

  // Each data set is represented by key-value pairs in separate PCollections.
  // Both data sets share a common key type ("K").
  PCollection<KV<K, V1>> pc1 = ...;
  PCollection<KV<K, V2>> pc2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> tag1 = new TupleTag<V1>();
  final TupleTag<V2> tag2 = new TupleTag<V2>();

  // Merge collection values into a CoGbkResult collection.
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(tag1, pc1)
                         .and(tag2, pc2)
                         .apply(CoGroupByKey.<K>create());

En la PCollection<KV<K, CoGbkResult>> resultante (todas de tipo K) tendrán un CoGbkResult diferente. En otras palabras, CoGbkResult es un mapa de TupleTag<T> a Iterable<T>.

Aquí hay otro ejemplo de uso de CoGroupByKey, seguido de un ParDo que consume el CoGbkResult resultante; el ParDo podría, por ejemplo, formatear los datos para su procesamiento posterior:

  // Each BigQuery table of key-value pairs is read into separate PCollections.
  // Each shares a common key ("K").
  PCollection<KV<K, V1>> pt1 = ...;
  PCollection<KV<K, V2>> pt2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> t1 = new TupleTag<V1>();
  final TupleTag<V2> t2 = new TupleTag<V2>();

  //Merge collection values into a CoGbkResult collection
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, pt1)
                         .and(t2, pt2)
                         .apply(CoGroupByKey.<K>create());

  // Access results and do something.
  PCollection<T> finalResultCollection =
    coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, T>() {
        @Override
        public void processElement(ProcessContext c) {
          KV<K, CoGbkResult> e = c.element();
          // Get all collection 1 values
          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
          // Now get collection 2 values
          V2 pt2Val = e.getValue().getOnly(t2);
          ... Do Something ....
          c.output(...some T...);
        }
      }));

Usa CoGroupByKey con PCollections no delimitadas

Java

Cuando uses CoGroupByKey para agrupar PCollections que tengan estrategia de sistema de ventanas aplicadas, todas las PCollections que quieras agrupar deben usar la misma estrategia de sistema de ventanas y tamaño de ventanas. Por ejemplo, todas las colecciones que estás fusionando deben usar (hipotéticamente) ventanas fijas idénticas de 5 minutos o ventanas deslizantes de 4 minutos que comiencen cada 30 segundos.

Si tu canalización intenta usar CoGroupByKey para fusionar PCollections con ventanas incompatibles, Dataflow generará un error IllegalStateException cuando se construya tu canalización.

Uniones con ParDo y entradas laterales

En lugar de usar CoGroupByKey, puedes aplicar un ParDo con una o más entradas laterales para realizar una unión. Una entrada lateral es una entrada adicional, que no es la entrada principal, que puedes proporcionar a tu PCollection. Tu DoFn puede acceder a la entrada lateral cada vez que procesa un elemento en la entrada PCollection.

Realizar una unión de esta manera puede ser más eficiente que usar CoGroupByKey en algunos casos, como los siguientes:

  • Las PCollections que estás uniendo son desproporcionadas en tamaño y las PCollections podrían caber en la memoria.
  • Tienes una tabla grande a la que deseas unirte varias veces en diferentes lugares de la canalización. En lugar de hacer un CoGroupByKey varias veces, puedes crear una entrada lateral y pasarla a varios ParDos. Por ejemplo, te gustaría unir dos tablas para generar una tercera. Luego, te gustaría unir la primera tabla con la tercera y así sucesivamente.

Para realizar una unión de esta manera, aplica un ParDo a una de las PCollections y pasa las otras PCollections como entradas laterales. En el siguiente código de ejemplo, se muestra cómo realizar esa unión.

Java

// Each BigQuery table of key-value pairs is read into separate PCollections.
PCollection<KV<K1, V1>> mainInput = ...
PCollection<KV<K2, V2>> sideInput = ...

// Create a view from your side input data
final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

// Access side input and perform join logic
PCollection<T> joinedData =
mainInput.apply("SideInputJoin", ParDo.withSideInputs(view).of(new DoFn<KV<K1, V1>, T>() {
  @Override
  public void processElement(ProcessContext c) {
    K2 sideInputKey = ... transform c.element().getKey() into K2 ...
    V2 sideInputValue = c.sideInput(view).get(sideInputKey);
    V1 mainInputValue = c.element().getValue();
    ... Do Something ...
    c.output(...some T...);
  }
}));
¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.