Procesamiento paralelo con ParDo

ParDo es la principal operación de procesamiento paralelo en los SDK de Dataflow. Usas ParDo para el procesamiento genérico en paralelo. El estilo de procesamiento ParDo es similar a lo que sucede dentro de la clase “Mapper” de un algoritmo de estilo Map/Shuffle/Reduce: ParDo toma cada elemento en una PCollection de entrada, realiza alguna función de procesamiento sobre ese elemento y, luego, emite cero, uno o varios elementos en una PCollection de salida.

Proporcionas la función que realiza ParDo en cada uno de los elementos de la PCollection de entrada. La función que proporcionas se invoca de forma independiente y, en paralelo, en varias instancias de trabajador en tu trabajo de Dataflow.

ParDo es útil para una variedad de operaciones de procesamiento de datos, incluidas las siguientes:

  • Filtrar un conjunto de datos. Puedes usar ParDo para considerar cada elemento de una PCollection y enviarlo a una colección nueva o descartarlo.
  • Dar formato al tipo de cada elemento en un conjunto de datos, o convertirlo. Puedes usar ParDo para dar formato a los elementos en tu PCollection, como dar formato a pares clave-valor en strings imprimibles.
  • Extraer partes de cada elemento en un conjunto de datos. Puedes usar ParDo para extraer solo una parte de cada elemento en tu PCollection. Esto puede ser útil en situaciones particulares a fin de extraer campos individuales desde las filas de una tabla de BigQuery.
  • Realizar procesamientos sobre cada elemento en un conjunto de datos. Puedes usar ParDo para realizar cálculos simples o complejos en cada elemento o en ciertos elementos de una PCollection.

ParDo también es un paso intermedio común en una canalización. Por ejemplo, puedes usar ParDo para asignar claves a cada elemento en una PCollection y crear pares clave-valor. Luego, puedes agrupar los pares mediante una transformación GroupByKey.

Aplica una transformación ParDo

Para usar ParDo, aplícala a la PCollection que deseas transformar y guarda el valor que se muestra como una PCollection del tipo adecuado.

El argumento que proporcionas a ParDo debe ser una subclase de un tipo específico proporcionado por el SDK de Dataflow, llamado DoFn. Para obtener más información sobre DoFn, consulta Crea y especifica una lógica de procesamiento más adelante en esta sección.

En la siguiente muestra de código, se observa un ParDo básico aplicado a una PCollection de strings, que pasa una función basada en DoFn para calcular la longitud de cada string y entregar esos resultados a una PCollection de números enteros.

