Canalización de ejemplo de WordCount

Si aún no lo hiciste, realiza los pasos en la Guía de inicio rápido antes de continuar.

En los ejemplos de WordCount, se muestra cómo configurar una canalización de procesamiento que puede leer texto, dividir las líneas de texto en tokens que sean palabras individuales y realizar un recuento de la frecuencia de cada una de esas palabras. Los SDK de Dataflow contienen una serie de estos cuatro ejemplos de WordCount más detallados de forma sucesiva que se compilan uno sobre otro. El texto de entrada para todos los ejemplos es una colección de las obras de Shakespeare.

Cada ejemplo de WordCount presenta conceptos diferentes en los SDK de Dataflow.

  • Minimal WordCount muestra los principios básicos involucrados en la compilación de una canalización de Dataflow.
  • WordCount presenta algunas de las recomendaciones más comunes de la creación de canalizaciones que se pueden mantener y volver a usar.
  • Debugging WordCount presenta prácticas de registros y depuración.
  • Windowed WordCount muestra cómo puedes usar el modelo de programación de Dataflow para controlar los conjuntos de datos delimitados y no delimitados.

Primero, comprende Minimal WordCount, el ejemplo más simple. Una vez que te sientas cómodo con los principios básicos de la compilación de canalizaciones, continúa a fin de aprender las recomendaciones para escribir programas de Dataflow en WordCount. Luego, lee Debugging WordCount si quieres comprender cómo usar las prácticas comunes para los registros y la depuración. Por último, aprende a usar el mismo patrón de procesamiento en conjuntos de datos infinitos y finitos en Windowed WordCount.

MinimalWordCount

Minimal WordCount muestra una canalización simple que puede leer un bloque de texto desde un archivo en Google Cloud Storage, aplicar transformaciones para dividir en tokens y contar las palabras, y escribir los datos en un archivo de salida en un depósito de Cloud Storage. En este ejemplo, las ubicaciones para los archivos de entrada y salida se integran como parte del código y no se realiza ninguna verificación de errores; se intenta mostrar solo los aspectos básicos de la creación de una canalización de Dataflow. En ejemplos posteriores, parametrizaremos las fuentes de entrada y salida de la canalización y mostraremos otras recomendaciones.

Java

Conceptos clave:
  1. Crea la canalización.
  2. Aplica transformaciones a la canalización:
    • Lee datos de entrada (en este ejemplo: lee archivos de texto).
    • Aplica transformaciones de ParDo.
    • Aplica las transformaciones que proporcionan los SDK (en este ejemplo: Count).
    • Escribe datos de salida (en este ejemplo: escribe en Google Cloud Storage).
  3. Ejecuta la canalización.

En las secciones siguientes, se explican estos conceptos en detalle junto con extractos del código correspondiente de la canalización de Minimal WordCount.

Crea la canalización

El primer paso para crear una canalización de Cloud Dataflow es crear un objeto de Opciones de canalización. Este objeto permite configurar varias opciones para la canalización, como el ejecutor de canalizaciones que ejecutará la canalización, el ID del proyecto y la ubicación de etapa de pruebas a fin de que la canalización almacene los archivos (usados para hacer que tus archivos jar sean accesibles en la nube). En este ejemplo, se configuran estas opciones de manera programática, pero es más frecuente usar los argumentos de la línea de comandos para establecer las opciones de canalización.

En el ejemplo, se especifica BlockingDataflowPipelineRunner como el PipelineRunner, para que se ejecute la canalización en la nube con el servicio de Google Cloud Dataflow. Hay otras opciones que puedes configurar si quieres ejecutar tu canalización en la nube. También puedes omitir esta opción por completo, en cuyo caso, el ejecutor predeterminado ejecutará tu canalización de manera local. Esto se muestra en los siguientes ejemplos de WordCount y se explica con más detalle en Especifica parámetros de ejecución.

Java

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET-YOUR-PROJECT-ID-HERE");
// The 'gs' URI means that this is a Google Cloud Storage path
options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");

