Procesamiento paralelo con ParDo

ParDo es la operación de procesamiento paralelo central en los SDK de Dataflow. ParDo se usa para el procesamiento paralelo genérico. El estilo de procesamiento ParDo es similar a lo que sucede dentro de la clase “Mapper” de un algoritmo de estilo Map/Shuffle/Reduce: ParDo toma cada elemento en una entrada PCollection, realiza algunas funciones de procesamiento sobre ese elemento y, luego, emite cero, uno o varios elementos en una salida PCollection.

Tú proporcionas la función que ParDo realiza sobre cada elemento de la entrada PCollection. La función que proporcionas se invoca de forma independiente y en paralelo sobre varias instancias de trabajador en tu trabajo de Dataflow.

ParDo es útil para una variedad de operaciones de procesamiento de datos, incluso las que se muestran a continuación:

  • Filtrar un conjunto de datos. Puedes usar ParDo para considerar cada elemento en una PCollection y enviar ese elemento a una colección nueva o descartarlo.
  • Dar formato o convertir el tipo de cada elemento en un conjunto de datos. Puedes usar ParDo para dar formato a los elementos en tu PCollection, como dar formato a los pares clave-valor como strings que se pueden imprimir.
  • Extraer partes de cada elemento en un conjunto de datos. Puedes usar ParDo para extraer solo una parte de cada elemento en tu PCollection. Esto puede ser útil en situaciones particulares, como para extraer campos individuales desde las filas de una tabla de BigQuery.
  • Realizar procesamientos sobre cada elemento en un conjunto de datos. Puedes usar ParDo para realizar procesamientos simples o complejos sobre cada elemento o sobre ciertos elementos de una PCollection.

ParDo es un paso intermedio común en una canalización. Por ejemplo, puedes usar ParDo con el fin de asignar claves a cada elemento en una PCollection y crear así pares clave-valor. Más tarde, puedes agrupar los pares con una transformación GroupByKey.

Aplica una transformación ParDo

A fin de usar ParDo, debes aplicarla a la PCollection que deseas transformar y guarda el valor mostrado como una PCollection del tipo adecuado.

EL argumento que proporcionas para ParDo debe ser una subclase de un tipo específico que proporciona el SDK de Dataflow, llamado DoFn. Si quieres obtener más información sobre DoFn, consulta Crea y especifica una lógica de procesamiento más adelante en esta sección.

En el código de ejemplo siguiente, se muestra un ParDo básico aplicado a una PCollection de strings que pasa una función basada en DoFn para procesar la longitud de cada string y da como salida las longitudes de la string en una PCollection de números enteros.

