Si aún no lo hiciste, realiza los pasos en la Guía de inicio rápido antes de continuar.
En los ejemplos de WordCount, se muestra cómo configurar una canalización de procesamiento que puede leer texto, dividir las líneas de texto en tokens que sean palabras individuales y realizar un recuento de la frecuencia de cada una de esas palabras. Los SDK de Dataflow contienen una serie de estos cuatro ejemplos de WordCount más detallados de forma sucesiva que se compilan uno sobre otro. El texto de entrada para todos los ejemplos es una colección de las obras de Shakespeare.
Cada ejemplo de WordCount presenta conceptos diferentes en los SDK de Dataflow.
- Minimal WordCount muestra los principios básicos involucrados en la compilación de una canalización de Dataflow.
- WordCount presenta algunas de las recomendaciones más comunes de la creación de canalizaciones que se pueden mantener y volver a usar.
- Debugging WordCount presenta prácticas de registros y depuración.
- Windowed WordCount muestra cómo puedes usar el modelo de programación de Dataflow para controlar los conjuntos de datos delimitados y no delimitados.
Primero, comprende Minimal WordCount, el ejemplo más simple. Una vez que te sientas cómodo con los principios básicos de la compilación de canalizaciones, continúa a fin de aprender las prácticas recomendadas para escribir programas de Dataflow en WordCount. Luego, lee Debugging WordCount a fin de comprender cómo usar las prácticas comunes para los registros y la depuración. Por último, aprende a usar el mismo patrón de procesamiento en conjuntos de datos infinitos y finitos en Windowed WordCount.
MinimalWordCount
Minimal WordCount muestra una canalización simple que puede leer un bloque de texto desde un archivo en Google Cloud Storage, aplicar transformaciones para dividir en tokens y contar las palabras, y escribir los datos en un archivo de salida en un depósito de Cloud Storage. En este ejemplo, las ubicaciones para los archivos de entrada y salida se integran como parte del código y no se realiza ninguna verificación de errores; se intenta mostrar solo los aspectos básicos de la creación de una canalización de Dataflow. En ejemplos posteriores, parametrizaremos las fuentes de entrada y salida de la canalización y mostraremos otras recomendaciones.
Java
- Crea la canalización
- Aplica transformaciones a la canalización:
- Lee datos de entrada (en este ejemplo: lee archivos de texto)
- Aplica transformaciones
ParDo
- Aplica las transformaciones que proporcionan los SDK (en este ejemplo:
Count
) - Escribe datos de salida (en este ejemplo: escribe en Google Cloud Storage)
- Ejecuta la canalización.
En las secciones siguientes, se explican estos conceptos en detalle junto con extractos del código correspondiente de la canalización de Minimal WordCount.
Crea la canalización
El primer paso para crear una canalización de Cloud Dataflow es crear un objeto de Opciones de canalización. Este objeto permite configurar varias opciones para la canalización, como el ejecutor de canalizaciones que ejecutará la canalización, el ID del proyecto y la ubicación de etapa de pruebas a fin de que la canalización almacene los archivos (usados para hacer que tus archivos jar sean accesibles en la nube). En este ejemplo, se configuran estas opciones de manera programática, pero es más frecuente usar los argumentos de la línea de comandos para establecer las opciones de canalización.
En nuestro ejemplo, especificamos BlockingDataflowPipelineRunner
como PipelineRunner
, para que nuestra canalización se ejecute en la nube con el servicio de Google Cloud Dataflow. Existen otras opciones que puedes configurar si quieres ejecutar tu canalización en la nube. También puedes omitir esta opción por completo, en cuyo caso, el ejecutor predeterminado ejecutará tu canalización de manera local. Esto se muestra en los siguientes dos ejemplos de WordCount y se explica con más detalle en Especifica parámetros de ejecución.
Java
DataflowPipelineOptions options = PipelineOptionsFactory.create() .as(DataflowPipelineOptions.class); options.setRunner(BlockingDataflowPipelineRunner.class); options.setProject("SET-YOUR-PROJECT-ID-HERE"); // The 'gs' URI means that this is a Google Cloud Storage path options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");
El siguiente paso es crear un objeto Pipeline
con las opciones que acabamos de crear. El objeto Pipeline
compila el grafo de las transformaciones que se ejecutarán, asociadas con esa canalización específica.
Java
Pipeline p = Pipeline.create(options);
Consulta Canalizaciones para obtener una explicación detallada del objeto de canalizaciones y su funcionamiento.
Aplica transformaciones de canalizaciones
La canalización de Minimal WordCount contiene transformaciones para leer datos en la canalización, manipularlos o transformarlos y escribir los resultados. Cada transformación representa una operación en la canalización.
Cada transformación toma algún tipo de entrada (datos o de otro tipo) y produce datos de salida.
La clase del SDK de PCollection
representa los datos de entrada y salida.
PCollection es una clase especial que proporciona el SDK de Dataflow que puedes usar para representar un conjunto de datos de casi cualquier tamaño, incluidos los conjuntos de datos infinitos.
En la Figura 1, se muestra el flujo de datos de la canalización:

La canalización de Minimal WordCount contiene cinco transformaciones:
- Se aplica una transformación Read del archivo de texto al objeto
Pipeline
y se produce unaPCollection
como salida. Cada elemento de la salida dePCollection
representa una línea de texto del archivo de entrada. - Es una transformación ParDo que invoca un
DoFn
(se define entre líneas como una clase anónima) en cada elemento que divide las líneas de texto en tokens que sean palabras individuales. La entrada para esta transformación es elPCollection
de las líneas de texto que generó la transformaciónTextIO.Read
anterior. La transformaciónParDo
genera unPCollection
nuevo, en el que cada elemento representa una palabra individual en el texto. - La transformación
Count
que proporcionan los SDK es una transformación genérica que toma unaPCollection
de cualquier tipo y muestra un par clave-valor dePCollection
. Cada clave representa un elemento único de la colección de entrada y cada valor representa el número de veces que esa clave apareció en la colección de entrada.
En esta canalización, la entrada paraCount
es laPCollection
de las palabras individuales que generó elParDo
anterior y la salida es unaPCollection
de pares clave-valor en la que cada clave representa una palabra única en el texto y el valor asociado es el recuento de caso para cada una. - La siguiente es una transformación que le da formato a cada uno de los pares clave/valor de las palabras únicas y los recuentos de casos en una string que se puede imprimir y es adecuada para escribir en un archivo de salida.
- Un archivo de texto de Escritura. Esta transformación toma la última
PCollection
de lasString
con formato como entrada y escribe cada elemento en un archivo de texto de salida. Cada elemento de laPCollection
de entrada representa una línea de texto en el archivo de salida resultante.
Java
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
Java
Puedes darle a tu transformación un nombre que aparecerá en la Interfaz de supervisión de Dataflow, mediante la operación .named()
como se hizo en este ejemplo. Cuando el servicio de Dataflow ejecuta tu canalización, la interfaz de supervisión indicará cuándo se encuentra en ejecución cada transformación ParDo
.
.apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() { @Override public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { if (!word.isEmpty()) { c.output(word); } } } }))
Java
.apply(Count.<String>perElement())
Java
MapElements
es una transformación compuesta por el nivel superior que encapsula un ParDo simple. Para cada elemento en la entrada PCollection
, MapElements
aplica una función que produce exactamente un elemento de salida. Este MapElements
invoca un SimpleFunction
(definido en línea como una clase anónima) que aplica el formato. Como entrada, este MapElements
toma un PCollection
de par clave-valor que genera Count
y produce un nuevo PCollection
de strings que se pueden imprimir.
.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> element) { return element.getKey() + ": " + element.getValue(); } }))
Java
.apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
Ten en cuenta que la transformación Write
produce un valor de resultado trivial de tipo PDone
, que en este caso se ignora.
Ejecuta la canalización
Ejecuta la canalización con llamadas al método run
, que envía tu canalización para que la ejecute el ejecutor de canalizaciones que especificaste cuando creaste tu canalización.
Java
p.run();
Ejemplo de WordCount
En este ejemplo de WordCount, se presentan algunas prácticas de programación recomendadas que pueden hacer que tu canalización sea más fácil de leer, escribir o mantener. Si bien no se requieren de forma explícita, pueden hacer que la ejecución de tu canalización sea más flexible, ayudar a probar tu canalización y hacer que el código de tu canalización se pueda volver a usar.
En esta sección, se supone que conoces los conceptos básicos de la compilación de una canalización. Si sientes que aún no los conoces, lee la sección anterior Minimal WordCount.
Java
- Aplica
ParDo
con unDoFn
explícito - Crea transformaciones compuestas
- Usa
PipelineOptions
con parámetros
En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.
Especifica DoFns explícitos
Cuando se usan transformaciones ParDo
, es necesario especificar la operación de procesamiento que se aplica a cada elemento en el PCollection
de entrada. Esta operación de procesamiento es una subclase de la clase SDK de DoFn
. La canalización de ejemplo en la sección anterior (Minimal WordCount) crea las subclases DoFn
para cada ParDo
entre líneas, como una instancia de clase interna anónima.
Sin embargo, suele ser una buena idea definir DoFn
a nivel global, lo que facilita la prueba de unidades y puede hacer que el ParDo
sea más legible.
Como se mencionó en el ejemplo anterior (Minimal WordCount), cuando ejecutas tu canalización, la interfaz de supervisión de Dataflow indica cuándo se encuentra en ejecución cada transformación ParDo
. El servicio de Dataflow genera automáticamente nombres de transformación para las transformaciones ParDo
a partir del nombre del DoFn
que pasas. Por ejemplo, el ParDo
que aplica el FormatAsTextFn()
aparece en la interfaz de supervisión como ParDo(FormatAsText)
.
Java
En este ejemplo, los DoFn
s se definen como clases estáticas:
/** A DoFn that converts a Word and Count into a printable string. */ static class FormatAsTextFn extends DoFn<KV<String, Long>, String> { ... @Override public void processElement(ProcessContext c) { ... } } public static void main(String[] args) throws IOException { Pipeline p = ... // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform. p.apply(...) .apply(...) .apply(ParDo.of(new FormatAsTextFn())) ... }
Consulta Procesamiento paralelo con ParDo para obtener un análisis detallado sobre cómo crear y especificar subclases de DoFn
en tus transformaciones ParDo
.
Crea transformaciones compuestas
Si tienes una operación de procesamiento que consta de varias transformaciones o pasos de ParDo
, puedes crearla como una subclase de PTransform
. Crear una subclase de PTransform
te permite crear transformaciones complejas que se pueden volver a usar, hacer que la estructura de tu canalización sea más clara y modular, y facilitar la prueba de unidades.
Si haces que la estructura lógica de tu canalización sea explícita, con las subclases de PTransform
, también se puede hacer que la supervisión de tu canalización sea más fácil. Cuando el servicio de Dataflow compila la estructura final y optimizada de tu canalización, la interfaz de supervisión de Dataflow usará las transformaciones que compilaste para reflejar con mayor precisión la estructura de tu canalización.
Java
En este ejemplo, dos transformaciones se encapsulan como la subclase de PTransform
de CountWords
. CountWords
contiene el ParDo
que ejecuta ExtractWordsFn
y la transformación Count
que proporcionan los SDK.
Cuando se define CountWords
, especificamos su entrada y salida finales; la entrada es PCollection<String>
para la operación de extracción, y la salida es PCollection<KV<String, Long>>
que produce la operación de recuento.
public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } public static void main(String[] args) throws IOException { Pipeline p = ... p.apply(...) .apply(new CountWords()) ... }
Usa PipelineOptions
con parámetros
En el ejemplo anterior, Minimal WordCount, se establecieron varias opciones de ejecución cuando se creó la canalización. En este ejemplo, se definen las opciones de configuración personalizadas propias con la extensión de PipelineOptions
.
Puedes agregar tus propios argumentos si quieres que el analizador de línea de comandos los procese y si quieres especificar valores predeterminados para ellos. Entonces, puedes acceder a los valores de las opciones en tu código de canalización.
En el ejemplo de Minimal WordCount, las opciones de canalización se integraron como parte del código. Sin embargo, la forma más común de crear PipelineOptions
es a través del análisis de argumentos de la línea de comandos.
Java
public static interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); ... } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); ... }
Ejemplo de depuración de WordCount
En el ejemplo de Debugging WordCount, se muestran algunas prácticas recomendadas para instrumentar tu código de canalización. Puedes usar la interfaz de supervisión de Dataflow y los agregadores para obtener visibilidad adicional en tu canalización a medida que se ejecuta.
Java
Puedes usar DataflowAssert del SDK para probar la salida de tus transformaciones en las diferentes etapas de la canalización.
Java
- Visualiza registros en la interfaz de supervisión de Dataflow
- Controla los niveles de registro del trabajador de Dataflow
- Crea
Aggregators
- Prueba tu canalización mediante
DataflowAssert
En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.
Visualiza registros en la interfaz de supervisión de Dataflow
Google Cloud Logging agrupa los registros de todos los trabajadores de tu trabajo de Dataflow en una sola ubicación en Google Cloud Console. Puedes usar la interfaz de supervisión de Dataflow si quieres ver los registros desde todas las instancias de Compute Engine que Dataflow inició para completar tu trabajo de Dataflow. Puedes agregar instrucciones de registro en las instancias de DoFn
de tu canalización que aparecerán en la interfaz de supervisión a medida que se ejecute tu canalización.
Java
El siguiente registrador SLF4J usa el nombre de la clase de FilterTextFn
completamente calificado como nombre del registrador. Este nombre hará referencia a todas las instrucciones de registro que emita el registrador, que serán visibles en la interfaz de supervisión de Dataflow, si hay una configuración de nivel de registro apropiada.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DebuggingWordCount { public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); ... public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI // only if the log level is set to "DEBUG" or lower. LOG.debug("Matched: " + c.element().getKey()); ... } else { // Log at the "TRACE" level each element that is not matched. Different log levels // can be used to control the verbosity of logging providing an effective mechanism // to filter less important information. LOG.trace("Did not match: " + c.element().getKey()); ... } } } }
Controla los niveles de registro del trabajador de Dataflow
Java
Los trabajadores de Dataflow que ejecutan el código de usuario están configurados para acceder a Cloud Logging de forma predeterminada a nivel de registro INFO y superior. Puedes anular los niveles de registro para los espacios de nombres de registro determinados si especificas lo siguiente:
--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}
Por ejemplo, si especificas lo siguiente
--workerLogLevelOverrides={"com.example":"DEBUG"}
cuando ejecutas esta canalización con el servicio de Dataflow, la Interfaz de supervisión solo contendrá DEBUG o registros de nivel superior para el paquete com.example
, además de los registros de INFO predeterminados o de nivel superior.
Además, puedes anular la configuración de registro de trabajadores de Dataflow predeterminada si especificas lo siguiente:
--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>
Por ejemplo, si especificas lo siguiente
--defaultWorkerLogLevel=DEBUG
cuando ejecutas esta canalización con el servicio de Dataflow, la interfaz de supervisión tendrá todos los registros de DEBUG o nivel o superior. Ten en cuenta que si cambias el nivel de registro del trabajador predeterminado a TRACE o DEBUG, aumentará significativamente la cantidad de información de registro.
Obtén más información sobre registros en Cloud Dataflow.
Crea agregadores
Un agregador personalizado puede realizar un seguimiento de los valores en tu canalización a medida que se ejecuta. Esos valores se mostrarán en la interfaz de supervisión de Dataflow cuando la canalización se ejecute con el servicio de Dataflow.
Es posible que los agregadores no se puedan ver hasta que el sistema comience a ejecutar la transformación ParDo
que los creó o hasta que se cambie su valor inicial. Entonces, se pueden ver en la interfaz de supervisión en la parte inferior del Resumen del trabajo.
Los agregadores personalizados que se encuentran a continuación realizan un seguimiento de la cantidad de palabras coincidentes y no coincidentes.
Java
public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private final Aggregator<Long, Long> matchedWords = createAggregator("matchedWords", new Sum.SumLongFn()); private final Aggregator<Long, Long> unmatchedWords = createAggregator("umatchedWords", new Sum.SumLongFn()); @Override public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { ... matchedWords.addValue(1L); ... } else { ... unmatchedWords.addValue(1L); } } } }
Agregadores en canalizaciones por lotes y de transmisión
Los agregadores en las canalizaciones por lotes proporcionan coherencia. Se confirman exactamente una vez en los paquetes exitosos y no se confirman en los paquetes con fallas.
En las canalizaciones de transmisión, los agregadores proporcionan una semántica más tolerante. La contribución desde paquetes correctos es el mejor esfuerzo y los paquetes con fallas pueden reflejarse en el valor final.
Prueba tu canalización mediante DataflowAssert
Java
DataflowAssert es un conjunto de PTransforms
conveniente del estilo de buscadores de coincidencias en colección de Hamcrest que se puede usar cuando se escriben pruebas de nivel de canalización para validar el contenido de PCollections
. DataflowAssert
se usa mejor en pruebas de unidades con conjuntos de datos pequeños, pero aquí se muestra como una herramienta de enseñanza.
A continuación, se verifica que el conjunto de palabras filtradas coincida con los recuentos esperados. Ten en cuenta que DataflowAssert
no proporciona ningún resultado y que la finalización correcta de la canalización implica que se cumplieron las expectativas. Obtén más información sobre cómo puedes probar tu canalización y consulta DebuggingWordCountTest para obtener una prueba de unidades de ejemplo.
public static void main(String[] args) { ... List<KV<String, Long>> expectedResults = Arrays.asList( KV.of("Flourish", 3L), KV.of("stomach", 1L)); DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults); ... }
WindowedWordCount
Java
En este ejemplo, WindowedWordCount
, se cuentan las palabras en los textos como se hizo en los ejemplos anteriores, pero se presentan varios conceptos avanzados. La entrada para WindowedWordCount
puede ser un conjunto de datos fijo (como en los ejemplos anteriores) o un flujo de datos no delimitado.
El SDK de Dataflow es conveniente porque te permite crear una sola canalización que puede controlar tipos de entrada delimitados o no delimitados. Si la entrada no está delimitada, todos los PCollections
de la canalización no estarán delimitados. Lo mismo ocurre con la entrada delimitada.
Antes de leer esta sección, asegúrate de que conoces los principios básicos de la creación de una canalización.
Conceptos nuevos:- Lee datos de entrada delimitados y no delimitados
- Agrega marcas de tiempo a los datos
- Sistema de ventanas
- Escribe datos de salida delimitados y no delimitados
En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.
Lee datos de entrada delimitados y no delimitados
La entrada para WindowedWordCount
puede ser delimitada o no. Si tu entrada tiene una cantidad fija de elementos, se considera un conjunto de datos “delimitado”. Si tu entrada se actualiza de forma constante, entonces se considera “no delimitada”. Consulta las clases PCollection delimitadas y no delimitadas para obtener más información acerca de los tipos de entrada.
En este ejemplo, puedes elegir si la entrada será delimitada o no. Recuerda que la entrada para todos los ejemplos es un conjunto de textos de Shakespeare, una entrada finita y delimitada. Sin embargo, con el propósito de explicar los nuevos conceptos en este ejemplo, la entrada es una reproducción de los textos de Shakespeare.
En este ejemplo, si la entrada es delimitada, esta se leerá desde un tema de Pub/Sub de Google Cloud. En ese caso, la transformación Read
aplicada a la canalización es una PubSubIO.Read
. De lo contrario, la entrada se leerá desde Google Cloud Storage.
public static void main(String[] args) throws IOException { ... PCollection<String> input; if (options.isUnbounded()) { LOG.info("Reading from PubSub."); // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg. input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic())); } else { // Else, this is a bounded pipeline. Read from the Google Cloud Storage file. input = pipeline.apply(TextIO.Read.from(options.getInputFile())) ... } ... }
Agrega marcas de tiempo a los datos
Cada elemento en una PCollection
tiene una marca de tiempo asociada. La fuente que crea la PCollection
asigna la marca de tiempo para cada elemento. En este ejemplo, si eliges una entrada no delimitada para tu canalización, las marcas de tiempo provendrán de la fuente de datos de Pub/Sub. Si eliges una entrada delimitada, el método DoFn
llamado AddTimestampsFn
(que invoca ParDo
) establecerá una marca de tiempo para cada elemento en la PCollection
.
public static void main(String[] args) throws IOException { ... input = pipeline .apply(...) // Add an element timestamp, using an artificial time. .apply(ParDo.of(new AddTimestampFn())); }
A continuación, se muestra el código para AddTimestampsFn
, un DoFn
que invoca ParDo
, que establece el elemento de datos de la marca de tiempo de acuerdo con el elemento en sí. Por ejemplo, si los elementos eran líneas de registro, este ParDo
podría analizar el agotamiento del tiempo de la string de registro y establecerla como la marca de tiempo del elemento. No hay marcas de tiempo inherentes a las obras de Shakespeare, por lo que, en este caso, se crearon marcas de tiempo aleatorias solo para ilustrar el concepto. Cada línea del texto de entrada obtendrá una marca de tiempo asociada aleatoria en algún momento de un período de 2 horas.
static class AddTimestampFn extends DoFn<String, String> { private static final Duration RAND_RANGE = Duration.standardHours(2); private final Instant minTimestamp; AddTimestampFn() { this.minTimestamp = new Instant(System.currentTimeMillis()); } @Override public void processElement(ProcessContext c) { // Generate a timestamp that falls somewhere in the past 2 hours. long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); Instant randomTimestamp = minTimestamp.plus(randMillis); // Set the data element with that timestamp. c.outputWithTimestamp(c.element(), new Instant(randomTimestamp)); } }
Puedes obtener más información sobre las marcas de tiempo en marcas de tiempo de elementos de PCollection.
Sistema de ventanas
El SDK de Dataflow usa un concepto llamado sistema de ventanas para subdividir una PCollection
de acuerdo con las marcas de tiempo de sus elementos individuales.
Las transformaciones de Dataflow que agregan varios elementos procesan cada PCollection
como una sucesión de ventanas múltiples y finitas, aunque toda la colección puede ser de tamaño infinito (no delimitada).
En el ejemplo de WindowingWordCount
, se aplica un sistema de ventanas fijo, en el que cada ventana representa un intervalo de tiempo fijo. El tamaño del sistema de ventanas fijo para este ejemplo se establece de manera predeterminada en 1 minuto (puedes cambiarlo con una opción de línea de comandos). Entonces, la canalización aplica la transformación CountWords
.
PCollection<KV<String, Long>> wordCounts = input .apply(Window.<String>into( FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))) .apply(new WordCount.CountWords());
Escribe datos de salida delimitados y no delimitados
Como la entrada puede ser delimitada o no, lo mismo ocurre con la salida PCollection
. Es necesario asegurarse de elegir un receptor adecuado. Algunos receptores de salida solo admiten una salida delimitada o solo una salida no delimitada. Por ejemplo, un archivo de texto es un receptor que solo puede recibir datos delimitados. La fuente de salida de BigQuery admite una entrada delimitada y no delimitada.
En este ejemplo, transmitimos los resultados a una tabla de BigQuery. Se les da el formato para una tabla de BigQuery y, luego, se los escribe en BigQuery mediante BigQueryIO.Write
.
wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));