El siguiente paso es crear un objeto de Pipeline con las opciones que se acaban de crear. El objeto de Pipeline crea el grafo de las transformaciones que se ejecutarán, asociadas a esa canalización en particular.

Java

Pipeline p = Pipeline.create(options);

Consulta Canalizaciones para obtener una explicación detallada del objeto de canalizaciones y su funcionamiento.

Aplica transformaciones de canalizaciones

La canalización de Minimal WordCount contiene transformaciones para leer datos en la canalización, manipularlos o transformarlos y escribir los resultados. Cada transformación representa una operación en la canalización.

Cada transformación toma algún tipo de entrada (datos o de otro tipo) y produce datos de salida. La clase del SDK de PCollection representa los datos de entrada y salida. PCollection es una clase especial que proporciona el SDK de Dataflow que puedes usar para representar un conjunto de datos de casi cualquier tamaño, incluidos los conjuntos de datos infinitos.

En la Figura 1, se muestra el flujo de datos de la canalización:

La canalización usa una transformación de TextIO.Read para crear una PCollection a partir de datos almacenados en un archivo de datos de entrada; la transformación de CountWords produce una PCollection de recuentos de palabras del texto sin formato de PCollection; TextIO.Write escribe los recuentos de palabras con formato en un archivo de datos de salida.
Figura 1: El flujo de datos de la canalización.

La canalización de Minimal WordCount contiene cinco transformaciones:

  1. Se aplica una transformación de Lectura del archivo de texto al objeto de Pipeline y se produce una PCollection como resultado. Cada elemento de la salida de PCollection representa una línea de texto del archivo de entrada.
  2. Java

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
    
  3. Una transformación de ParDo que invoca un DoFn (se define entre líneas como una clase anónima) en cada elemento que divide las líneas de texto en tokens que sean palabras individuales. La entrada para esta transformación es la PCollection de las líneas de texto que generó la transformación TextIO.Read anterior. La transformación de ParDo da como resultado una PCollection nueva, en la que cada elemento representa una palabra individual en el texto.
  4. Java

    Puedes darle a tu transformación un nombre que aparecerá en la Interfaz de supervisión de Dataflow, mediante la operación .named() como se hizo en este ejemplo. Cuando el servicio de Dataflow ejecuta tu canalización, la interfaz de supervisión indicará cuándo se encuentra en ejecución cada transformación de ParDo.

      .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
           for (String word : c.element().split("[^a-zA-Z']+")) {
             if (!word.isEmpty()) {
               c.output(word);
             }
           }
         }
      }))
    
  5. La transformación de Count que proporcionan los SDK es una transformación genérica que toma una PCollection de cualquier tipo y muestra un par clave-valor de PCollection. Cada clave representa un elemento único de la recopilación de entrada y cada valor representa el número de veces que esa clave apareció en la recopilación de entrada.

    En esta canalización, la entrada para Count es la PCollection de las palabras individuales que generó el ParDo anterior y la salida es una PCollection de pares clave-valor en la que cada clave representa una palabra única en el texto y el valor asociado es el recuento de caso para cada una.
  6. Java

      .apply(Count.<String>perElement())
    
  7. La siguiente es una transformación que le da formato a cada uno de los pares clave/valor de las palabras únicas y los recuentos de casos en una string que se puede imprimir y es adecuada para escribir en un archivo de salida.
  8. Java

    MapElements es una transformación compuesta de nivel superior que encapsula un ParDo simple. Para cada elemento en la PCollection de entrada, MapElements aplica una función que produce exactamente un elemento de salida. Este MapElements invoca una SimpleFunction (definida entre líneas como una clase anónima) que le da el formato. Como entrada, este MapElements toma un PCollection de pares clave-valor que generó Count y produce una PCollection nueva de strings que se pueden imprimir.

      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         @Override
         public String apply(KV<String, Long> element) {
           return element.getKey() + ": " + element.getValue();
         }
      }))
    
  9. Un archivo de texto de Escritura. Esta transformación toma la última PCollection de las String con formato como entrada y escribe cada elemento en un archivo de texto de salida. Cada elemento de la PCollection de entrada representa una línea de texto en el archivo de salida resultante.
  10. Java

      .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
    

    Ten en cuenta que la transformación de Write produce un valor de resultado trivial de tipo PDone, que en este caso se ignora.