Java

  // The input PCollection of Strings.
  PCollection<String> words = ...;

  // The DoFn to perform on each element in the input PCollection.
  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

  // Apply a ParDo to the PCollection "words" to compute lengths for each word.
  PCollection<Integer> wordLengths = words.apply(
      ParDo
      .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                              // we define above.

En el ejemplo, el código llama a apply en la colección de entradas (llamadas “palabras”). ParDo es el argumento PTransform. La operación .of está donde especificas a DoFn que realice la operación sobre cada elemento que se llama ComputeWordLengthFn() en este caso.

Crea y especifica una lógica de procesamiento

La lógica de procesamiento que proporciones para ParDo debe ser de un tipo específico requerido por el SDK de Dataflow que usas a fin de crear tu canalización.

Java

Debes compilar una subclase de la clase DoFn del SDK.

La función que proporcionas de forma independiente y en varias instancias de Google Compute Engine.

Además, tu DoFn no debe depender de ningún estado persistente de invocación a invocación. Cualquier instancia de tu función de procesamiento en Cloud Platform podría no tener acceso a la información del estado en cualquier otra instancia de esa función.

Nota: El SDK de Dataflow proporciona una variante de ParDo que puedes usar para pasar datos persistentes inmutables a cada invocación de tu código de usuario como una entrada lateral.

Java

Un DoFn procesa un elemento a la vez desde la entrada PCollection. Cuando creas una subclase de DoFn, especificas el tipo de elemento de entrada y el tipo de elementos de salida como parámetros de tipo. En el código de ejemplo siguiente, se muestra cómo podríamos definir la función ComputeWordLengthFn() del ejemplo anterior que acepta una entrada String y produce una salida Integer:

  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

Tu subclase de DoFn debe anular processElement, el método de procesamiento de elementos, en el que proporcionas tu código para que trabaje con el elemento de entrada. En el ejemplo de código siguiente, se muestra la ComputeWordLengthFn() completa:

  static class ComputeWordLengthFn extends DoFn<String, Integer> {
    @Override
    public void processElement(ProcessContext c) {
      String word = c.element();
      c.output(word.length());
    }
  }

No necesitas extraer los elementos de forma manual desde la colección de entrada; el SDK de Dataflow para Java controla la extracción de cada elemento y los pasa a tu subclase de DoFn. Cuando anulas processElement, tu método de anulación debe aceptar un objeto de tipo ProcessContext, lo que te permite acceder al elemento que quieres procesar. Puedes acceder al elemento que se pasó a tu DoFn con el método ProcessContext.element().

Si los elementos en tu PCollection son pares clave-valor, puedes acceder a la clave mediante ProcessContext.element().getKey(), y al valor mediante ProcessContext.element().getValue().

Java

El SDK de Dataflow para Java controla de forma automática la recopilación de los elementos de salida en una PCollection de resultados. Usa el objeto ProcessContext para enviar los elementos resultantes desde processElement a la colección de salida. Para enviar un elemento a la colección de resultados, usa el método ProcessContext.output().

DoFns básicas

El SDK de Dataflow proporciona formas específicas del lenguaje para simplificar cómo proporcionas tu implementación DoFn.

Java

Por lo general, puedes crear un argumento DoFn simple para ParDo como una instancia de clase interna anónima. Si tu DoFn consiste solo de unas líneas, puedes especificarlo de forma intercalada para que sea más claro. En el código de ejemplo siguiente, se muestra cómo aplicar un ParDo con la función ComputeWordLengthFn como un DoFn anónimo:

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a ParDo with an anonymous DoFn to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    ParDo
      .named("ComputeWordLengths")            // the transform name
      .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().length());
        }
      }));

Para las transformaciones como la anterior, que aplican una función a cada elemento en la entrada a fin de producir una salida por elemento con exactitud, puedes usar la transformación MapElements de nivel más alto. En especial, esto es conciso en Java 8, ya que MapElements acepta una función lambda.

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a MapElements with an anonymous lambda function to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    MapElements.via((String word) -> word.length())
        .withOutputType(new TypeDescriptor<Integer>() {});

También puedes usar las funciones lambda de Java 8 con las transformaciones Filter, FlatMapElements y Partition. Consulta Transformaciones escritas con anterioridad en los SDK de Dataflow para obtener información sobre estas transformaciones.

Nombres de transformación

Los nombres de transformación aparecen en el grafo de ejecución cuando visualizas tu canalización en la Interfaz de supervisión de Dataflow. Es muy importante especificar un nombre explícito para tu transformación a modo de reconocerlas en el grafo.

Java

La operación .named especifica el nombre de transformación para este paso en tu canalización. Los nombres de transformación aparecen en el grafo de ejecución cuando visualizas tu canalización en la Interfaz de supervisión de Dataflow. Es muy importante especificar un nombre explícito cuando usas un instancia DoFn anónima con ParDo, así puedes ver un nombre fácil de leer de tu paso en la interfaz de supervisión.

Entradas laterales

Además de la entrada principal PCollection, puedes proporcionar entradas adicional a una transformación ParDo en la forma de entradas laterales. Una entrada lateral es una entrada adicional a la que tu DoFn puede acceder cada vez que procesa un elemento en la entrada PCollection. Cuando especificas una entrada lateral, creas una vista de algunos otros datos que se pueden leer desde dentro del DoFn de la transformación ParDo mientras se procesa cada elemento.

Las entradas laterales son útiles si tu ParDo necesita incorporar datos adicionales al momento de procesar cada elemento en la entrada PCollection, pero los datos adicionales se tienen que determinar en el entorno de ejecución (y no codificarse de forma rígida). Esos valores podrían estar determinados por los datos de entrada o podrían depender de una rama diferente de tu canalización. Por ejemplo, puedes obtener un valor de un servicio remoto mientras tu canalización está en ejecución y usarlo como una entrada lateral. O bien, puedes usar un valor procesado por una rama distinta de tu canalización y agregarlo como una entrada lateral al ParDo de otra rama.

