Canalización de ejemplo de WordCount

Si aún no lo hiciste, realiza los pasos en la Guía de inicio rápido antes de continuar.

En los ejemplos de WordCount, se muestra cómo configurar una canalización de procesamiento que puede leer texto, dividir las líneas de texto en tokens que sean palabras individuales y realizar un recuento de la frecuencia de cada una de esas palabras. Los SDK de Dataflow contienen una serie de estos cuatro ejemplos de WordCount más detallados de forma sucesiva que se compilan uno sobre otro. El texto de entrada para todos los ejemplos es una colección de las obras de Shakespeare.

Cada ejemplo de WordCount presenta conceptos diferentes en los SDK de Dataflow.

  • Minimal WordCount muestra los principios básicos involucrados en la compilación de una canalización de Dataflow.
  • WordCount presenta algunas de las recomendaciones más comunes de la creación de canalizaciones que se pueden mantener y volver a usar.
  • Debugging WordCount presenta prácticas de registros y depuración.
  • Windowed WordCount muestra cómo puedes usar el modelo de programación de Dataflow para controlar los conjuntos de datos delimitados y no delimitados.

Primero, comprende Minimal WordCount, el ejemplo más simple. Una vez que te sientas cómodo con los principios básicos de la compilación de canalizaciones, continúa a fin de aprender las prácticas recomendadas para escribir programas de Dataflow en WordCount. Luego, lee Debugging WordCount a fin de comprender cómo usar las prácticas comunes para los registros y la depuración. Por último, aprende a usar el mismo patrón de procesamiento en conjuntos de datos infinitos y finitos en Windowed WordCount.

MinimalWordCount

Minimal WordCount muestra una canalización simple que puede leer un bloque de texto desde un archivo en Google Cloud Storage, aplicar transformaciones para dividir en tokens y contar las palabras, y escribir los datos en un archivo de salida en un depósito de Cloud Storage. En este ejemplo, las ubicaciones para los archivos de entrada y salida se integran como parte del código y no se realiza ninguna verificación de errores; se intenta mostrar solo los aspectos básicos de la creación de una canalización de Dataflow. En ejemplos posteriores, parametrizaremos las fuentes de entrada y salida de la canalización y mostraremos otras recomendaciones.

Java

Conceptos clave:
  1. Crea la canalización
  2. Aplica transformaciones a la canalización:
    • Lee datos de entrada (en este ejemplo: lee archivos de texto)
    • Aplica transformaciones ParDo
    • Aplica las transformaciones que proporcionan los SDK (en este ejemplo: Count)
    • Escribe datos de salida (en este ejemplo: escribe en Google Cloud Storage)
  3. Ejecuta la canalización.

En las secciones siguientes, se explican estos conceptos en detalle junto con extractos del código correspondiente de la canalización de Minimal WordCount.

Crea la canalización

El primer paso para crear una canalización de Cloud Dataflow es crear un objeto de Opciones de canalización. Este objeto permite configurar varias opciones para la canalización, como el ejecutor de canalizaciones que ejecutará la canalización, el ID del proyecto y la ubicación de etapa de pruebas a fin de que la canalización almacene los archivos (usados para hacer que tus archivos jar sean accesibles en la nube). En este ejemplo, se configuran estas opciones de manera programática, pero es más frecuente usar los argumentos de la línea de comandos para establecer las opciones de canalización.

En nuestro ejemplo, especificamos BlockingDataflowPipelineRunner como PipelineRunner, para que nuestra canalización se ejecute en la nube con el servicio de Google Cloud Dataflow. Existen otras opciones que puedes configurar si quieres ejecutar tu canalización en la nube. También puedes omitir esta opción por completo, en cuyo caso, el ejecutor predeterminado ejecutará tu canalización de manera local. Esto se muestra en los siguientes dos ejemplos de WordCount y se explica con más detalle en Especifica parámetros de ejecución.

Java

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET-YOUR-PROJECT-ID-HERE");
// The 'gs' URI means that this is a Google Cloud Storage path
options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");

