Cómo combinar grupos y valores

Durante una canalización, a menudo, deseas combinar o fusionar grupos de valores en tus datos. Por ejemplo, puedes tener un grupo de datos de ventas que consiste en pedidos para un mes determinado, cada uno con un valor en dólares. En tu canalización, es posible que desees combinar todos los valores de pedidos en un único valor que represente el valor total en dólares de pedidos para ese mes, el pedido más grande o el valor promedio en dólares por pedido. Para obtener este tipo de datos, combina los valores en tu grupo.

Los SDK de Dataflow contienen varias operaciones que puedes usar para combinar los valores en los objetos PCollection de tu canalización o los valores agrupados por clave.

La transformación básica Combine encapsula diferentes métodos genéricos que puedes usar para combinar los elementos o valores en PCollection. Combine dispone de variantes que funcionan en PCollection completas, y algunas que combinan las transmisiones de valor individual en PCollection de pares clave-valor. Combine también posee subclases para operaciones de combinación numérica específicas, como sumas, mínimos, máximos y promedios.

Cuando aplicas una transformación Combine, debes proporcionar la función que contiene la lógica real para combinar los elementos o valores. Más adelante en esta sección, consulta Cómo crear y especificar una función de combinación para obtener más información.

En este sentido, las transformaciones Combine son similares a la transformación ParDo que aplica la lógica en una función de procesamiento que proporcionas a cada elemento.

Cómo combinar PCollection en un único valor

Puedes combinar todos los elementos de una PCollection determinada en un único valor, representado en tu canalización como una PCollection nueva que contiene un elemento. Para combinar los elementos de una PCollection, usa la transformación de combinación global.

Combine funciona de manera distinta si la entrada PCollection se divide con un sistema de ventanas. En ese caso, una combinación global muestra un único elemento por ventana.

La combinación global utiliza una función de combinación que proporcionas para combinar cada valor de un elemento. Más adelante en esta sección, consulta Cómo crear y especificar una función de combinación para conocer más detalles.

Los SDK de Dataflow proporcionan algunas funciones de combinación previamente compiladas para operaciones de combinación numérica comunes, como sumas, mínimos, máximos y promedios. Puedes usar estas funciones con combinaciones globales en lugar de crear tu propia función de combinación. Consulta Cómo usar las operaciones de combinación estadísticas incluidas para obtener más información.

Java

El siguiente código de ejemplo muestra cómo aplicar (apply) una transformación Combine.globally para obtener un valor de suma único para una PCollection de tipo Integer:

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));

En el ejemplo, Sum.SumIntegerFn() corresponde a CombineFn que la transformación usa para combinar los elementos en la entrada PCollection. La entrada PCollection resultante, llamada sum, contendrá un valor: la suma de todos los elementos en la entrada PCollection.

Sistema de ventanas global

Si tu entrada PCollection usa el sistema de ventanas global predeterminado, el comportamiento de Dataflow predeterminado es mostrar una entrada PCollection que contenga un elemento. El valor de ese elemento proviene del acumulador en la función de combinación especificado cuando aplicaste Combine. Por ejemplo, la función de combinación Sum de Dataflow muestra un valor cero (la suma de una entrada vacía), mientras que la función de combinación Min muestra un valor infinito o máximo.

En cambio, para que Combine muestre una entrada PCollection vacía si la entrada está vacía, especifica .withoutDefaults cuando apliques tu transformación Combine, como en el siguiente código de ejemplo:

  PCollection<Integer> pc = ...;
  PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());

Sistema de ventanas que no es global

Si tu entrada PCollection usa una función de sistema de ventanas que no es global, Dataflow no actúa de forma predeterminada. Debes especificar una de las siguientes opciones cuando aplicas Combine:

  • Especifica .withoutDefaults, en el que las ventanas que están vacías en la entrada PCollection también estarán vacías en el grupo de salida.
  • Especifica .asSingletonView, en que la salida de inmediato se convierte en PCollectionView, que proporcionará un valor predeterminado para cada ventana vacía cuando se usa como entrada complementaria. Por lo general, solo necesitarás usar esta opción si el resultado de Combine de tu canalización se usa como entrada complementaria posteriormente en la canalización.

