Combina colecciones y valores

Durante una canalización, a menudo, deseas combinar o fusionar colecciones de valores en tus datos. Por ejemplo, puedes tener una colección de datos de ventas que consiste en pedidos para un mes determinado, cada uno con un valor en dólares. En tu canalización, es posible que desees combinar todos los valores de pedidos en un único valor que represente el valor total en dólares de pedidos para ese mes, el pedido más grande o el valor promedio en dólares por pedido. Para obtener este tipo de datos, combina los valores en tu colección.

Los SDK de Dataflow contienen una serie de operaciones que puedes usar a fin de combinar los valores en los objetos PCollection de tu canalización o para combinar valores agrupados por clave.

La transformación básica Combine encapsula varios métodos genéricos que puedes usar para combinar los elementos o valores en una PCollection. Combine tiene variantes que funcionan en PCollection completas y algunas que combinan los flujos de valor individuales en PCollection de pares clave-valor. Combine también tiene subclases para operaciones de combinación numérica específicas, como sumas, mínimos, máximos y promedios.

Cuando aplicas una transformación Combine, debes proporcionar la función que contiene la lógica real para combinar los elementos o valores. Más adelante en esta sección, consulta Crea y especifica una función de combinación para obtener más información.

En este sentido, las transformaciones Combine son similares a la transformación ParDo que aplica la lógica en una función de procesamiento que proporcionas a cada elemento.

Combina una PCollection en un valor único

Puedes combinar todos los elementos de una PCollection determinada en un único valor, representado en tu canalización como una PCollection nueva que contiene un elemento. Para combinar los elementos de una PCollection, usa la transformación de combinación global.

Combine se comporta de manera diferente si la PCollection de entrada se divide mediante un sistema de ventanas. En ese caso, una combinación global muestra un único elemento por ventana.

La combinación global usa una función de combinación que proporcionas para combinar cada valor de un elemento. Más adelante en esta sección, consulta Crea y especifica una función de combinación para obtener más detalles.

Los SDK de Dataflow proporcionan algunas funciones de combinación ya compiladas para operaciones de combinación numérica comunes, como sumas, mínimos, máximos y promedios. Puedes usar estas funciones con combinaciones globales en lugar de crear tu propia función de combinación. Consulta Cómo usar las operaciones de combinación estadística incluidas para obtener más información.

Java

En el siguiente código de ejemplo, se muestra cómo apply una transformación Combine.globally a fin de producir un valor de suma único para una PCollection de tipo Integer:

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));

En el ejemplo Sum.SumIntegerFn() es la CombineFn que la transformación usa para combinar los elementos en la PCollection de entrada. La PCollection resultante, llamada sum, contendrá un valor: la suma de todos los elementos en la PCollection de entrada.

Sistema de ventanas global

Si tu PCollection de entrada usa un sistema de ventanas global predeterminado, el comportamiento predeterminado de Dataflow es mostrar una PCollection que contenga un elemento. El valor de ese elemento proviene del acumulador en la función de combinación que especificaste cuando aplicaste Combine. Por ejemplo, la función de combinación Sum de Dataflow muestra un valor cero (la suma de una entrada vacía), mientras que la función de combinación Min muestra un valor máximo o infinito.

Para que Combine muestre una PCollection vacía si la entrada está vacía, especifica .withoutDefaults cuando apliques tu transformación Combine, como en el siguiente ejemplo de código:

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());

Sistema de ventanas no global

Si tu PCollection usa una función de sistema de ventanas no global, Dataflow no proporciona el comportamiento predeterminado. Debes especificar una de las siguientes opciones cuando apliques Combine:

  • Especifica .withoutDefaults, en cuyo caso, las ventanas que están vacías en la PCollection de entrada también estarán vacías en la colección de salida.
  • Especifica .asSingletonView, en cuyo caso, la salida se convierte de inmediato en una PCollectionView, lo que proporcionará un valor predeterminado para cada ventana vacía cuando se usa como una entrada complementaria. Por lo general, solo deberás usar esta opción si el resultado de Combine de la canalización se usará como una entrada complementaria más adelante en la canalización.