El siguiente paso es crear un objeto Pipeline con las opciones que acabamos de crear. El objeto Pipeline compila el grafo de las transformaciones que se ejecutarán, asociadas con esa canalización específica.

Java

Pipeline p = Pipeline.create(options);

Consulta Canalizaciones para obtener una explicación detallada del objeto de canalizaciones y su funcionamiento.

Aplica transformaciones de canalizaciones

La canalización de Minimal WordCount contiene transformaciones para leer datos en la canalización, manipularlos o transformarlos y escribir los resultados. Cada transformación representa una operación en la canalización.

Cada transformación toma algún tipo de entrada (datos o de otro tipo) y produce datos de salida. La clase del SDK de PCollection representa los datos de entrada y salida. PCollection es una clase especial que proporciona el SDK de Dataflow que puedes usar para representar un conjunto de datos de casi cualquier tamaño, incluidos los conjuntos de datos infinitos.

En la Figura 1, se muestra el flujo de datos de la canalización:

La canalización usa una transformación TextIO.Read para crear una clase PCollection a partir de datos almacenados en un archivo de datos de entrada. La transformación CountWords produce una clase PCollection de recuentos de palabras del texto sin formato de PCollection. TextIO.Write escribe los recuentos de palabras con formato en un archivo de datos de salida.
Figura 1: El flujo de datos de la canalización

La canalización de Minimal WordCount contiene cinco transformaciones:

  1. Se aplica una transformación Read del archivo de texto al objeto Pipeline y se produce una PCollection como salida. Cada elemento de la salida de PCollection representa una línea de texto del archivo de entrada.
  2. Java

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
    
  3. Es una transformación ParDo que invoca un DoFn (se define entre líneas como una clase anónima) en cada elemento que divide las líneas de texto en tokens que sean palabras individuales. La entrada para esta transformación es el PCollection de las líneas de texto que generó la transformación TextIO.Read anterior. La transformación ParDo genera un PCollection nuevo, en el que cada elemento representa una palabra individual en el texto.
  4. Java

    Puedes darle a tu transformación un nombre que aparecerá en la Interfaz de supervisión de Dataflow, mediante la operación .named() como se hizo en este ejemplo. Cuando el servicio de Dataflow ejecuta tu canalización, la interfaz de supervisión indicará cuándo se encuentra en ejecución cada transformación ParDo.

      .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
           for (String word : c.element().split("[^a-zA-Z']+")) {
             if (!word.isEmpty()) {
               c.output(word);
             }
           }
         }
      }))
    
  5. La transformación Countque proporcionan los SDK es una transformación genérica que toma una PCollection de cualquier tipo y muestra un par clave-valor de PCollection. Cada clave representa un elemento único de la colección de entrada y cada valor representa el número de veces que esa clave apareció en la colección de entrada.

    En esta canalización, la entrada para Count es la PCollection de las palabras individuales que generó el ParDo anterior y la salida es una PCollection de pares clave-valor en la que cada clave representa una palabra única en el texto y el valor asociado es el recuento de caso para cada una.
  6. Java

      .apply(Count.<String>perElement())
    
  7. La siguiente es una transformación que le da formato a cada uno de los pares clave/valor de las palabras únicas y los recuentos de casos en una string que se puede imprimir y es adecuada para escribir en un archivo de salida.
  8. Java

    MapElements es una transformación compuesta por el nivel superior que encapsula un ParDo simple. Para cada elemento en la entrada PCollection, MapElements aplica una función que produce exactamente un elemento de salida. Este MapElements invoca un SimpleFunction (definido en línea como una clase anónima) que aplica el formato. Como entrada, este MapElements toma un PCollection de par clave-valor que genera Count y produce un nuevo PCollection de strings que se pueden imprimir.

      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         @Override
         public String apply(KV<String, Long> element) {
           return element.getKey() + ": " + element.getValue();
         }
      }))
    
  9. Un archivo de texto de Escritura. Esta transformación toma la última PCollection de las String con formato como entrada y escribe cada elemento en un archivo de texto de salida. Cada elemento de la PCollection de entrada representa una línea de texto en el archivo de salida resultante.
  10. Java

      .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
    

    Ten en cuenta que la transformación Write produce un valor de resultado trivial de tipo PDone, que en este caso se ignora.