Representa una entrada lateral

Java

Las entradas laterales siempre son de tipo PCollectionView. PCollectionView es una forma de representar una PCollection como una entidad única, que luego puedes pasar como una entrada lateral a un ParDo. Puedes hacer una PCollectionView que exprese una PCollection como uno de los tipos siguientes:

Ver tipo Uso
View.asSingleton Representa a PCollection como un valor individual; en general, tendrás que usar esto después de la combinación de un PCollection mediante Combine.globally. Usa esto cuando tu entrada lateral sea un solo valor procesado. Por lo general, debes crear una vista de singleton con Combine.globally(...).asSingletonView().
View.asList Representa una PCollection como una List. Usa esta vista cuando tu entrada lateral sea una colección de valores individuales.
View.asMap Representa una PCollection como una Map. Usa esta vista cuando tu entrada lateral consiste de pares clave-valor (PCollection<K, V>) y tiene un solo valor para cada clave.
View.asMultimap Representa una PCollection como una MultiMap. Usa esta vista cuando tu entrada lateral consiste de pares clave-valor (PCollection<K, V>) y tiene múltiples valores para cada clave.

Nota: Al igual que otros datos de canalización, PCollectionView es inmutable una vez creado.

Pasa entradas laterales a ParDo

Java

Pasas las entradas laterales a tu transformación ParDo con la invocación a .withSideInputs. Dentro de tu DoFn, puedes acceder a la entrada lateral con el método DoFn.ProcessContext.sideInput.

En el código de ejemplo siguiente, se crea una entrada lateral singleton desde un PCollection<Integer> y la pasa a un ParDo posterior.

En el ejemplo, tenemos una PCollection<String> llamada words que representa una colección de palabras individuales y una PCollection<Integer> que representa la longitud de las palabras; podemos usar esta última para calcular un corte de longitud máxima de palabra como un valor de singleton y, luego, pasar ese valor computado como una entrada lateral a un ParDo que filtra words según el corte.

  // The input PCollection to ParDo.
  PCollection<String> words = ...;

  // A PCollection of word lengths that we'll combine into a single value.
  PCollection<Integer> wordLengths = ...; // Singleton PCollection

  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
  final PCollectionView<Integer> maxWordLengthCutOffView =
     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());

  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
    PCollection<String> wordsBelowCutOff =
    words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                      .of(new DoFn<String, String>() {
        public void processElement(ProcessContext c) {
          String word = c.element();
          // In our DoFn, access the side input.
          int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
          if (word.length() <= lengthCutOff) {
            c.output(word);
          }
    }}));
}

Entradas laterales y sistema de ventanas

Cuando creas una PCollectionView de un sistema de ventanas PCollection que puede ser infinita y, por lo tanto, no se puede comprimir en un solo valor (o clase de colección única), la PCollectionView representa una entidad única por ventana. Es decir, el PCollectionView representa un singleton por ventana, una lista por ventana, etcétera.

El flujo de datos usa las ventanas para que el elemento de entrada principal busque la ventana apropiada destinada al elemento de entrada lateral. Dataflow proyecta la ventana del elemento de entrada principal en el conjunto de ventanas de la entrada lateral y luego usa la entrada lateral de la ventana resultante. Si la entrada principal y las laterales tienen ventanas idénticas, la proyección proporciona la ventana correspondiente exacta; sin embargo, si las entradas tienen ventanas diferentes, Dataflow usa la proyección para elegir la ventana de entrada lateral más apropiada.

Java

Por ejemplo, si la entrada principal se visualiza mediante períodos de tiempo fijos de un minuto y la entrada lateral se muestra mediante períodos de tiempo fijos de una hora, Dataflow proyecta la ventana de entrada principal en el conjunto de ventanas de entrada lateral y selecciona el valor de la entrada lateral de la ventana de entrada lateral de una hora apropiada.