Cómo combinar valores en un grupo agrupado por clave

Después de crear un grupo agrupado por clave (por ejemplo, cuando usas una transformación GroupByKey), un patrón común es combinar el grupo de valores relacionados con cada clave en un único valor combinado.

Si nos basamos en el ejemplo anterior de GroupByKey, una entrada PCollection agrupada por clave llamada groupedWords se vería de la siguiente forma:

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

En la entrada PCollection anterior, cada elemento tiene una clave de string (por ejemplo, “cat”) y un iterable de números enteros para su valor (en el primer elemento, que contiene [1, 5, 9]), en el cual cada número entero representa el número de línea del texto original donde apareció la clave. Si nuestro próximo paso de la canalización combina los valores (en lugar de considerarlos de forma individual), puedes combinar el iterable de los números enteros para crear un único valor combinado que se sincronice con cada clave.

La lógica real acerca de cómo se combinan los valores depende de ti; por ejemplo, en un grupo agrupado por clave de claves y valores de números enteros, puede ser que elijas sumar el grupo de números enteros relacionados con cada clave (lo cual es útil, por ejemplo, cuando los valores representan conteos de casos). En nuestro ejemplo, dado que los valores representan números en línea, es posible que deseemos tomar el mínimo, el cual indica el primer caso.

Java

  PCollection<KV<String, Integer>> occurrences = ...;
  PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());
  PCollection<KV<String, Integer>> firstOccurrences = pc.apply(
    Combine.groupedValues(new Min.MinIntegerFn()));

Cómo usar Combine PerKey

Los SDK de Dataflow representan este patrón entero, GroupByKey, y luego combinan el grupo de valores, como la transformación de Combine PerKey.

Combine PerKey realiza ambos pasos del patrón: Se aplica (apply) a una entrada PCollection de pares clave-valor, como se haría con GroupByKey. Luego, la transformación Combine PerKey aplica una función de combinación que proporcionas al conjunto de todos los valores asociados con cada clave. La operación de combinación produce una nueva entrada PCollection de pares clave-valor, que contiene claves únicas y un único valor combinado para cada clave.

Combine PerKey realiza la transformación GroupByKey como parte de su operación; por consiguiente, se comporta como GroupByKey lo hace cuando tu entrada PCollection se divide mediante un sistema de ventanas. Es decir, Combine PerKey realiza combinaciones por clave y ventana.

Mientras puedas crear un ParDo para realizar este tipo de función (es decir, mediante la agrupación de un grupo por clave y luego combinar los valores), la semántica más estructurada de la transformación CombineFn permite que el servicio de Cloud Dataflow ejecute la operación de combinación de forma más eficiente.

La función de combinación que proporcionas a Combine PerKey debe ser una función de reducción asociativa o una subclase de CombineFn. Más adelante en esta sección, consulta Cómo crear y especificar una función de combinación para obtener más detalles acerca de cómo crear una transformación CombineFn.

Java

Cuando usas Combine.perKey, pasas parámetros de tipo en función de los tipos de claves y valores en tu entrada PCollection, y un parámetro de tipo adicional si el valor combinado en tu salida PCollection es de distinto tipo con respecto a los valores en tu entrada PCollection.

También deberás pasarle a Combine.perKey la función de combinación que contiene la lógica de combinación para aplicar a cada grupo de valores. Comúnmente defines tu función de combinación fuera de línea.

El siguiente código de ejemplo muestra cómo aplicar (apply) una transformación Combine.perKey. En el ejemplo, la entrada PCollection se agrupa por clave, y el grupo de valores Double asociado con cada clave se combina en un único valor de suma:

  PCollection<KV<String, Double>> salesRecords = ...;
  PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
      new Sum.SumDoubleFn()));

Ten en cuenta que Combine.perKey toma tres parámetros de tipo: String para el tipo de clave y Double para los valores. Esto se debe a que el grupo de valores para cada clave es de tipo Double, y el valor combinado resultante también es de tipo Double.

El siguiente código de ejemplo muestra cómo aplicar (apply) una transformación Combine.perKey en la que el valor combinado es de tipo diferente con respecto al grupo original de valores por clave. En este caso, la entrada PCollection tiene claves de tipo String y valores de tipo Integer, y el valor combinado es Double que representa el promedio de todos los valores por clave.

  PCollection<KV<String, Integer>> playerAccuracy = ...;
  PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
      new MeanInts())));