Ejecuta la canalización

Ejecuta la canalización con llamadas al método run, que envía tu canalización para que la ejecute el ejecutor de canalizaciones que especificaste cuando creaste tu canalización.

Java

p.run();

Ejemplo de WordCount

En este ejemplo de WordCount, se presentan algunas prácticas de programación recomendadas que pueden hacer que tu canalización sea más fácil de leer, escribir o mantener. Si bien no se requieren de forma explícita, pueden hacer que la ejecución de tu canalización sea más flexible, ayudar a probar tu canalización y hacer que el código de tu canalización se pueda volver a usar.

En esta sección, se supone que conoces los conceptos básicos de la compilación de una canalización. Si sientes que aún no los conoces, lee la sección anterior Minimal WordCount.

Java

Conceptos nuevos:
  1. Aplica ParDo con un DoFn explícito
  2. Crea transformaciones compuestas
  3. Usa PipelineOptions con parámetros

En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.

Especifica DoFns explícitos

Cuando se usan transformaciones ParDo, es necesario especificar la operación de procesamiento que se aplica a cada elemento en el PCollection de entrada. Esta operación de procesamiento es una subclase de la clase SDK de DoFn. La canalización de ejemplo en la sección anterior (Minimal WordCount) crea las subclases DoFn para cada ParDo entre líneas, como una instancia de clase interna anónima.

Sin embargo, suele ser una buena idea definir DoFn a nivel global, lo que facilita la prueba de unidades y puede hacer que el ParDo sea más legible.

Como se mencionó en el ejemplo anterior (Minimal WordCount), cuando ejecutas tu canalización, la interfaz de supervisión de Dataflow indica cuándo se encuentra en ejecución cada transformación ParDo. El servicio de Dataflow genera automáticamente nombres de transformación para las transformaciones ParDo a partir del nombre del DoFn que pasas. Por ejemplo, el ParDo que aplica el FormatAsTextFn() aparece en la interfaz de supervisión como ParDo(FormatAsText).

Java

En este ejemplo, los DoFns se definen como clases estáticas:

/** A DoFn that converts a Word and Count into a printable string. */
static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  ...

  @Override
  public void processElement(ProcessContext c) {
    ...
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform.
  p.apply(...)
   .apply(...)
   .apply(ParDo.of(new FormatAsTextFn()))
   ...
}

Consulta Procesamiento paralelo con ParDo para obtener un análisis detallado sobre cómo crear y especificar subclases de DoFn en tus transformaciones ParDo.

Crea transformaciones compuestas

Si tienes una operación de procesamiento que consta de varias transformaciones o pasos de ParDo, puedes crearla como una subclase de PTransform. Crear una subclase de PTransform te permite crear transformaciones complejas que se pueden volver a usar, hacer que la estructura de tu canalización sea más clara y modular, y facilitar la prueba de unidades.

Si haces que la estructura lógica de tu canalización sea explícita, con las subclases de PTransform, también se puede hacer que la supervisión de tu canalización sea más fácil. Cuando el servicio de Dataflow compila la estructura final y optimizada de tu canalización, la interfaz de supervisión de Dataflow usará las transformaciones que compilaste para reflejar con mayor precisión la estructura de tu canalización.

Java

En este ejemplo, dos transformaciones se encapsulan como la subclase de PTransform de CountWords. CountWords contiene el ParDo que ejecuta ExtractWordsFn y la transformación Count que proporcionan los SDK.

Cuando se define CountWords, especificamos su entrada y salida finales; la entrada es PCollection<String> para la operación de extracción, y la salida es PCollection<KV<String, Long>> que produce la operación de recuento.

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}