Combina valores en una colección agrupada por clave

Después de crear una colección agrupada por clave (por ejemplo, mediante una transformación GroupByKey), un patrón común es combinar la colección de valores asociados con cada clave en un solo valor combinado.

Si nos basamos en el ejemplo anterior de GroupByKey, una PCollection agrupada por clave llamada groupedWords se ve así:

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

En el PCollection anterior, cada elemento tiene una clave de string (por ejemplo, “gato”) y un iterable de números enteros para su valor (en el primer elemento, que contiene [1, 5, 9]), en el que cada número entero representa el número de línea en el texto original donde apareció la clave. Si nuestro próximo paso de la canalización combina los valores (en lugar de considerarlos de forma individual), puedes combinar el iterable de los números enteros para crear un único valor combinado que se vincule con cada clave.

La lógica real acerca de cómo se combinan los valores depende de ti; por ejemplo, en una colección agrupada por clave de claves y valores de números enteros, puede ser que elijas sumar la colección de números enteros relacionados con cada clave (lo cual es útil, por ejemplo, cuando los valores representan conteos de casos). En nuestro ejemplo, dado que los valores representan números en línea, es posible que deseemos tomar el mínimo, el cual indica el primer caso.

Java

  PCollection<KV<String, Integer>> occurrences = ...;
  PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());
  PCollection<KV<String, Integer>> firstOccurrences = pc.apply(
    Combine.groupedValues(new Min.MinIntegerFn()));

Usa Combine PerKey

Los SDK de Dataflow representan este patrón completo (GroupByKey y, luego, combinan la colección de valores) como la transformación Combine PerKey.

Combine PerKey realiza ambos pasos del patrón: se apply a una PCollection de pares clave-valor, como lo harías con un GroupByKey. Luego, la transformación Combine PerKey aplica una función de combinación que proporcionas al conjunto de todos los valores asociados con cada clave. La operación de combinación produce una PCollection nueva de pares clave-valor, que contiene claves únicas y un único valor combinado para cada clave.

Combine PerKey realiza la transformación GroupByKey como parte de su operación. Por lo tanto, se comporta como GroupByKey cuando la PCollection se divide mediante un sistema de ventanas. Es decir, Combine PerKey realiza combinaciones por clave y ventana.

Si bien puedes crear un ParDo para realizar este tipo de función (es decir, mediante la agrupación de claves y la combinación de valores), la semántica más estructurada de la transformación CombineFn permite que el servicio de Cloud Dataflow ejecute la operación de combinación de manera más eficiente.

La función de combinación que proporcionas a Combine PerKey debe ser una función de reducción asociativa o una subclase de CombineFn. Consulta Crea y especifica una función de combinación más adelante en esta sección para obtener información sobre cómo crear una CombineFn.

Java

Cuando usas Combine.perKey, pasas parámetros de tipo según los tipos de claves y valores en tu PCollection de entrada, y un parámetro de tipo adicional si el valor combinado en la PCollection de salida es diferente de los valores en la PCollection de entrada.

También deberás pasar a Combine.perKey la función de combinación que contiene la lógica de combinación para aplicar a cada colección de valores. Por lo general, defines tu función de combinación fuera de línea.

En el siguiente código de ejemplo, se muestra cómo apply una transformación Combine.perKey. En el ejemplo, la PCollection de entrada se agrupa por clave y la colección de valores Double asociados con cada clave se combina en un solo valor de suma:

  PCollection<KV<String, Double>> salesRecords = ...;
  PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
      new Sum.SumDoubleFn()));

Ten en cuenta que Combine.perKey toma tres parámetros de tipo: String para el tipo de clave y Double en el caso de los valores. Esto se debe a que la colección de valores para cada clave es de tipo Double y el valor combinado resultante también es de tipo Double.

En el siguiente código de ejemplo, se muestra cómo apply una transformación Combine.perKey en la que el valor combinado es de un tipo diferente al de la colección original de valores por clave; en este caso, la PCollection de entrada tiene claves de tipo String y valores de tipo Integer, y el valor combinado es un Double que representa el promedio de todos los valores por clave.

  PCollection<KV<String, Integer>> playerAccuracy = ...;
  PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
      new MeanInts())));