Si la entrada lateral tiene varios disparos de activación, Dataflow usa el valor del último disparo del activador. En particular, esto es útil si usas una entrada lateral con una única ventana global y especificas un activador.

Salidas laterales

Si bien ParDo siempre produce una salida principal PCollection (como el valor de retorno de apply), también puedes hacer que ParDo produzca cualquier cantidad de salidas adicionales PCollection. Si eliges tener varias salidas, tu ParDo mostrará todas las salidas de PCollection (incluida la salida principal) agrupadas. Por ejemplo, en Java, las salidas PCollection se agrupan en un PCollectionTuple de tipo seguro.

Etiquetas para salidas laterales

Java

Para emitir elementos a una salida lateral PCollection, deberás crear un objeto TupleTag a modo de identificar cada colección que produce ParDo. Por ejemplo, si tu ParDo produce tres salidas PCollection (la salida principal y dos laterales), deberás crear tres TupleTag asociadas.

En el código de ejemplo siguiente, se muestra cómo crear TupleTags para un ParDo con una salida principal y dos salidas laterales:

  // Input PCollection to our ParDo.
  PCollection<String> words = ...;

  // The ParDo will filter words whose length is below a cutoff and add them to
  // the main output PCollection<String>.
  // If a word is above the cutoff, the ParDo will add the word length to a side output
  // PCollection<Integer>.
  // If a word starts with the string "MARKER", the ParDo will add that word to a different
  // side output PCollection<String>.
  final int wordLengthCutOff = 10;

  // Create the TupleTags for the main and side outputs.
  // Main output.
  final TupleTag<String> wordsBelowCutOffTag =
      new TupleTag<String>(){};
  // Word lengths side output.
  final TupleTag<Integer> wordLengthsAboveCutOffTag =
      new TupleTag<Integer>(){};
  // "MARKER" words side output.
  final TupleTag<String> markedWordsTag =
      new TupleTag<String>(){};

Pasa etiquetas de salida a ParDo

Una vez que especificaste las TupleTags para cada una de tus salidas ParDo, deberás pasar esas etiquetas a tu ParDo mediante la invocación .withOutputTags. Primero, se pasa la etiqueta para la salida principal y, luego, las etiquetas de cualquier salida lateral en una TupleTagList.

Sobre la base de nuestro ejemplo anterior, aquí se muestra cómo pasamos las tres TupleTags (una para la salida principal y dos destinados a las salidas laterales) a nuestro ParDo:

  PCollectionTuple results =
      words.apply(
          ParDo
          // Specify the tag for the main output, wordsBelowCutoffTag.
          .withOutputTags(wordsBelowCutOffTag,
          // Specify the tags for the two side outputs as a TupleTagList.
                          TupleTagList.of(wordLengthsAboveCutOffTag)
                                      .and(markedWordsTag))
          .of(new DoFn<String, String>() {
            // DoFn continues here.
            ...
          }

Ten en cuenta que todas las salidas (incluso la salida principal PCollection) se agrupan en la PCollectionTuple que se muestra, llamada results.

Emite salidas laterales en tu DoFn

Java

Dentro de tu DoFn de ParDo, puedes emitir un elemento a una salida lateral con el método ProcessContext.sideOutput. Deberás pasar el TupleTag apropiado para la colección de salida del lado de destino cuando llames a ProcessContext.sideOutput.

De nuestro ejemplo anterior, aquí está la emisión de DoFn a la salida principal y lateral:

  .of(new DoFn<String, String>() {
     public void processElement(ProcessContext c) {
       String word = c.element();
       if (word.length() <= wordLengthCutOff) {
         // Emit this short word to the main output.
         c.output(word);
       } else {
         // Emit this long word's length to a side output.
         c.sideOutput(wordLengthsAboveCutOffTag, word.length());
       }
       if (word.startsWith("MARKER")) {
         // Emit this word to a different side output.
         c.sideOutput(markedWordsTag, word);
       }
     }}));

Después de tu ParDo, deberás extraer las salidas principales y laterales resultantes PCollection de las PCollectionTuple que se muestran. Consulta la sección sobre PCollectionTuple para ver algunos ejemplos que muestran cómo extraer PCollections individuales desde una tupla.

¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.