Usa PipelineOptions con parámetros

En el ejemplo anterior, Minimal WordCount, se establecieron varias opciones de ejecución cuando se creó la canalización. En este ejemplo, se definen las opciones de configuración personalizadas propias con la extensión de PipelineOptions.

Puedes agregar tus propios argumentos si quieres que el analizador de línea de comandos los procese y si quieres especificar valores predeterminados para ellos. Entonces, puedes acceder a los valores de las opciones en tu código de canalización.

En el ejemplo de Minimal WordCount, las opciones de canalización se integraron como parte del código. Sin embargo, la forma más común de crear PipelineOptions es a través del análisis de argumentos de la línea de comandos.

Java

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}

Ejemplo de depuración de WordCount

En el ejemplo de Debugging WordCount, se muestran algunas prácticas recomendadas para instrumentar tu código de canalización. Puedes usar la interfaz de supervisión de Dataflow y los agregadores para obtener visibilidad adicional en tu canalización a medida que se ejecuta.

Java

Puedes usar DataflowAssert del SDK para probar la salida de tus transformaciones en las diferentes etapas de la canalización.

Java

Conceptos nuevos:
  1. Visualiza registros en la interfaz de supervisión de Dataflow
  2. Controla los niveles de registro del trabajador de Dataflow
  3. Crea Aggregators
  4. Prueba tu canalización mediante DataflowAssert

En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.

Visualiza registros en la interfaz de supervisión de Dataflow

Google Cloud Logging agrupa los registros de todos los trabajadores de tu trabajo de Dataflow en una sola ubicación en Google Cloud Console. Puedes usar la interfaz de supervisión de Dataflow si quieres ver los registros desde todas las instancias de Compute Engine que Dataflow inició para completar tu trabajo de Dataflow. Puedes agregar instrucciones de registro en las instancias de DoFn de tu canalización que aparecerán en la interfaz de supervisión a medida que se ejecute tu canalización.

Java

El siguiente registrador SLF4J usa el nombre de la clase de FilterTextFn completamente calificado como nombre del registrador. Este nombre hará referencia a todas las instrucciones de registro que emita el registrador, que serán visibles en la interfaz de supervisión de Dataflow, si hay una configuración de nivel de registro apropiada.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebuggingWordCount {
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...
    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
    ...

    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        // Log at the "DEBUG" level each element that we match. When executing this pipeline
        // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI
        // only if the log level is set to "DEBUG" or lower.
        LOG.debug("Matched: " + c.element().getKey());
        ...
      } else {
        // Log at the "TRACE" level each element that is not matched. Different log levels
        // can be used to control the verbosity of logging providing an effective mechanism
        // to filter less important information.
        LOG.trace("Did not match: " + c.element().getKey());
        ...
      }
    }
  }
}

Controla los niveles de registro del trabajador de Dataflow

Java

Los trabajadores de Dataflow que ejecutan el código de usuario están configurados para acceder a Cloud Logging de forma predeterminada a nivel de registro INFO y superior. Puedes anular los niveles de registro para los espacios de nombres de registro determinados si especificas lo siguiente:

--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}

Por ejemplo, si especificas lo siguiente

--workerLogLevelOverrides={"com.example":"DEBUG"}

cuando ejecutas esta canalización con el servicio de Dataflow, la Interfaz de supervisión solo contendrá DEBUG o registros de nivel superior para el paquete com.example, además de los registros de INFO predeterminados o de nivel superior.

Además, puedes anular la configuración de registro de trabajadores de Dataflow predeterminada si especificas lo siguiente:

--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>

Por ejemplo, si especificas lo siguiente

--defaultWorkerLogLevel=DEBUG

cuando ejecutas esta canalización con el servicio de Dataflow, la interfaz de supervisión tendrá todos los registros de DEBUG o nivel o superior. Ten en cuenta que si cambias el nivel de registro del trabajador predeterminado a TRACE o DEBUG, aumentará significativamente la cantidad de información de registro.