Ejecuta la canalización

Ejecuta la canalización con llamadas al método run, que envía tu canalización para que la ejecute el ejecutor de canalizaciones que especificaste cuando creaste tu canalización.

Java

p.run();

Ejemplo de WordCount

En este ejemplo de WordCount, se presentan algunas prácticas de programación recomendadas que pueden hacer que tu canalización sea más fácil de leer, escribir o mantener. Si bien no se requieren de forma explícita, pueden hacer que la ejecución de tu canalización sea más flexible, ayudar a probar tu canalización y hacer que el código de tu canalización se pueda volver a usar.

En esta sección, se supone que conoces los conceptos básicos de la compilación de una canalización. Si sientes que aún no los conoces, lee la sección anterior Minimal WordCount.

Java

Conceptos nuevos:
  1. Aplica ParDo con un DoFn explícito.
  2. Crea transformaciones compuestas.
  3. Usa PipelineOptions parametrizables.

En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.

Especifica DoFns explícitos

Cuando usas transformaciones de ParDo, debes especificar la operación de procesamiento que se aplica a cada elemento en el PCollection de entrada. Esta operación de procesamiento es una subclase de la clase del SDK de DoFn. La canalización de ejemplo en la sección anterior (Minimal WordCount) crea las subclases de DoFn para cada ParDo entre líneas, como una instancia de clase interna anónima.

Sin embargo, es una buena idea definir el DoFn a nivel global, lo que facilita la prueba de unidades y puede hacer que el código de ParDo sea más legible.

Como se mencionó en el ejemplo anterior (Minimal WordCount), cuando ejecutas tu canalización, la interfaz de supervisión de Dataflow indica cuándo se encuentra en ejecución cada transformación de ParDo. El servicio de Dataflow genera de forma automática los nombres de transformación para las transformaciones de ParDo a partir del nombre del DoFn que pasaste. Por ejemplo, el ParDo que aplica el FormatAsTextFn() aparece en la interfaz de supervisión como ParDo(FormatAsText).

Java

En el siguiente ejemplo, los DoFn se definen como clases estáticas:

/** A DoFn that converts a Word and Count into a printable string. */
static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  ...

  @Override
  public void processElement(ProcessContext c) {
    ...
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform.
  p.apply(...)
   .apply(...)
   .apply(ParDo.of(new FormatAsTextFn()))
   ...
}

Consulta Realiza procesamientos paralelos con ParDo si quieres obtener una explicación detallada de la creación y especificación de subclases de DoFn para tus transformaciones de ParDo.

Crea transformaciones compuestas

Si tienes una operación de procesamiento que consta de varias transformaciones o pasos de ParDo, puedes crearla como una subclase de PTransform. Crear una subclase de PTransform te permite crear transformaciones complejas que se pueden volver a usar, hacer que la estructura de tu canalización sea más clara y modular, y facilitar la prueba de unidades.

Si haces que la estructura lógica de tu canalización sea explícita, con las subclases de PTransform, también se puede hacer que la supervisión de tu canalización sea más fácil. Cuando el servicio de Dataflow compila la estructura final y optimizada de tu canalización, la interfaz de supervisión de Dataflow usará las transformaciones que compilaste para reflejar con mayor precisión la estructura de tu canalización.

Java

En este ejemplo, se encapsulan dos transformaciones como los PTransform de subclase de CountWords. CountWords contiene el ParDo que ejecuta ExtractWordsFn y la transformación de Count que proporcionan los SDK.

Cuando se define CountWords, se especifican la entrada y la salida finales; la entrada es PCollection<String> para la operación de extracción y la salida es PCollection<KV<String, Long>> que produce la operación de recuento.

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}