A menudo, Java puede inferir estos parámetros de tipo del tipo de CombineFn. Java 8 es mejor en esta inferencia que las versiones anteriores de Java.

Cómo crear y especificar una función de combinación

Cuando uses una transformación Combine, independientemente de la variante, necesitarás proporcionar alguna lógica de procesamiento que especifique cómo combinar los distintos elementos en un único valor.

Cuando diseñes una función de combinación, ten en cuenta que la función no necesariamente se invoca en todos los valores con una clave determinada. Dado que los datos de entrada (incluido el grupo de valor) pueden estar distribuidos en varias instancias de trabajador, la función de combinación puede llamarse varias veces para realizar una combinación parcial en subconjuntos del grupo de valor. En términos generales, la combinación puede aplicarse repetidamente en una estructura de árbol. Debido a que la estructura de árbol no se especifica, tu función de combinación debería ser conmutativa y asociativa.

Dispones de algunas opciones cuando creas una función de combinación. Por lo general, las operaciones de combinación simple, como las sumas, pueden implementarse como una función simple de un conjunto de valores en un único valor del mismo tipo. Tu función debe ser asociativa y conmutativa. En otras palabras, para una función de combinación f y un conjunto de valores v1, … y vn, debe darse el caso que, lógicamente, f(v1, v2, …, vn) = f(v1, v2, …, f(vk, …, vn)) = f(f(v1, v2), …, f(vk, …, vn)) = f(f(vk, …, vn), …, f(v2, v1)), etcétera.

Las operaciones de combinación más complejas pueden requerir que crees una subclase de CombineFn que tiene un tipo de acumulación distinto del tipo de entrada/salida. Finalmente, el SDK de Dataflow proporciona algunas funciones comunes que controlan diferentes combinaciones estadísticas, como Sum, Min, Max y Mean. Consulta Cómo usar las operaciones de combinación estadística incluidas para obtener más información.

Combinaciones simples mediante funciones simples

Java

Para funciones de combinación simple, como sumar un grupo de valores Integer en un único Integer, puedes crear una función simple que implemente la interfaz SerializableFunction. Tu SerializableFunction debería convertir un Iterable<V> en un único valor de tipo V.

El siguiente código de ejemplo muestra una función de combinación simple para sumar un grupo de valores Integer. La función SumInts implementa la interfaz SerializableFunction:

  public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
    @Override
    public Integer apply(Iterable<Integer> input) {
      int sum = 0;
      for (int item : input) {
        sum += item;
      }
      return sum;
    }
  }

Combinaciones avanzadas mediante CombineFn

Para obtener más funciones de combinación compleja, puedes definir una subclase de la clase CombineFn del SDK. Deberías usar CombineFn cuando tu función de combinación debe realizar un procesamiento previo o posterior adicional, que puede cambiar el tipo de salida, requerir un acumulador más sofisticado o tomar en cuenta la clave.

Considera la naturaleza distribuida del cálculo. La clave o el grupo de valores completos para cada elemento puede no existir en la misma instancia de Compute Engine en un momento determinado. Deberías usar CombineFn para un cálculo que no se pueda realizar adecuadamente sin considerar todos los valores.

Por ejemplo, considera calcular el promedio del grupo de valores asociado con una clave determinada. Para calcular la media, tu función de combinación debe sumar todos los valores y, luego, dividir la suma resultante por la cantidad de valores que haya agregado. Sin embargo, si la división se realizara en varias ubicaciones distribuidas, el promedio resultante sería incorrecto. Usar CombineFn le permite al servicio de Cloud Dataflow acumular la suma de ejecución y la cantidad de elementos observados, y guardar el cálculo final (en este caso, la división) hasta que todos los elementos se hayan acumulado.

Una operación de combinación general consiste en cuatro operaciones. Cuando creas una subclase de CombineFn, necesitas proporcionar cuatro operaciones mediante la anulación de los métodos correspondientes:

  • Crear un acumulador
  • Agregar una entrada
  • Combinar acumuladores
  • Extraer una entrada