Obtén más información sobre registros en Cloud Dataflow.

Crea agregadores

Un agregador personalizado puede realizar un seguimiento de los valores en tu canalización a medida que se ejecuta. Esos valores se mostrarán en la interfaz de supervisión de Dataflow cuando la canalización se ejecute con el servicio de Dataflow.

Es posible que los agregadores no se puedan ver hasta que el sistema comience a ejecutar la transformación ParDo que los creó o hasta que se cambie su valor inicial. Entonces, se pueden ver en la interfaz de supervisión en la parte inferior del Resumen del trabajo.

Los agregadores personalizados que se encuentran a continuación realizan un seguimiento de la cantidad de palabras coincidentes y no coincidentes.

Java

public class DebuggingWordCount {
  /** A DoFn that filters for a specific key based upon a regular expression. */
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    private final Aggregator<Long, Long> matchedWords =
        createAggregator("matchedWords", new Sum.SumLongFn());
    private final Aggregator<Long, Long> unmatchedWords =
      createAggregator("umatchedWords", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        ...
        matchedWords.addValue(1L);
        ...
      } else {
        ...
        unmatchedWords.addValue(1L);
      }
    }
  }
}

Agregadores en canalizaciones por lotes y de transmisión

Los agregadores en las canalizaciones por lotes proporcionan coherencia. Se confirman exactamente una vez en los paquetes exitosos y no se confirman en los paquetes con fallas.

En las canalizaciones de transmisión, los agregadores proporcionan una semántica más tolerante. La contribución desde paquetes correctos es el mejor esfuerzo y los paquetes con fallas pueden reflejarse en el valor final.

Prueba tu canalización mediante DataflowAssert

Java

DataflowAssert es un conjunto de PTransforms conveniente del estilo de buscadores de coincidencias en colección de Hamcrest que se puede usar cuando se escriben pruebas de nivel de canalización para validar el contenido de PCollections. DataflowAssert se usa mejor en pruebas de unidades con conjuntos de datos pequeños, pero aquí se muestra como una herramienta de enseñanza.

A continuación, se verifica que el conjunto de palabras filtradas coincida con los recuentos esperados. Ten en cuenta que DataflowAssert no proporciona ningún resultado y que la finalización correcta de la canalización implica que se cumplieron las expectativas. Obtén más información sobre cómo puedes probar tu canalización y consulta DebuggingWordCountTest para obtener una prueba de unidades de ejemplo.

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}

WindowedWordCount

Java

En este ejemplo, WindowedWordCount, se cuentan las palabras en los textos como se hizo en los ejemplos anteriores, pero se presentan varios conceptos avanzados. La entrada para WindowedWordCount puede ser un conjunto de datos fijo (como en los ejemplos anteriores) o un flujo de datos no delimitado.

El SDK de Dataflow es conveniente porque te permite crear una sola canalización que puede controlar tipos de entrada delimitados o no delimitados. Si la entrada no está delimitada, todos los PCollections de la canalización no estarán delimitados. Lo mismo ocurre con la entrada delimitada.

Antes de leer esta sección, asegúrate de que conoces los principios básicos de la creación de una canalización.

Conceptos nuevos:
  1. Lee datos de entrada delimitados y no delimitados
  2. Agrega marcas de tiempo a los datos
  3. Sistema de ventanas
  4. Escribe datos de salida delimitados y no delimitados

En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.

Lee datos de entrada delimitados y no delimitados

La entrada para WindowedWordCount puede ser delimitada o no. Si tu entrada tiene una cantidad fija de elementos, se considera un conjunto de datos “delimitado”. Si tu entrada se actualiza de forma constante, entonces se considera “no delimitada”. Consulta las clases PCollection delimitadas y no delimitadas para obtener más información acerca de los tipos de entrada.

En este ejemplo, puedes elegir si la entrada será delimitada o no. Recuerda que la entrada para todos los ejemplos es un conjunto de textos de Shakespeare, una entrada finita y delimitada. Sin embargo, con el propósito de explicar los nuevos conceptos en este ejemplo, la entrada es una reproducción de los textos de Shakespeare.