A menudo, Java puede inferir estos parámetros de tipo de la CombineFn. Java 8 realiza mejor esta inferencia que las versiones anteriores de Java.

Crea y especifica una función de combinación

Cuando usas una transformación Combine, sin importar la variante, deberás proporcionar cierta lógica de procesamiento que especifique cómo combinar los elementos múltiples en un solo valor.

Cuando diseñes una función de combinación, ten en cuenta que la función no siempre se invoca en todos los valores con una clave determinada. Dado que los datos de entrada (incluida la colección de valor) pueden estar distribuidos en varias instancias de trabajador, la función de combinación puede llamarse varias veces para realizar una combinación parcial en subconjuntos de la colección de valor. En términos generales, la combinación puede aplicarse de manera repetida en una estructura de árbol. Debido a que la estructura de árbol no se especifica, tu función de combinación debe ser conmutativa y asociativa.

Dispones de algunas opciones cuando creas una función de combinación. Por lo general, las operaciones de combinación simple, como las sumas, pueden implementarse como una función simple de un conjunto de valores en un único valor del mismo tipo. Tu función debe ser asociativa y conmutativa. En otras palabras, para una función de combinación f y un conjunto de valores v1, … , vn, debe darse el caso que, lógicamente, f(v1, v2, … , vn) = f(v1, v2, … , f(vk, … , vn)) = f(f(v1, v2), … , f(vk, … , vn)) = f(f(vk, … , vn), … , f(v2, v1)), etcétera.

Las operaciones de combinación más complejas pueden requerir que crees una subclase de CombineFn que tenga un tipo de acumulación distinto del tipo de entrada/salida. Por último, el SDK de Dataflow proporciona algunas funciones comunes que manejan diferentes combinaciones estadísticas, como Sum, Min, Max y Mean. Consulta Usa las operaciones de combinación estadística incluidas para obtener más información.

Combinaciones simples mediante funciones simples

Java

Para funciones de combinación simples, como sumar una colección de valores Integer en un solo Integer, puedes crear una función simple que implemente la interfaz SerializableFunction. Tu SerializableFunction debe convertir un Iterable<V> en un solo valor de tipo V.

En el siguiente código de ejemplo, se muestra una función de combinación simple para sumar una colección de valores Integer; la función SumInts implementa la interfaz SerializableFunction:

  public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
    @Override
    public Integer apply(Iterable<Integer> input) {
      int sum = 0;
      for (int item : input) {
        sum += item;
      }
      return sum;
    }
  }

Combinaciones avanzadas mediante CombineFn

Para funciones de combinación más complejas, puedes definir una subclase de la clase CombineFn del SDK. Debes usar CombineFn cuando tu función de combinación debe realizar un procesamiento previo o posterior adicional, lo que podría cambiar el tipo de salida, requerir un acumulador más sofisticado o tomar en cuenta la clave.

Considera la naturaleza distribuida del cálculo. La clave o colección de valores completas para cada elemento puede no existir en la misma instancia de Compute Engine en un momento determinado. Debes usar CombineFn para cualquier cálculo que no se pueda realizar de forma correcta sin considerar todos los valores.

Por ejemplo, considera calcular el promedio de la colección de valores asociados con una clave determinada. Para calcular el promedio, tu función de combinación debe sumar todos los valores y, luego, dividir la suma resultante por la cantidad de valores que se sumaron. Sin embargo, si la división se realizara en varias ubicaciones distribuidas, el promedio resultante sería incorrecto. Usar CombineFn permite que el servicio de Cloud Dataflow acumule la suma en ejecución y la cantidad de elementos vistos, y guarde el cálculo final (en este caso, la división) hasta que se hayan acumulado todos los elementos.

Una operación de combinación general consta de cuatro operaciones. Cuando creas una subclase de CombineFn, debes proporcionar cuatro operaciones mediante la anulación de los métodos correspondientes:

  • Crear un acumulador
  • Agregar una entrada
  • Combinar acumuladores
  • Extraer una salida