Usa PipelineOptions parametrizables

En el ejemplo anterior, Minimal WordCount, se establecieron varias opciones de ejecución cuando se creó la canalización. En este ejemplo, se definen las opciones de configuración personalizadas propias con la extensión de PipelineOptions.

Puedes agregar tus propios argumentos si quieres que el analizador de línea de comandos los procese y especificar valores predeterminados para ellos. Entonces, puedes acceder a los valores de las opciones en tu código de canalización.

En el ejemplo de Minimal WordCount, las opciones de canalización se integraron como parte del código. Sin embargo, la forma más común de crear PipelineOptions es a través del análisis de argumentos de la línea de comandos.

Java

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}

Ejemplo de Debugging WordCount

En el ejemplo de Debugging WordCount, se muestran algunas recomendaciones para instrumentar tu código de canalización. Puedes usar la interfaz de supervisión de Dataflow y los agregadores para obtener visibilidad adicional en tu canalización a medida que se ejecuta.

Java

Puedes usar el DataflowAssert del SDK para probar la salida de tus transformaciones en las diferentes etapas de la canalización.

Java

Conceptos nuevos:
  1. Observa registros en la interfaz de supervisión de Dataflow.
  2. Controla los niveles de registro del trabajador de Dataflow.
  3. Crea Aggregators.
  4. Prueba tu canalización con DataflowAssert

En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.

Observa registros en la interfaz de supervisión de Dataflow

Google Cloud Logging agrega los registros de todos los trabajadores de tu trabajo de Dataflow a una ubicación única en Google Cloud Platform Console. Puedes usar la interfaz de supervisión de Dataflow si quieres ver los registros desde todas las instancias de Compute Engine que Dataflow inició para completar tu trabajo de Dataflow. Puedes agregar instrucciones de registro en las instancias de DoFn de tu canalización que aparecerán en la interfaz de supervisión a medida que se ejecuta tu canalización.

Java

El siguiente registrador SLF4J usa el nombre de la clase de FilterTextFn calificado por completo como nombre del registrador. Este nombre hará referencia a todas las instrucciones de registro que emita el registrador, que serán visibles en la interfaz de supervisión de Dataflow, si hay una configuración de nivel de registro apropiada.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebuggingWordCount {
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...
    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
    ...

    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        // Log at the "DEBUG" level each element that we match. When executing this pipeline
        // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI
        // only if the log level is set to "DEBUG" or lower.
        LOG.debug("Matched: " + c.element().getKey());
        ...
      } else {
        // Log at the "TRACE" level each element that is not matched. Different log levels
        // can be used to control the verbosity of logging providing an effective mechanism
        // to filter less important information.
        LOG.trace("Did not match: " + c.element().getKey());
        ...
      }
    }
  }
}

Controla los niveles de registro del trabajador de Dataflow

Java

Los trabajadores de Dataflow que ejecutan el código de usuario están configurados para acceder a Cloud Logging de forma predeterminada a nivel de registro INFO y superior. Puedes anular los niveles de registro para los espacios de nombres de registro determinados si especificas lo siguiente:

--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}

Por ejemplo, si especificas lo siguiente

--workerLogLevelOverrides={"com.example":"DEBUG"}

cuando ejecutas esta canalización con el servicio de Dataflow, la interfaz de supervisión constará solo de registros de nivel DEBUG o superior para el paquete com.example, además de los registros de nivel predeterminado INFO o superior.

Además, puedes anular la configuración de registro de trabajadores de Dataflow predeterminada si especificas lo siguiente

--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>

Por ejemplo, si especificas lo siguiente

--defaultWorkerLogLevel=DEBUG

cuando ejecutas esta canalización con el servicio de Dataflow, la interfaz de supervisión constará solo de registros de nivel DEBUG o superior. Ten en cuenta que, si cambias el nivel de registro del trabajador predeterminado a TRACE o DEBUG, habrá un aumento importante en la cantidad de información de registros.

Obtén más información sobre registros dentro de Cloud Dataflow.

Crea agregadores