En este ejemplo, si la entrada es delimitada, esta se leerá desde un tema de Pub/Sub de Google Cloud. En ese caso, la transformación Read aplicada a la canalización es una PubSubIO.Read. De lo contrario, la entrada se leerá desde Google Cloud Storage.

public static void main(String[] args) throws IOException {
    ...
    PCollection<String> input;
    if (options.isUnbounded()) {
      LOG.info("Reading from PubSub.");
      // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));

  } else {
      // Else, this is a bounded pipeline. Read from the Google Cloud Storage file.
      input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
      ...
    }
    ...
}

Agrega marcas de tiempo a los datos

Cada elemento en una PCollection tiene una marca de tiempo asociada. La fuente que crea la PCollection asigna la marca de tiempo para cada elemento. En este ejemplo, si eliges una entrada no delimitada para tu canalización, las marcas de tiempo provendrán de la fuente de datos de Pub/Sub. Si eliges una entrada delimitada, el método DoFn llamado AddTimestampsFn (que invoca ParDo) establecerá una marca de tiempo para cada elemento en la PCollection.

public static void main(String[] args) throws IOException {
  ...
  input = pipeline
    .apply(...)
    // Add an element timestamp, using an artificial time.
    .apply(ParDo.of(new AddTimestampFn()));
}

A continuación, se muestra el código para AddTimestampsFn, un DoFn que invoca ParDo, que establece el elemento de datos de la marca de tiempo de acuerdo con el elemento en sí. Por ejemplo, si los elementos eran líneas de registro, este ParDo podría analizar el agotamiento del tiempo de la string de registro y establecerla como la marca de tiempo del elemento. No hay marcas de tiempo inherentes a las obras de Shakespeare, por lo que, en este caso, se crearon marcas de tiempo aleatorias solo para ilustrar el concepto. Cada línea del texto de entrada obtendrá una marca de tiempo asociada aleatoria en algún momento de un período de 2 horas.

static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp;

  AddTimestampFn() {
    this.minTimestamp = new Instant(System.currentTimeMillis());
  }

  @Override
  public void processElement(ProcessContext c) {
    // Generate a timestamp that falls somewhere in the past 2 hours.
    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
        Instant randomTimestamp = minTimestamp.plus(randMillis);
    // Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}

Puedes obtener más información sobre las marcas de tiempo en marcas de tiempo de elementos de PCollection.

Sistema de ventanas

El SDK de Dataflow usa un concepto llamado sistema de ventanas para subdividir una PCollection de acuerdo con las marcas de tiempo de sus elementos individuales. Las transformaciones de Dataflow que agregan varios elementos procesan cada PCollection como una sucesión de ventanas múltiples y finitas, aunque toda la colección puede ser de tamaño infinito (no delimitada).

En el ejemplo de WindowingWordCount, se aplica un sistema de ventanas fijo, en el que cada ventana representa un intervalo de tiempo fijo. El tamaño del sistema de ventanas fijo para este ejemplo se establece de manera predeterminada en 1 minuto (puedes cambiarlo con una opción de línea de comandos). Entonces, la canalización aplica la transformación CountWords.

PCollection<KV<String, Long>> wordCounts = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
   .apply(new WordCount.CountWords());

Escribe datos de salida delimitados y no delimitados

Como la entrada puede ser delimitada o no, lo mismo ocurre con la salida PCollection. Es necesario asegurarse de elegir un receptor adecuado. Algunos receptores de salida solo admiten una salida delimitada o solo una salida no delimitada. Por ejemplo, un archivo de texto es un receptor que solo puede recibir datos delimitados. La fuente de salida de BigQuery admite una entrada delimitada y no delimitada.

En este ejemplo, transmitimos los resultados a una tabla de BigQuery. Se les da el formato para una tabla de BigQuery y, luego, se los escribe en BigQuery mediante BigQueryIO.Write.

wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
  .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));