Si aún no lo hiciste, realiza los pasos en la Guía de inicio rápido antes de continuar.
En los ejemplos de WordCount, se muestra cómo configurar una canalización de procesamiento que puede leer texto, dividir las líneas de texto en tokens que sean palabras individuales y realizar un recuento de la frecuencia de cada una de esas palabras. Los SDK de Dataflow contienen una serie de estos cuatro ejemplos de WordCount más detallados de forma sucesiva que se compilan uno sobre otro. El texto de entrada para todos los ejemplos es una colección de las obras de Shakespeare.
Cada ejemplo de WordCount presenta conceptos diferentes en los SDK de Dataflow.
- Minimal WordCount muestra los principios básicos involucrados en la compilación de una canalización de Dataflow.
- WordCount presenta algunas de las recomendaciones más comunes de la creación de canalizaciones que se pueden mantener y volver a usar.
- Debugging WordCount presenta prácticas de registros y depuración.
- Windowed WordCount muestra cómo puedes usar el modelo de programación de Dataflow para controlar los conjuntos de datos delimitados y no delimitados.
Primero, comprende Minimal WordCount, el ejemplo más simple. Una vez que te sientas cómodo con los principios básicos de la compilación de canalizaciones, continúa a fin de aprender las recomendaciones para escribir programas de Dataflow en WordCount. Luego, lee Debugging WordCount si quieres comprender cómo usar las prácticas comunes para los registros y la depuración. Por último, aprende a usar el mismo patrón de procesamiento en conjuntos de datos infinitos y finitos en Windowed WordCount.
MinimalWordCount
Minimal WordCount muestra una canalización simple que puede leer un bloque de texto desde un archivo en Google Cloud Storage, aplicar transformaciones para dividir en tokens y contar las palabras, y escribir los datos en un archivo de salida en un depósito de Cloud Storage. En este ejemplo, las ubicaciones para los archivos de entrada y salida se integran como parte del código y no se realiza ninguna verificación de errores; se intenta mostrar solo los aspectos básicos de la creación de una canalización de Dataflow. En ejemplos posteriores, parametrizaremos las fuentes de entrada y salida de la canalización y mostraremos otras recomendaciones.
Java
- Crea la canalización.
- Aplica transformaciones a la canalización:
- Lee datos de entrada (en este ejemplo: lee archivos de texto).
- Aplica transformaciones de
ParDo
. - Aplica las transformaciones que proporcionan los SDK (en este ejemplo:
Count
). - Escribe datos de salida (en este ejemplo: escribe en Google Cloud Storage).
- Ejecuta la canalización.
En las secciones siguientes, se explican estos conceptos en detalle junto con extractos del código correspondiente de la canalización de Minimal WordCount.
Crea la canalización
El primer paso para crear una canalización de Cloud Dataflow es crear un objeto de Opciones de canalización. Este objeto permite configurar varias opciones para la canalización, como el ejecutor de canalizaciones que ejecutará la canalización, el ID del proyecto y la ubicación de etapa de pruebas a fin de que la canalización almacene los archivos (usados para hacer que tus archivos jar sean accesibles en la nube). En este ejemplo, se configuran estas opciones de manera programática, pero es más frecuente usar los argumentos de la línea de comandos para establecer las opciones de canalización.
En el ejemplo, se especifica BlockingDataflowPipelineRunner
como el PipelineRunner
, para que se ejecute la canalización en la nube con el servicio de Google Cloud Dataflow. Hay otras opciones que puedes configurar si quieres ejecutar tu canalización en la nube. También puedes omitir esta opción por completo, en cuyo caso, el ejecutor predeterminado ejecutará tu canalización de manera local. Esto se muestra en los siguientes ejemplos de WordCount y se explica con más detalle en Especifica parámetros de ejecución.
Java
DataflowPipelineOptions options = PipelineOptionsFactory.create() .as(DataflowPipelineOptions.class); options.setRunner(BlockingDataflowPipelineRunner.class); options.setProject("SET-YOUR-PROJECT-ID-HERE"); // The 'gs' URI means that this is a Google Cloud Storage path options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");
El siguiente paso es crear un objeto de Pipeline
con las opciones que se acaban de crear. El objeto de Pipeline
crea el grafo de las transformaciones que se ejecutarán, asociadas a esa canalización en particular.
Java
Pipeline p = Pipeline.create(options);
Consulta Canalizaciones para obtener una explicación detallada del objeto de canalizaciones y su funcionamiento.
Aplica transformaciones de canalizaciones
La canalización de Minimal WordCount contiene transformaciones para leer datos en la canalización, manipularlos o transformarlos y escribir los resultados. Cada transformación representa una operación en la canalización.
Cada transformación toma algún tipo de entrada (datos o de otro tipo) y produce datos de salida.
La clase del SDK de PCollection
representa los datos de entrada y salida.
PCollection es una clase especial que proporciona el SDK de Dataflow que puedes usar para representar un conjunto de datos de casi cualquier tamaño, incluidos los conjuntos de datos infinitos.
En la Figura 1, se muestra el flujo de datos de la canalización:

La canalización de Minimal WordCount contiene cinco transformaciones:
- Se aplica una transformación de Lectura del archivo de texto al objeto de
Pipeline
y se produce unaPCollection
como resultado. Cada elemento de la salida dePCollection
representa una línea de texto del archivo de entrada. - Una transformación de ParDo que invoca un
DoFn
(se define entre líneas como una clase anónima) en cada elemento que divide las líneas de texto en tokens que sean palabras individuales. La entrada para esta transformación es laPCollection
de las líneas de texto que generó la transformaciónTextIO.Read
anterior. La transformación deParDo
da como resultado unaPCollection
nueva, en la que cada elemento representa una palabra individual en el texto. - La transformación de
Count
que proporcionan los SDK es una transformación genérica que toma unaPCollection
de cualquier tipo y muestra un par clave-valor dePCollection
. Cada clave representa un elemento único de la recopilación de entrada y cada valor representa el número de veces que esa clave apareció en la recopilación de entrada.
En esta canalización, la entrada paraCount
es laPCollection
de las palabras individuales que generó elParDo
anterior y la salida es unaPCollection
de pares clave-valor en la que cada clave representa una palabra única en el texto y el valor asociado es el recuento de caso para cada una. - La siguiente es una transformación que le da formato a cada uno de los pares clave/valor de las palabras únicas y los recuentos de casos en una string que se puede imprimir y es adecuada para escribir en un archivo de salida.
- Un archivo de texto de Escritura. Esta transformación toma la última
PCollection
de lasString
con formato como entrada y escribe cada elemento en un archivo de texto de salida. Cada elemento de laPCollection
de entrada representa una línea de texto en el archivo de salida resultante.
Java
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
Java
Puedes darle a tu transformación un nombre que aparecerá en la Interfaz de supervisión de Dataflow, mediante la operación .named()
como se hizo en este ejemplo. Cuando el servicio de Dataflow ejecuta tu canalización, la interfaz de supervisión indicará cuándo se encuentra en ejecución cada transformación de ParDo
.
.apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() { @Override public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { if (!word.isEmpty()) { c.output(word); } } } }))
Java
.apply(Count.<String>perElement())
Java
MapElements
es una transformación compuesta de nivel superior que encapsula un ParDo simple. Para cada elemento en la PCollection
de entrada, MapElements
aplica una función que produce exactamente un elemento de salida. Este MapElements
invoca una SimpleFunction
(definida entre líneas como una clase anónima) que le da el formato. Como entrada, este MapElements
toma un PCollection
de pares clave-valor que generó Count
y produce una PCollection
nueva de strings que se pueden imprimir.
.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> element) { return element.getKey() + ": " + element.getValue(); } }))
Java
.apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
Ten en cuenta que la transformación de Write
produce un valor de resultado trivial de tipo PDone
, que en este caso se ignora.
Ejecuta la canalización
Ejecuta la canalización con llamadas al método run
, que envía tu canalización para que la ejecute el ejecutor de canalizaciones que especificaste cuando creaste tu canalización.
Java
p.run();
Ejemplo de WordCount
En este ejemplo de WordCount, se presentan algunas prácticas de programación recomendadas que pueden hacer que tu canalización sea más fácil de leer, escribir o mantener. Si bien no se requieren de forma explícita, pueden hacer que la ejecución de tu canalización sea más flexible, ayudar a probar tu canalización y hacer que el código de tu canalización se pueda volver a usar.
En esta sección, se supone que conoces los conceptos básicos de la compilación de una canalización. Si sientes que aún no los conoces, lee la sección anterior Minimal WordCount.
Java
- Aplica
ParDo
con unDoFn
explícito. - Crea transformaciones compuestas.
- Usa
PipelineOptions
parametrizables.
En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.
Especifica DoFns explícitos
Cuando usas transformaciones de ParDo
, debes especificar la operación de procesamiento que se aplica a cada elemento en el PCollection
de entrada. Esta operación de procesamiento es una subclase de la clase del SDK de DoFn
. La canalización de ejemplo en la sección anterior (Minimal WordCount) crea las subclases de DoFn
para cada ParDo
entre líneas, como una instancia de clase interna anónima.
Sin embargo, es una buena idea definir el DoFn
a nivel global, lo que facilita la prueba de unidades y puede hacer que el código de ParDo
sea más legible.
Como se mencionó en el ejemplo anterior (Minimal WordCount), cuando ejecutas tu canalización, la interfaz de supervisión de Dataflow indica cuándo se encuentra en ejecución cada transformación de ParDo
. El servicio de Dataflow genera de forma automática los nombres de transformación para las transformaciones de ParDo
a partir del nombre del DoFn
que pasaste. Por ejemplo, el ParDo
que aplica el FormatAsTextFn()
aparece en la interfaz de supervisión como ParDo(FormatAsText)
.
Java
En el siguiente ejemplo, los DoFn
se definen como clases estáticas:
/** A DoFn that converts a Word and Count into a printable string. */ static class FormatAsTextFn extends DoFn<KV<String, Long>, String> { ... @Override public void processElement(ProcessContext c) { ... } } public static void main(String[] args) throws IOException { Pipeline p = ... // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform. p.apply(...) .apply(...) .apply(ParDo.of(new FormatAsTextFn())) ... }
Consulta Realiza procesamientos paralelos con ParDo si quieres obtener una explicación detallada de la creación y especificación de subclases de DoFn
para tus transformaciones de ParDo
.
Crea transformaciones compuestas
Si tienes una operación de procesamiento que consta de varias transformaciones o pasos de ParDo
, puedes crearla como una subclase de PTransform
. Crear una subclase de PTransform
te permite crear transformaciones complejas que se pueden volver a usar, hacer que la estructura de tu canalización sea más clara y modular, y facilitar la prueba de unidades.
Si haces que la estructura lógica de tu canalización sea explícita, con las subclases de PTransform
, también se puede hacer que la supervisión de tu canalización sea más fácil. Cuando el servicio de Dataflow compila la estructura final y optimizada de tu canalización, la interfaz de supervisión de Dataflow usará las transformaciones que compilaste para reflejar con mayor precisión la estructura de tu canalización.
Java
En este ejemplo, se encapsulan dos transformaciones como los PTransform
de subclase de CountWords
. CountWords
contiene el ParDo
que ejecuta ExtractWordsFn
y la transformación de Count
que proporcionan los SDK.
Cuando se define CountWords
, se especifican la entrada y la salida finales; la entrada es PCollection<String>
para la operación de extracción y la salida es PCollection<KV<String, Long>>
que produce la operación de recuento.
public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } public static void main(String[] args) throws IOException { Pipeline p = ... p.apply(...) .apply(new CountWords()) ... }
Usa PipelineOptions
parametrizables
En el ejemplo anterior, Minimal WordCount, se establecieron varias opciones de ejecución cuando se creó la canalización. En este ejemplo, se definen las opciones de configuración personalizadas propias con la extensión de PipelineOptions
.
Puedes agregar tus propios argumentos si quieres que el analizador de línea de comandos los procese y especificar valores predeterminados para ellos. Entonces, puedes acceder a los valores de las opciones en tu código de canalización.
En el ejemplo de Minimal WordCount, las opciones de canalización se integraron como parte del código. Sin embargo, la forma más común de crear PipelineOptions
es a través del análisis de argumentos de la línea de comandos.
Java
public static interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); ... } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); ... }
Ejemplo de Debugging WordCount
En el ejemplo de Debugging WordCount, se muestran algunas recomendaciones para instrumentar tu código de canalización. Puedes usar la interfaz de supervisión de Dataflow y los agregadores para obtener visibilidad adicional en tu canalización a medida que se ejecuta.
Java
Puedes usar el DataflowAssert del SDK para probar la salida de tus transformaciones en las diferentes etapas de la canalización.
Java
- Observa registros en la interfaz de supervisión de Dataflow.
- Controla los niveles de registro del trabajador de Dataflow.
- Crea
Aggregators
. - Prueba tu canalización con
DataflowAssert
En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.
Observa registros en la interfaz de supervisión de Dataflow
Google Cloud Logging agrega los registros de todos los trabajadores de tu trabajo de Dataflow a una ubicación única en Google Cloud Platform Console. Puedes usar la interfaz de supervisión de Dataflow si quieres ver los registros desde todas las instancias de Compute Engine que Dataflow inició para completar tu trabajo de Dataflow. Puedes agregar instrucciones de registro en las instancias de DoFn
de tu canalización que aparecerán en la interfaz de supervisión a medida que se ejecuta tu canalización.
Java
El siguiente registrador SLF4J usa el nombre de la clase de FilterTextFn
calificado por completo como nombre del registrador. Este nombre hará referencia a todas las instrucciones de registro que emita el registrador, que serán visibles en la interfaz de supervisión de Dataflow, si hay una configuración de nivel de registro apropiada.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DebuggingWordCount { public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); ... public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI // only if the log level is set to "DEBUG" or lower. LOG.debug("Matched: " + c.element().getKey()); ... } else { // Log at the "TRACE" level each element that is not matched. Different log levels // can be used to control the verbosity of logging providing an effective mechanism // to filter less important information. LOG.trace("Did not match: " + c.element().getKey()); ... } } } }
Controla los niveles de registro del trabajador de Dataflow
Java
Los trabajadores de Dataflow que ejecutan el código de usuario están configurados para acceder a Cloud Logging de forma predeterminada a nivel de registro INFO y superior. Puedes anular los niveles de registro para los espacios de nombres de registro determinados si especificas lo siguiente:
--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}
Por ejemplo, si especificas lo siguiente
--workerLogLevelOverrides={"com.example":"DEBUG"}
cuando ejecutas esta canalización con el servicio de Dataflow, la interfaz de supervisión constará solo de registros de nivel DEBUG o superior para el paquete com.example
, además de los registros de nivel predeterminado INFO o superior.
Además, puedes anular la configuración de registro de trabajadores de Dataflow predeterminada si especificas lo siguiente
--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>
Por ejemplo, si especificas lo siguiente
--defaultWorkerLogLevel=DEBUG
cuando ejecutas esta canalización con el servicio de Dataflow, la interfaz de supervisión constará solo de registros de nivel DEBUG o superior. Ten en cuenta que, si cambias el nivel de registro del trabajador predeterminado a TRACE o DEBUG, habrá un aumento importante en la cantidad de información de registros.
Obtén más información sobre registros dentro de Cloud Dataflow.
Crea agregadores
Un agregador personalizado puede realizar un seguimiento de los valores en tu canalización a medida que se ejecuta. Esos valores se mostrarán en la interfaz de supervisión de Dataflow cuando la canalización se ejecute con el servicio de Dataflow.
Es posible que los agregadores no se puedan ver hasta que el sistema comience a ejecutar la transformación de ParDo
que los creó o hasta que se cambie su valor inicial. Entonces, se pueden ver en la interfaz de supervisión en la parte inferior del Resumen del trabajo.
Los agregadores personalizados que se encuentran a continuación realizan un seguimiento de la cantidad de palabras coincidentes y no coincidentes.
Java
public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private final Aggregator<Long, Long> matchedWords = createAggregator("matchedWords", new Sum.SumLongFn()); private final Aggregator<Long, Long> unmatchedWords = createAggregator("umatchedWords", new Sum.SumLongFn()); @Override public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { ... matchedWords.addValue(1L); ... } else { ... unmatchedWords.addValue(1L); } } } }
Agregadores en canalizaciones por lotes y de transmisión
Los agregadores en las canalizaciones por lotes proporcionan coherencia. Se confirman exactamente una vez en los paquetes exitosos y no se confirman en los paquetes con fallas.
En las canalizaciones de transmisión, los agregadores proporcionan una semántica más tolerante. La contribución desde paquetes exitosos es el mejor esfuerzo y los paquetes con fallas pueden reflejarse en el valor final.
Prueba tu canalización con DataflowAssert
Java
DataflowAssert es un conjunto de PTransforms
convenientes del estilo de los buscadores de coincidencia de recopilación de Hamcrest que se pueden usar cuando se escriben pruebas de nivel de canalización para validar los contenidos de PCollections
. DataflowAssert
se usa mejor en pruebas de unidades con conjuntos de datos pequeños, pero aquí se muestra como una herramienta de enseñanza.
A continuación, se verifica que el conjunto de palabras filtradas coincida con los recuentos esperados. Ten en cuenta que DataflowAssert
no proporciona ningún resultado y que la finalización exitosa de la canalización implica que se cumplieron las expectativas. Obtén más información sobre cómo puedes probar tu canalización y consulta DebuggingWordCountTest para obtener una prueba de unidades de ejemplo.
public static void main(String[] args) { ... List<KV<String, Long>> expectedResults = Arrays.asList( KV.of("Flourish", 3L), KV.of("stomach", 1L)); DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults); ... }
WindowedWordCount
Java
Este ejemplo, WindowedWordCount
, cuenta las palabras en los textos como se hizo en los ejemplos anteriores. La entrada para WindowedWordCount
puede ser un conjunto de datos fijo (como en los ejemplos anteriores) o un flujo de datos no delimitado.
El SDK de Dataflow es conveniente porque te permite crear una sola canalización que puede controlar tipos de entrada delimitados o no delimitados. Si la entrada no está delimitada, entonces ninguna de las PCollections
de la canalización estará delimitada. Lo mismo ocurre con la entrada delimitada.
Antes de leer esta sección, asegúrate de que conoces los principios básicos de la creación de una canalización.
Conceptos nuevos:- Lee datos de entrada delimitados y no delimitados
- Agrega marcas de tiempo a los datos
- Sistema de ventanas
- Escribe datos de salida delimitados y no delimitados
En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.
Lee datos de entrada delimitados y no delimitados
La entrada para WindowedWordCount
puede estar delimitada o no. Si tu entrada tiene una cantidad fija de elementos, se considera un conjunto de datos “delimitado”. Si tu entrada se actualiza de forma constante, entonces se considera “no delimitada”. Consulta PCollections delimitadas y no delimitadas para obtener más información acerca de los tipos de entrada.
En este ejemplo, puedes elegir si la entrada será delimitada o no. Recuerda que la entrada para todos los ejemplos es un conjunto de textos de Shakespeare, una entrada finita y delimitada. Sin embargo, con el propósito de explicar los nuevos conceptos en este ejemplo, la entrada es una reproducción de los textos de Shakespeare.
En este ejemplo, si tu entrada no está delimitada, entonces la entrada se leerá desde un tema de Google Cloud Pub/Sub; en ese caso, la transformación de Read
que se aplica a la canalización es una PubSubIO.Read
. De lo contrario, la entrada se leerá desde Google Cloud Storage.
public static void main(String[] args) throws IOException { ... PCollection<String> input; if (options.isUnbounded()) { LOG.info("Reading from PubSub."); // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg. input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic())); } else { // Else, this is a bounded pipeline. Read from the Google Cloud Storage file. input = pipeline.apply(TextIO.Read.from(options.getInputFile())) ... } ... }
Agrega marcas de tiempo a los datos
Cada elemento de una PCollection
tiene una marca de tiempo asociada. La fuente que crea la PCollection
asigna la marca de tiempo para cada elemento. En este ejemplo, si eliges una entrada no delimitada para tu canalización, las marcas de tiempo provendrán de la fuente de datos de Pub/Sub. Si eliges una entrada delimitada, el método DoFn
llamado AddTimestampsFn
(que invoca ParDo
) establecerá una marca de tiempo para cada elemento en la PCollection
.
public static void main(String[] args) throws IOException { ... input = pipeline .apply(...) // Add an element timestamp, using an artificial time. .apply(ParDo.of(new AddTimestampFn())); }
A continuación, se muestra el código para AddTimestampsFn
, un DoFn
invocado por ParDo
, que establece el elemento de los datos de la marca de tiempo de acuerdo con el elemento en sí mismo. Por ejemplo, si los elementos fueran líneas de registro, este ParDo
podría analizar el tiempo de espera de la string de registro y establecerlo como la marca de tiempo del elemento. No hay marcas de tiempo inherentes a las obras de Shakespeare, por lo que, en este caso, se crearon marcas de tiempo aleatorias solo para ilustrar el concepto. Cada línea del texto de entrada obtendrá una marca de tiempo asociada aleatoria en algún momento de un período de 2 horas.
static class AddTimestampFn extends DoFn<String, String> { private static final Duration RAND_RANGE = Duration.standardHours(2); private final Instant minTimestamp; AddTimestampFn() { this.minTimestamp = new Instant(System.currentTimeMillis()); } @Override public void processElement(ProcessContext c) { // Generate a timestamp that falls somewhere in the past 2 hours. long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); Instant randomTimestamp = minTimestamp.plus(randMillis); // Set the data element with that timestamp. c.outputWithTimestamp(c.element(), new Instant(randomTimestamp)); } }
Puedes obtener más información sobre las marcas de tiempo en marcas de tiempo de elementos de PCollection.
Sistema de ventanas
El SDK de Dataflow usa un concepto llamado sistema de ventanas para subdividir una PCollection
de acuerdo con las marcas de tiempo de sus elementos individuales.
Las transformaciones de Dataflow que agregan varios elementos procesan cada PCollection
como una sucesión de ventanas múltiples y finitas, aunque toda la recopilación puede ser de tamaño infinito (no delimitada).
El ejemplo de WindowingWordCount
aplica un sistema de ventanas de tiempo fijo, en el que cada ventana representa un intervalo de tiempo fijo. El tamaño de la ventana fijo para este ejemplo se establece de manera predeterminada en 1 minuto (puedes cambiarlo con una opción de línea de comandos). Entonces, la canalización aplica la transformación de CountWords
.
PCollection<KV<String, Long>> wordCounts = input .apply(Window.<String>into( FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))) .apply(new WordCount.CountWords());
Escribe datos de salida delimitados y no delimitados
Como la entrada puede ser delimitada o no, lo mismo ocurre con la salida PCollection
. Es necesario asegurarse de elegir un receptor adecuado. Algunos receptores de salida solo admiten una salida delimitada o solo una salida no delimitada. Por ejemplo, un archivo de texto es un receptor que solo puede recibir datos delimitados. La fuente de salida de BigQuery admite una entrada delimitada y no delimitada.
En este ejemplo, transmitimos los resultados a una tabla de BigQuery. Se les da el formato para una tabla de BigQuery y, luego, se los escribe en BigQuery mediante BigQueryIO.Write
.
wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));