Java

  // The input PCollection of Strings.
  PCollection<String> words = ...;

  // The DoFn to perform on each element in the input PCollection.
  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

  // Apply a ParDo to the PCollection "words" to compute lengths for each word.
  PCollection<Integer> wordLengths = words.apply(
      ParDo
      .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                              // we define above.

En el ejemplo el código llama a apply en la colección de entradas (llamadas “palabras”). ParDo es el argumento PTransform. La operación .of es en la que especificas el DoFn que se realizará en cada elemento, llamado, en este caso, ComputeWordLengthFn().

Crea y especifica una lógica de procesamiento

La lógica de procesamiento que proporcionas para ParDo debe ser de un tipo específico requerido por el SDK de Dataflow que usas a fin de crear tu canalización.

Java

Debes compilar una subclase de la clase DoFn del SDK.

La función que proporcionas de forma independiente y en varias instancias de Google Compute Engine.

Además, tu DoFn no debe depender de ningún estado persistente de invocación a invocación. Cualquier instancia de tu función de procesamiento en Cloud Platform podría no tener acceso a la información de estado en cualquier otra instancia de esa función.

Nota: El SDK de Dataflow proporciona una variante de ParDo que puedes usar para pasar datos persistentes inmutables a cada invocación de tu código de usuario como una entrada complementaria.

Java

Un DoFn procesa un elemento a la vez desde la PCollection de entrada. Cuando creas una subclase de DoFn, especificas el tipo de elemento de entrada y los tipos de elementos de salida como parámetros de tipo. En la siguiente muestra de código, se observa cómo podríamos definir la función ComputeWordLengthFn() del ejemplo anterior, que acepta una String de entrada y produce un Integer de salida:

  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

Tu subclase de DoFn debe anular el método de procesamiento de elementos, processElement, en el que proporcionas el código para trabajar con el elemento de entrada. En el siguiente ejemplo de código, se muestra el ComputeWordLengthFn() completo:

  static class ComputeWordLengthFn extends DoFn<String, Integer> {
    @Override
    public void processElement(ProcessContext c) {
      String word = c.element();
      c.output(word.length());
    }
  }

No es necesario extraer de forma manual los elementos de la colección de entrada; el SDK de Dataflow para Java controla la extracción de cada elemento y su transferencia a tu subclase DoFn. Cuando anulas processElement, el método de anulación debe aceptar un objeto de tipo ProcessContext, lo que te permite acceder al elemento que deseas procesar. Accede al elemento que se pasa a tu DoFn mediante el método ProcessContext.element().

Si los elementos en tu PCollection son pares clave-valor, puedes acceder a la clave mediante ProcessContext.element().getKey() y al valor mediante ProcessContext.element().getValue().

Java

El SDK de Dataflow para Java controla de forma automática la recopilación de los elementos de salida en una PCollection de resultado. Usa el objeto ProcessContext para enviar los elementos de los resultados desde processElement hasta la colección de salida. A fin de generar un elemento para la colección de resultados, usa el método ProcessContext.output().

DoFns básicas

Los SDK de Dataflow proporcionan formas específicas del lenguaje para simplificar cómo proporcionas tu implementación de DoFn.

Java

A menudo, puedes crear un argumento DoFn simple para ParDo como una instancia de clase interna anónima. Si tu DoFn consta solo de unas líneas, puedes especificarlo de forma intercalada para que sea más claro. En la siguiente muestra de código, se observa cómo aplicar un ParDo con la función ComputeWordLengthFn como un DoFn anónimo:

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a ParDo with an anonymous DoFn to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    ParDo
      .named("ComputeWordLengths")            // the transform name
      .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().length());
        }
      }));

Para transformaciones como la anterior que aplican una función a cada elemento en la entrada a fin de producir de manera exacta una salida por elemento, puedes usar la transformación de nivel superior MapElements. Esto es muy conciso en Java 8, ya que MapElements acepta una función lambda.

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a MapElements with an anonymous lambda function to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    MapElements.via((String word) -> word.length())
        .withOutputType(new TypeDescriptor<Integer>() {});

De manera similar, puedes usar las funciones lambda de Java 8 con las transformaciones Filter, FlatMapElements y Partition. Consulta Transformaciones predefinidas en el SDK de Dataflow para obtener información sobre estas transformaciones.

Nombres de transformación

Los nombres de transformación aparecen en el grafo de ejecución cuando visualizas tu canalización en la Interfaz de supervisión de Dataflow. Es muy importante especificar un nombre explícito para tu transformación a fin de reconocerlas en el gráfico.

Java

La operación .named especifica el nombre de transformación para este paso en tu canalización. Los nombres de transformación aparecen en el gráfico de ejecución cuando visualizas tu canalización en la Interfaz de supervisión de Dataflow. Es muy importante especificar un nombre explícito cuando usas una instancia DoFn anónima con ParDo, para que puedas ver un nombre fácil de leer en la interfaz de supervisión.

Entradas complementarias

Además de la entrada principal PCollection, puedes proporcionar entradas adicionales a una transformación ParDo en forma de entradas complementarias. Una entrada complementaria es una entrada adicional a la que tu DoFn puede acceder cada vez que procesa un elemento en la PCollection de entrada. Cuando especificas una entrada complementaria, creas una vista de otros datos que se pueden leer desde el DoFn de la transformación ParDo mientras procesas cada elemento.

Las entradas complementarias son útiles si tu ParDo necesita inyectar datos adicionales cuando procesa cada elemento en la PCollection de entrada, pero los datos adicionales deben determinarse en el entorno de ejecución (y no deben estar hard-coded). Esos valores podrían estar determinados por los datos de entrada o podrían depender de una rama diferente de tu canalización. Por ejemplo, puedes obtener un valor de un servicio remoto mientras tu canalización está en ejecución y usarlo como una entrada complementaria. O bien, puedes usar un valor que calcula una rama separada de tu canalización y agregarlo como una entrada complementaria a la ParDo de otra rama.