Un agregador personalizado puede realizar un seguimiento de los valores en tu canalización a medida que se ejecuta. Esos valores se mostrarán en la interfaz de supervisión de Dataflow cuando la canalización se ejecute con el servicio de Dataflow.

Es posible que los agregadores no se puedan ver hasta que el sistema comience a ejecutar la transformación de ParDo que los creó o hasta que se cambie su valor inicial. Entonces, se pueden ver en la interfaz de supervisión en la parte inferior del Resumen del trabajo.

Los agregadores personalizados que se encuentran a continuación realizan un seguimiento de la cantidad de palabras coincidentes y no coincidentes.

Java

public class DebuggingWordCount {
  /** A DoFn that filters for a specific key based upon a regular expression. */
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    private final Aggregator<Long, Long> matchedWords =
        createAggregator("matchedWords", new Sum.SumLongFn());
    private final Aggregator<Long, Long> unmatchedWords =
      createAggregator("umatchedWords", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        ...
        matchedWords.addValue(1L);
        ...
      } else {
        ...
        unmatchedWords.addValue(1L);
      }
    }
  }
}

Agregadores en canalizaciones por lotes y de transmisión

Los agregadores en las canalizaciones por lotes proporcionan coherencia. Se confirman exactamente una vez en los paquetes exitosos y no se confirman en los paquetes con fallas.

En las canalizaciones de transmisión, los agregadores proporcionan una semántica más tolerante. La contribución desde paquetes exitosos es el mejor esfuerzo y los paquetes con fallas pueden reflejarse en el valor final.

Prueba tu canalización con DataflowAssert

Java

DataflowAssert es un conjunto de PTransforms convenientes del estilo de los buscadores de coincidencia de recopilación de Hamcrest que se pueden usar cuando se escriben pruebas de nivel de canalización para validar los contenidos de PCollections. DataflowAssert se usa mejor en pruebas de unidades con conjuntos de datos pequeños, pero aquí se muestra como una herramienta de enseñanza.

A continuación, se verifica que el conjunto de palabras filtradas coincida con los recuentos esperados. Ten en cuenta que DataflowAssert no proporciona ningún resultado y que la finalización exitosa de la canalización implica que se cumplieron las expectativas. Obtén más información sobre cómo puedes probar tu canalización y consulta DebuggingWordCountTest para obtener una prueba de unidades de ejemplo.

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}

WindowedWordCount

Java

Este ejemplo, WindowedWordCount, cuenta las palabras en los textos como se hizo en los ejemplos anteriores. La entrada para WindowedWordCount puede ser un conjunto de datos fijo (como en los ejemplos anteriores) o un flujo de datos no delimitado.

El SDK de Dataflow es conveniente porque te permite crear una sola canalización que puede controlar tipos de entrada delimitados o no delimitados. Si la entrada no está delimitada, entonces ninguna de las PCollections de la canalización estará delimitada. Lo mismo ocurre con la entrada delimitada.

Antes de leer esta sección, asegúrate de que conoces los principios básicos de la creación de una canalización.

Conceptos nuevos:
  1. Lee datos de entrada delimitados y no delimitados
  2. Agrega marcas de tiempo a los datos
  3. Sistema de ventanas
  4. Escribe datos de salida delimitados y no delimitados

En las secciones siguientes, se explican estos conceptos clave y se divide el código de canalización en secciones más pequeñas.

Lee datos de entrada delimitados y no delimitados

La entrada para WindowedWordCount puede estar delimitada o no. Si tu entrada tiene una cantidad fija de elementos, se considera un conjunto de datos “delimitado”. Si tu entrada se actualiza de forma constante, entonces se considera “no delimitada”. Consulta PCollections delimitadas y no delimitadas para obtener más información acerca de los tipos de entrada.

En este ejemplo, puedes elegir si la entrada será delimitada o no. Recuerda que la entrada para todos los ejemplos es un conjunto de textos de Shakespeare, una entrada finita y delimitada. Sin embargo, con el propósito de explicar los nuevos conceptos en este ejemplo, la entrada es una reproducción de los textos de Shakespeare.

