ParDo
es la principal operación de procesamiento paralelo en los SDK de Dataflow. Usas ParDo
para el procesamiento genérico en paralelo. El estilo de procesamiento ParDo
es similar a lo que sucede dentro de la clase “Mapper” de un algoritmo de estilo Map/Shuffle/Reduce: ParDo
toma cada elemento en una PCollection
de entrada, realiza alguna función de procesamiento sobre ese elemento y, luego, emite cero, uno o varios elementos en una PCollection
de salida.
Proporcionas la función que realiza ParDo
en cada uno de los elementos de la PCollection
de entrada. La función que proporcionas se invoca de forma independiente y, en paralelo, en varias instancias de trabajador en tu trabajo de Dataflow.
ParDo
es útil para una variedad de operaciones de procesamiento de datos, incluidas las siguientes:
- Filtrar un conjunto de datos. Puedes usar
ParDo
para considerar cada elemento de unaPCollection
y enviarlo a una colección nueva o descartarlo. - Dar formato al tipo de cada elemento en un conjunto de datos, o convertirlo. Puedes usar
ParDo
para dar formato a los elementos en tuPCollection
, como dar formato a pares clave-valor en strings imprimibles. - Extraer partes de cada elemento en un conjunto de datos. Puedes usar
ParDo
para extraer solo una parte de cada elemento en tuPCollection
. Esto puede ser útil en situaciones particulares a fin de extraer campos individuales desde las filas de una tabla de BigQuery. - Realizar procesamientos sobre cada elemento en un conjunto de datos. Puedes usar
ParDo
para realizar cálculos simples o complejos en cada elemento o en ciertos elementos de unaPCollection
.
ParDo
también es un paso intermedio común en una canalización. Por ejemplo, puedes usar ParDo
para asignar claves a cada elemento en una PCollection
y crear pares clave-valor. Luego, puedes agrupar los pares mediante una transformación GroupByKey
.
Aplica una transformación ParDo
Para usar ParDo
, aplícala a la PCollection
que deseas transformar y guarda el valor que se muestra como una PCollection
del tipo adecuado.
El argumento que proporcionas a ParDo
debe ser una subclase de un tipo específico proporcionado por el SDK de Dataflow, llamado DoFn
. Para obtener más información sobre DoFn
, consulta Crea y especifica una lógica de procesamiento más adelante en esta sección.
En la siguiente muestra de código, se observa un ParDo
básico aplicado a una PCollection
de strings, que pasa una función basada en DoFn
para calcular la longitud de cada string y entregar esos resultados a una PCollection
de números enteros.
Java
// The input PCollection of Strings. PCollection<String> words = ...; // The DoFn to perform on each element in the input PCollection. static class ComputeWordLengthFn extends DoFn<String, Integer> { ... } // Apply a ParDo to the PCollection "words" to compute lengths for each word. PCollection<Integer> wordLengths = words.apply( ParDo .of(new ComputeWordLengthFn())); // The DoFn to perform on each element, which // we define above.
En el ejemplo el código llama a apply
en la colección de entradas (llamadas “palabras”).
ParDo
es el argumento PTransform
. La operación .of
es en la que especificas el DoFn
que se realizará en cada elemento, llamado, en este caso, ComputeWordLengthFn()
.
Crea y especifica una lógica de procesamiento
La lógica de procesamiento que proporcionas para ParDo
debe ser de un tipo específico requerido por el SDK de Dataflow que usas a fin de crear tu canalización.
Java
Debes compilar una subclase de la clase DoFn
del SDK.
La función que proporcionas de forma independiente y en varias instancias de Google Compute Engine.
Además, tu DoFn
no debe depender de ningún estado persistente de invocación a invocación. Cualquier instancia de tu función de procesamiento en Cloud Platform podría no tener acceso a la información de estado en cualquier otra instancia de esa función.
Nota: El SDK de Dataflow proporciona una variante de ParDo
que puedes usar para pasar datos persistentes inmutables a cada invocación de tu código de usuario como una entrada complementaria.
Java
Un DoFn
procesa un elemento a la vez desde la PCollection
de entrada.
Cuando creas una subclase de DoFn
, especificas el tipo de elemento de entrada y los tipos de elementos de salida como parámetros de tipo. En la siguiente muestra de código, se observa cómo podríamos definir la función ComputeWordLengthFn()
del ejemplo anterior, que acepta una String
de entrada y produce un Integer
de salida:
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
Tu subclase de DoFn
debe anular el método de procesamiento de elementos, processElement
, en el que proporcionas el código para trabajar con el elemento de entrada.
En el siguiente ejemplo de código, se muestra el ComputeWordLengthFn()
completo:
static class ComputeWordLengthFn extends DoFn<String, Integer> { @Override public void processElement(ProcessContext c) { String word = c.element(); c.output(word.length()); } }
No es necesario extraer de forma manual los elementos de la colección de entrada; el SDK de Dataflow para Java controla la extracción de cada elemento y su transferencia a tu subclase DoFn
.
Cuando anulas processElement
, el método de anulación debe aceptar un objeto de tipo ProcessContext
, lo que te permite acceder al elemento que deseas procesar. Accede al elemento que se pasa a tu DoFn
mediante el método ProcessContext.element()
.
Si los elementos en tu PCollection
son pares clave-valor, puedes acceder a la clave mediante ProcessContext.element().getKey()
y al valor mediante ProcessContext.element().getValue()
.
Java
El SDK de Dataflow para Java controla de forma automática la recopilación de los elementos de salida en una PCollection
de resultado. Usa el objeto ProcessContext
para enviar los elementos de los resultados desde processElement
hasta la colección de salida. A fin de generar un elemento para la colección de resultados, usa el método ProcessContext.output()
.
DoFns básicas
Los SDK de Dataflow proporcionan formas específicas del lenguaje para simplificar cómo proporcionas tu implementación de DoFn
.
Java
A menudo, puedes crear un argumento DoFn
simple para ParDo
como una instancia de clase interna anónima. Si tu DoFn
consta solo de unas líneas, puedes especificarlo de forma intercalada para que sea más claro. En la siguiente muestra de código, se observa cómo aplicar un ParDo
con la función ComputeWordLengthFn
como un DoFn
anónimo:
// The input PCollection. PCollection<String> words = ...; // Apply a ParDo with an anonymous DoFn to the PCollection words. // Save the result as the PCollection wordLengths. PCollection<Integer> wordLengths = words.apply( ParDo .named("ComputeWordLengths") // the transform name .of(new DoFn<String, Integer>() { // a DoFn as an anonymous inner class instance @Override public void processElement(ProcessContext c) { c.output(c.element().length()); } }));
Para transformaciones como la anterior que aplican una función a cada elemento en la entrada a fin de producir de manera exacta una salida por elemento, puedes usar la transformación de nivel superior MapElements
.
Esto es muy conciso en Java 8, ya que MapElements
acepta una función lambda.
// The input PCollection. PCollection<String> words = ...; // Apply a MapElements with an anonymous lambda function to the PCollection words. // Save the result as the PCollection wordLengths. PCollection<Integer> wordLengths = words.apply( MapElements.via((String word) -> word.length()) .withOutputType(new TypeDescriptor<Integer>() {});
De manera similar, puedes usar las funciones lambda de Java 8 con las transformaciones Filter
, FlatMapElements
y Partition
. Consulta Transformaciones predefinidas en el SDK de Dataflow para obtener información sobre estas transformaciones.
Nombres de transformación
Los nombres de transformación aparecen en el grafo de ejecución cuando visualizas tu canalización en la Interfaz de supervisión de Dataflow. Es muy importante especificar un nombre explícito para tu transformación a fin de reconocerlas en el gráfico.
Java
La operación .named
especifica el nombre de transformación para este paso en tu canalización. Los nombres de transformación aparecen en el gráfico de ejecución cuando visualizas tu canalización en la Interfaz de supervisión de Dataflow. Es muy importante especificar un nombre explícito cuando usas una instancia DoFn
anónima con ParDo
, para que puedas ver un nombre fácil de leer en la interfaz de supervisión.
Entradas complementarias
Además de la entrada principal PCollection
, puedes proporcionar entradas adicionales a una transformación ParDo
en forma de entradas complementarias. Una entrada complementaria es una entrada adicional a la que tu DoFn
puede acceder cada vez que procesa un elemento en la PCollection
de entrada. Cuando especificas una entrada complementaria, creas una vista de otros datos que se pueden leer desde el DoFn
de la transformación ParDo
mientras procesas cada elemento.
Las entradas complementarias son útiles si tu ParDo
necesita inyectar datos adicionales cuando procesa cada elemento en la PCollection
de entrada, pero los datos adicionales deben determinarse en el entorno de ejecución (y no deben estar hard-coded). Esos valores podrían estar determinados por los datos de entrada o podrían depender de una rama diferente de tu canalización. Por ejemplo, puedes obtener un valor de un servicio remoto mientras tu canalización está en ejecución y usarlo como una entrada complementaria. O bien, puedes usar un valor que calcula una rama separada de tu canalización y agregarlo como una entrada complementaria a la ParDo
de otra rama.
Representa una entrada complementaria
Java
Las entradas laterales siempre son de tipo PCollectionView.
PCollectionView
es una forma de representar una PCollection
como una entidad única, que luego puedes pasar como una entrada complementaria a un ParDo
. Puedes crear una PCollectionView
que exprese una PCollection
como uno de los siguientes tipos:
Tipo de vista | Uso |
---|---|
View.asSingleton |
Representa una PCollection como un valor individual. Por lo general, se usa después de combinar una PCollection mediante Combine.globally. Usa esto cuando tu entrada complementaria sea un solo valor procesado. Por lo general, debes crear una vista singleton mediante Combine.globally(...).asSingletonView() .
|
View.asList |
Representa una PCollection como una List . Usa esta vista cuando tu entrada complementaria sea una colección de valores individuales. |
View.asMap |
Representa una PCollection como un Map . Usa esta vista cuando tu entrada complementaria esté compuesta por pares clave-valor (PCollection<K, V> ) y tenga un solo valor para cada clave. |
View.asMultimap |
Representa una PCollection como un MultiMap . Usa esta vista cuando tu entrada complementaria esté compuesta por pares clave-valor (PCollection<K, V> ) y tenga varios valores para cada clave. |
Nota: Al igual que otros datos de canalización, PCollectionView
es inmutable una vez creada.
Pasa entradas complementarias a ParDo
Java
Pasas entradas complementarias a tu transformación ParDo
mediante una invocación a .withSideInputs
. Dentro de tu DoFn
, puedes acceder a la entrada complementaria mediante el método DoFn.ProcessContext.sideInput
.
En el siguiente código de ejemplo, se crea una entrada complementaria de singleton desde una PCollection<Integer>
y se pasa a un ParDo
posterior.
En el ejemplo tenemos una PCollection<String>
llamada words
que representa una colección de palabras individuales, y una PCollection<Integer>
que representa la longitud de las palabras; podemos usar la última para calcular un límite de longitud máxima de palabra y, luego, pasar ese valor calculado como una entrada complementaria a un ParDo
que filtre words
según el límite.
// The input PCollection to ParDo. PCollection<String> words = ...; // A PCollection of word lengths that we'll combine into a single value. PCollection<Integer> wordLengths = ...; // Singleton PCollection // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton. final PCollectionView<Integer> maxWordLengthCutOffView = wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView()); // Apply a ParDo that takes maxWordLengthCutOffView as a side input. PCollection<String> wordsBelowCutOff = words.apply(ParDo.withSideInputs(maxWordLengthCutOffView) .of(new DoFn<String, String>() { public void processElement(ProcessContext c) { String word = c.element(); // In our DoFn, access the side input. int lengthCutOff = c.sideInput(maxWordLengthCutOffView); if (word.length() <= lengthCutOff) { c.output(word); } }})); }
Entradas laterales y sistema de ventanas
Cuando creas una PCollectionView
de una PCollection
con ventanas, que puede ser infinita y, por lo tanto, no se puede comprimir en un solo valor (o clase de colección única), la PCollectionView
representa una sola entidad por ventana. Es decir, la PCollectionView
representa un singleton por ventana, una lista por ventana, etcétera.
Dataflow usa las ventanas a fin de que el elemento de entrada principal busque la ventana apropiada para el elemento de entrada complementaria. Dataflow proyecta la ventana del elemento de entrada principal en el conjunto de ventanas de la entrada complementaria y luego usa la entrada complementaria de la ventana resultante. Si la entrada principal y las complementarias tienen ventanas idénticas, la proyección proporciona la ventana correspondiente exacta; sin embargo, si las entradas tienen ventanas diferentes, Dataflow usa la proyección para elegir la ventana de entrada lateral más apropiada.
Java
Por ejemplo, si la entrada principal se organiza mediante ventanas fijas de un minuto y la entrada complementaria se organiza mediante ventanas fijas de una hora, Dataflow proyecta la ventana de entrada principal en el conjunto de ventanas de la entrada complementaria y selecciona el valor de la entrada complementaria de la ventana de entrada complementaria de una hora apropiada.
Si la entrada complementaria tiene varios disparos de activación, Dataflow usa el valor del último disparo de activación. En particular, esto es útil si usas una entrada complementaria con una única ventana global y especificas un activador.
Salidas laterales
Mientras que ParDo
siempre produce una PCollection
de salida principal (como el valor de retorno de apply
), también puedes hacer que tu ParDo
produzca cualquier cantidad de PCollection
de salida adicionales. Si eliges tener varias salidas, tu ParDo
mostrará todas las PCollection
de salida (incluida la salida principal) agrupadas. Por ejemplo, en Java, las PCollection
de salida se agrupan en una PCollectionTuple de tipo seguro.
Etiquetas para salidas complementarias
Java
A fin de emitir elementos a una PCollection
de salida complementaria, deberás crear un objeto TupleTag
para identificar cada colección que produce tu ParDo
. Por ejemplo, si tu ParDo
produce tres PCollection
de salida (la salida principal y dos salidas complementarias), deberás crear tres TupleTag
asociadas.
En el siguiente código de ejemplo, se muestra cómo crear TupleTag
para un ParDo
con una salida principal y dos salidas complementarias:
// Input PCollection to our ParDo. PCollection<String> words = ...; // The ParDo will filter words whose length is below a cutoff and add them to // the main output PCollection<String>. // If a word is above the cutoff, the ParDo will add the word length to a side output // PCollection<Integer>. // If a word starts with the string "MARKER", the ParDo will add that word to a different // side output PCollection<String>. final int wordLengthCutOff = 10; // Create the TupleTags for the main and side outputs. // Main output. final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){}; // Word lengths side output. final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){}; // "MARKER" words side output. final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
Pasa etiquetas de salida a ParDo
Una vez que especificaste las TupleTag
para cada una de tus ParDo
de salida, deberás pasar esas etiquetas a tu ParDo
mediante una invocación a .withOutputTags
. Primero, pasas la etiqueta para la salida principal y, luego, las etiquetas de cualquier salida complementaria en una TupleTagList
.
A partir de nuestro ejemplo anterior, aquí se muestra cómo pasamos las tres TupleTag
(una para la salida principal y dos destinadas a las salidas complementarias) a nuestro ParDo
:
PCollectionTuple results = words.apply( ParDo // Specify the tag for the main output, wordsBelowCutoffTag. .withOutputTags(wordsBelowCutOffTag, // Specify the tags for the two side outputs as a TupleTagList. TupleTagList.of(wordLengthsAboveCutOffTag) .and(markedWordsTag)) .of(new DoFn<String, String>() { // DoFn continues here. ... }
Ten en cuenta que todas las salidas (incluida la principal PCollection
) se agrupan en la PCollectionTuple
que se muestra llamada results.
.
Emite salidas complementarias en tu DoFn
Java
Dentro de tu DoFn
de ParDo
, puedes emitir un elemento a una salida complementaria mediante el método ProcessContext.sideOutput
. Deberás pasar la TupleTag
adecuada para la colección de salidas complementarias de destino cuando llames a ProcessContext.sideOutput
.
Como se muestra en el ejemplo anterior, la emisión de DoFn
en las salidas principales y complementarias es la siguiente:
.of(new DoFn<String, String>() { public void processElement(ProcessContext c) { String word = c.element(); if (word.length() <= wordLengthCutOff) { // Emit this short word to the main output. c.output(word); } else { // Emit this long word's length to a side output. c.sideOutput(wordLengthsAboveCutOffTag, word.length()); } if (word.startsWith("MARKER")) { // Emit this word to a different side output. c.sideOutput(markedWordsTag, word); } }}));
Después de tu ParDo
, deberás extraer las PCollection
de salida complementaria y principal resultantes de la PCollectionTuple
que se muestra. Consulta la sección sobre PCollectionTuple para ver algunos ejemplos en los que se muestra cómo extraer PCollection
individuales de una tupla.