Representa una entrada complementaria

Java

Las entradas laterales siempre son de tipo PCollectionView. PCollectionView es una forma de representar una PCollection como una entidad única, que luego puedes pasar como una entrada complementaria a un ParDo. Puedes crear una PCollectionView que exprese una PCollection como uno de los siguientes tipos:

Tipo de vista Uso
View.asSingleton Representa una PCollection como un valor individual. Por lo general, se usa después de combinar una PCollection mediante Combine.globally. Usa esto cuando tu entrada complementaria sea un solo valor procesado. Por lo general, debes crear una vista singleton mediante Combine.globally(...).asSingletonView().
View.asList Representa una PCollection como una List. Usa esta vista cuando tu entrada complementaria sea una colección de valores individuales.
View.asMap Representa una PCollection como un Map. Usa esta vista cuando tu entrada complementaria esté compuesta por pares clave-valor (PCollection<K, V>) y tenga un solo valor para cada clave.
View.asMultimap Representa una PCollection como un MultiMap. Usa esta vista cuando tu entrada complementaria esté compuesta por pares clave-valor (PCollection<K, V>) y tenga varios valores para cada clave.

Nota: Al igual que otros datos de canalización, PCollectionView es inmutable una vez creada.

Pasa entradas complementarias a ParDo

Java

Pasas entradas complementarias a tu transformación ParDo mediante una invocación a .withSideInputs. Dentro de tu DoFn, puedes acceder a la entrada complementaria mediante el método DoFn.ProcessContext.sideInput.

En el siguiente código de ejemplo, se crea una entrada complementaria de singleton desde una PCollection<Integer> y se pasa a un ParDo posterior.

En el ejemplo tenemos una PCollection<String> llamada words que representa una colección de palabras individuales, y una PCollection<Integer> que representa la longitud de las palabras; podemos usar la última para calcular un límite de longitud máxima de palabra y, luego, pasar ese valor calculado como una entrada complementaria a un ParDo que filtre words según el límite.

  // The input PCollection to ParDo.
  PCollection<String> words = ...;

  // A PCollection of word lengths that we'll combine into a single value.
  PCollection<Integer> wordLengths = ...; // Singleton PCollection

  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
  final PCollectionView<Integer> maxWordLengthCutOffView =
     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());

  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
    PCollection<String> wordsBelowCutOff =
    words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                      .of(new DoFn<String, String>() {
        public void processElement(ProcessContext c) {
          String word = c.element();
          // In our DoFn, access the side input.
          int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
          if (word.length() <= lengthCutOff) {
            c.output(word);
          }
    }}));
}

Entradas laterales y sistema de ventanas

Cuando creas una PCollectionView de una PCollection con ventanas, que puede ser infinita y, por lo tanto, no se puede comprimir en un solo valor (o clase de colección única), la PCollectionView representa una sola entidad por ventana. Es decir, la PCollectionView representa un singleton por ventana, una lista por ventana, etcétera.

Dataflow usa las ventanas a fin de que el elemento de entrada principal busque la ventana apropiada para el elemento de entrada complementaria. Dataflow proyecta la ventana del elemento de entrada principal en el conjunto de ventanas de la entrada complementaria y luego usa la entrada complementaria de la ventana resultante. Si la entrada principal y las complementarias tienen ventanas idénticas, la proyección proporciona la ventana correspondiente exacta; sin embargo, si las entradas tienen ventanas diferentes, Dataflow usa la proyección para elegir la ventana de entrada lateral más apropiada.

Java

Por ejemplo, si la entrada principal se organiza mediante ventanas fijas de un minuto y la entrada complementaria se organiza mediante ventanas fijas de una hora, Dataflow proyecta la ventana de entrada principal en el conjunto de ventanas de la entrada complementaria y selecciona el valor de la entrada complementaria de la ventana de entrada complementaria de una hora apropiada.

Si la entrada complementaria tiene varios disparos de activación, Dataflow usa el valor del último disparo de activación. En particular, esto es útil si usas una entrada complementaria con una única ventana global y especificas un activador.