En este ejemplo, si tu entrada no está delimitada, entonces la entrada se leerá desde un tema de Google Cloud Pub/Sub; en ese caso, la transformación de Read que se aplica a la canalización es una PubSubIO.Read. De lo contrario, la entrada se leerá desde Google Cloud Storage.

public static void main(String[] args) throws IOException {
    ...
    PCollection<String> input;
    if (options.isUnbounded()) {
      LOG.info("Reading from PubSub.");
      // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));

  } else {
      // Else, this is a bounded pipeline. Read from the Google Cloud Storage file.
      input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
      ...
    }
    ...
}

Agrega marcas de tiempo a los datos

Cada elemento de una PCollection tiene una marca de tiempo asociada. La fuente que crea la PCollection asigna la marca de tiempo para cada elemento. En este ejemplo, si eliges una entrada no delimitada para tu canalización, las marcas de tiempo provendrán de la fuente de datos de Pub/Sub. Si eliges una entrada delimitada, el método DoFn llamado AddTimestampsFn (que invoca ParDo) establecerá una marca de tiempo para cada elemento en la PCollection.

public static void main(String[] args) throws IOException {
  ...
  input = pipeline
    .apply(...)
    // Add an element timestamp, using an artificial time.
    .apply(ParDo.of(new AddTimestampFn()));
}

A continuación, se muestra el código para AddTimestampsFn, un DoFn invocado por ParDo, que establece el elemento de los datos de la marca de tiempo de acuerdo con el elemento en sí mismo. Por ejemplo, si los elementos fueran líneas de registro, este ParDo podría analizar el tiempo de espera de la string de registro y establecerlo como la marca de tiempo del elemento. No hay marcas de tiempo inherentes a las obras de Shakespeare, por lo que, en este caso, se crearon marcas de tiempo aleatorias solo para ilustrar el concepto. Cada línea del texto de entrada obtendrá una marca de tiempo asociada aleatoria en algún momento de un período de 2 horas.

static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp;

  AddTimestampFn() {
    this.minTimestamp = new Instant(System.currentTimeMillis());
  }

  @Override
  public void processElement(ProcessContext c) {
    // Generate a timestamp that falls somewhere in the past 2 hours.
    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
        Instant randomTimestamp = minTimestamp.plus(randMillis);
    // Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}

Puedes obtener más información sobre las marcas de tiempo en marcas de tiempo de elementos de PCollection.

Sistema de ventanas

El SDK de Dataflow usa un concepto llamado sistema de ventanas para subdividir una PCollection de acuerdo con las marcas de tiempo de sus elementos individuales. Las transformaciones de Dataflow que agregan varios elementos procesan cada PCollection como una sucesión de ventanas múltiples y finitas, aunque toda la recopilación puede ser de tamaño infinito (no delimitada).

El ejemplo de WindowingWordCount aplica un sistema de ventanas de tiempo fijo, en el que cada ventana representa un intervalo de tiempo fijo. El tamaño de la ventana fijo para este ejemplo se establece de manera predeterminada en 1 minuto (puedes cambiarlo con una opción de línea de comandos). Entonces, la canalización aplica la transformación de CountWords.

PCollection<KV<String, Long>> wordCounts = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
   .apply(new WordCount.CountWords());

Escribe datos de salida delimitados y no delimitados

Como la entrada puede ser delimitada o no, lo mismo ocurre con la salida PCollection. Es necesario asegurarse de elegir un receptor adecuado. Algunos receptores de salida solo admiten una salida delimitada o solo una salida no delimitada. Por ejemplo, un archivo de texto es un receptor que solo puede recibir datos delimitados. La fuente de salida de BigQuery admite una entrada delimitada y no delimitada.

En este ejemplo, transmitimos los resultados a una tabla de BigQuery. Se les da el formato para una tabla de BigQuery y, luego, se los escribe en BigQuery mediante BigQueryIO.Write.

wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
  .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.