Si, por algún motivo, tu función de combinación necesita acceder a la clave en el par clave/grupo de valores, puedes usar KeyedCombineFn en su lugar. KeyedCombineFn pasa la clave como un parámetro de tipo adicional.

Crear un acumulador crea un nuevo acumulador “local”. En el caso de ejemplo, si consideramos un promedio, un acumulador local hace seguimiento de la suma de valores en ejecución (el valor numerador para nuestra división promedio final) y la cantidad de valores sumados hasta el momento (el valor denominador). Puede llamarse cualquier cantidad de veces de forma distribuida.

Agregar una entrada agrega un elemento de entrada a un acumulador y muestra el valor del acumulador. En nuestro ejemplo, actualizaría la suma y aumentaría la cantidad. También puede invocarse de forma paralela.

Combinar acumuladores combina varios acumuladores en un único acumulador; así es cómo se combinan los datos en varios acumuladores antes del cálculo final. En el caso del cálculo promedio, se combinan los acumuladores que representan cada parte de la división. Puede llamarse nuevamente en sus salidas cualquier cantidad de veces.

Extraer una salida realiza el cálculo final. En el caso del cálculo de un promedio, esto significa dividir la suma combinada de todos los valores por la cantidad de valores sumados. Se llama una vez en el acumulador final combinado.

El siguiente código de ejemplo muestra cómo definir una transformación CombineFn para calcular un promedio:

Java

 public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public static class Accum {
     int sum = 0;
     int count = 0;
   }

   @Override
   public Accum createAccumulator() { return new Accum(); }

   @Override
   public Accum addInput(Accum accum, Integer input) {
       accum.sum += input;
       accum.count++;
       return accum;
   }

   @Override
   public Accum mergeAccumulators(Iterable<Accum> accums) {
     Accum merged = createAccumulator();
     for (Accum accum : accums) {
       merged.sum += accum.sum;
       merged.count += accum.count;
     }
     return merged;
   }

   @Override
   public Double extractOutput(Accum accum) {
     return ((double) accum.sum) / accum.count;
   }
 }
 PCollection<Integer> pc = ...;
 PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));

Consulta la documentación de referencia de API para Java destinada a CombineFn a fin de obtener todos los detalles sobre los parámetros de tipo obligatorios y genéricos que necesitarás para la subclase CombineFn.

El tipo de acumulador AccumT debe poder codificarse. Consulta Codificación de datos a fin de obtener detalles sobre cómo especificar un codificador predeterminado para un tipo de dato.

Cómo usar las funciones de combinación estadística incluidas

Java

Los SDK de Dataflow contienen varias subclases que proporcionan funciones de combinación matemática básicas. Puedes usar funciones de combinación de estas subclases como función de combinación cuando usas Combine.globally o Combine.perKey. Las funciones incluyen lo siguiente:

  • Sum: agrega todos los valores juntos para generar un único valor de suma.
  • Min: mantiene solo el valor mínimo de todos los valores disponibles.
  • Max: mantiene solo el valor máximo de todos los valores disponibles.
  • Mean: calcula un único valor promedio de todos los valores disponibles.

Cada clase define funciones de combinación para los tipos Integer, Long y Double (a excepción de la clase Mean, que define una función de combinación genérica de acumulación a la cual necesitarás pasarle un parámetro de tipo).

Por ejemplo, puedes usar Sum.SumIntegerFn para sumar un grupo de valores Integer (en este ejemplo, por clave):

  PCollection<KV<String, Integer>> playerScores = ...;
  PCollection<KV<String, Integer>> totalPointsPerPlayer =
    playerScores.apply(Combine.<String, Integer, Integer>perKey(
      new Sum.SumIntegerFn()));

Asimismo, puedes usar Max.MaxLongFn para calcular el valor Long máximo en una entrada PCollection<Long>, como se muestra:

  PCollection<Long> waitTimes = ...;
  PCollection<Long> longestWaitTime = waitTimes.apply(
    Combine.globally(new Max.MaxLongFn()));

Las clases Sum, Min, Max y Mean se definen en el paquete com.google.cloud.dataflow.sdk.transforms. Consulta la referencia de API para Java de Cloud Dataflow a fin de obtener más información.

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.