Si, por algún motivo, la función de combinación necesita acceder a la clave en el par de claves o colecciones de valores, puedes usar KeyedCombineFn en su lugar. KeyedCombineFn pasa la clave como un parámetro de tipo adicional.

Crear un acumulador crea un acumulador “local” nuevo. En el caso de ejemplo, si consideramos un promedio, un acumulador local hace seguimiento de la suma de valores en ejecución (el valor numerador para nuestra división promedio final) y la cantidad de valores sumados hasta el momento (el valor denominador). Puede llamarse cualquier cantidad de veces de forma distribuida.

Agregar una entrada agrega un elemento de entrada a un acumulador y muestra el valor del acumulador. En nuestro ejemplo, actualizaría la suma y aumentaría la cantidad. También puede invocarse de forma paralela.

Combinar acumuladores combina varios acumuladores en un único acumulador; así es cómo se combinan los datos en varios acumuladores antes del cálculo final. En el caso del cálculo promedio, se combinan los acumuladores que representan cada parte de la división. Puede llamarse nuevamente en sus salidas cualquier cantidad de veces.

Extraer una salida realiza el cálculo final. En el caso del cálculo de un promedio, esto significa dividir la suma combinada de todos los valores por la cantidad de valores sumados. Se llama una vez en el acumulador final combinado.

En el siguiente código de ejemplo, se muestra cómo definir una CombineFn para calcular un promedio:

Java

 public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public static class Accum {
     int sum = 0;
     int count = 0;
   }

   @Override
   public Accum createAccumulator() { return new Accum(); }

   @Override
   public Accum addInput(Accum accum, Integer input) {
       accum.sum += input;
       accum.count++;
       return accum;
   }

   @Override
   public Accum mergeAccumulators(Iterable<Accum> accums) {
     Accum merged = createAccumulator();
     for (Accum accum : accums) {
       merged.sum += accum.sum;
       merged.count += accum.count;
     }
     return merged;
   }

   @Override
   public Double extractOutput(Accum accum) {
     return ((double) accum.sum) / accum.count;
   }
 }
 PCollection<Integer> pc = ...;
 PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));

Consulta la documentación de referencia de la API para Java destinada a CombineFn a fin de obtener todos los detalles sobre los parámetros de tipo obligatorios y genéricos para la subclase CombineFn.

El tipo de acumulador AccumT se debe poder codificar. Consulta Codificación de datos a fin de obtener detalles sobre cómo especificar un codificador predeterminado para un tipo de datos.

Cómo usar las funciones de combinación estadística incluidas

Java

Los SDK de Dataflow contienen varias subclases que proporcionan funciones de combinación matemática básicas. Puedes usar funciones de combinación de estas clases cuando usas Combine.globally o Combine.perKey. Las funciones incluyen lo siguiente:

  • Sum: suma todos los valores juntos para producir un valor de suma único.
  • Min: conserva solo el valor mínimo de todos los valores disponibles.
  • Max: conserva solo el valor máximo de todos los valores disponibles.
  • Mean: calcula un valor promedio único de todos los valores disponibles.

Cada clase define funciones de combinación para los tipos Integer, Long y Double (excepto la clase Mean, que define una función de combinación acumulativa genérica, a la que deberás pasar un parámetro de tipo).

Por ejemplo, puedes usar Sum.SumIntegerFn para sumar una colección de valores Integer (por clave, en este ejemplo):

  PCollection<KV<String, Integer>> playerScores = ...;
  PCollection<KV<String, Integer>> totalPointsPerPlayer =
    playerScores.apply(Combine.<String, Integer, Integer>perKey(
      new Sum.SumIntegerFn()));

Del mismo modo, puedes usar Max.MaxLongFn para calcular el valor máximo de Long en una PCollection<Long>, como se muestra a continuación:

  PCollection<Long> waitTimes = ...;
  PCollection<Long> longestWaitTime = waitTimes.apply(
    Combine.globally(new Max.MaxLongFn()));

Las clases Sum, Min, Max y Mean se definen en el paquete com.google.cloud.dataflow.sdk.transforms. Consulta la referencia de la API de Cloud Dataflow para Java a fin de obtener más información.