Salidas laterales

Mientras que ParDo siempre produce una PCollection de salida principal (como el valor de retorno de apply), también puedes hacer que tu ParDo produzca cualquier cantidad de PCollection de salida adicionales. Si eliges tener varias salidas, tu ParDo mostrará todas las PCollection de salida (incluida la salida principal) agrupadas. Por ejemplo, en Java, las PCollection de salida se agrupan en una PCollectionTuple de tipo seguro.

Etiquetas para salidas complementarias

Java

A fin de emitir elementos a una PCollection de salida complementaria, deberás crear un objeto TupleTag para identificar cada colección que produce tu ParDo. Por ejemplo, si tu ParDo produce tres PCollection de salida (la salida principal y dos salidas complementarias), deberás crear tres TupleTag asociadas.

En el siguiente código de ejemplo, se muestra cómo crear TupleTag para un ParDo con una salida principal y dos salidas complementarias:

  // Input PCollection to our ParDo.
  PCollection<String> words = ...;

  // The ParDo will filter words whose length is below a cutoff and add them to
  // the main output PCollection<String>.
  // If a word is above the cutoff, the ParDo will add the word length to a side output
  // PCollection<Integer>.
  // If a word starts with the string "MARKER", the ParDo will add that word to a different
  // side output PCollection<String>.
  final int wordLengthCutOff = 10;

  // Create the TupleTags for the main and side outputs.
  // Main output.
  final TupleTag<String> wordsBelowCutOffTag =
      new TupleTag<String>(){};
  // Word lengths side output.
  final TupleTag<Integer> wordLengthsAboveCutOffTag =
      new TupleTag<Integer>(){};
  // "MARKER" words side output.
  final TupleTag<String> markedWordsTag =
      new TupleTag<String>(){};

Pasa etiquetas de salida a ParDo

Una vez que especificaste las TupleTag para cada una de tus ParDo de salida, deberás pasar esas etiquetas a tu ParDo mediante una invocación a .withOutputTags. Primero, pasas la etiqueta para la salida principal y, luego, las etiquetas de cualquier salida complementaria en una TupleTagList.

A partir de nuestro ejemplo anterior, aquí se muestra cómo pasamos las tres TupleTag (una para la salida principal y dos destinadas a las salidas complementarias) a nuestro ParDo:

  PCollectionTuple results =
      words.apply(
          ParDo
          // Specify the tag for the main output, wordsBelowCutoffTag.
          .withOutputTags(wordsBelowCutOffTag,
          // Specify the tags for the two side outputs as a TupleTagList.
                          TupleTagList.of(wordLengthsAboveCutOffTag)
                                      .and(markedWordsTag))
          .of(new DoFn<String, String>() {
            // DoFn continues here.
            ...
          }

Ten en cuenta que todas las salidas (incluida la principal PCollection) se agrupan en la PCollectionTuple que se muestra llamada results..

Emite salidas complementarias en tu DoFn

Java

Dentro de tu DoFn de ParDo, puedes emitir un elemento a una salida complementaria mediante el método ProcessContext.sideOutput. Deberás pasar la TupleTag adecuada para la colección de salidas complementarias de destino cuando llames a ProcessContext.sideOutput.

Como se muestra en el ejemplo anterior, la emisión de DoFn en las salidas principales y complementarias es la siguiente:

  .of(new DoFn<String, String>() {
     public void processElement(ProcessContext c) {
       String word = c.element();
       if (word.length() <= wordLengthCutOff) {
         // Emit this short word to the main output.
         c.output(word);
       } else {
         // Emit this long word's length to a side output.
         c.sideOutput(wordLengthsAboveCutOffTag, word.length());
       }
       if (word.startsWith("MARKER")) {
         // Emit this word to a different side output.
         c.sideOutput(markedWordsTag, word);
       }
     }}));

Después de tu ParDo, deberás extraer las PCollection de salida complementaria y principal resultantes de la PCollectionTuple que se muestra. Consulta la sección sobre PCollectionTuple para ver algunos ejemplos en los que se muestra cómo extraer PCollection individuales de una tupla.