Crea tu canalización

Tu programa de Dataflow expresa una canalización de procesamiento de datos de principio a fin. En esta sección, se explican los mecanismos del uso de las clases en el SDK de Dataflow para compilar una canalización. A fin de construir una canalización con las clases en los SDK de Dataflow, tu programa necesitará que realices los siguientes pasos generales:

  • Crea un objeto Pipeline.
  • Usa una transformación Read o Create a fin de crear una o más PCollection para tus datos de canalización
  • Aplica transformaciones a cada PCollection. Las transformaciones pueden cambiar, filtrar, agrupar, analizar o procesar los elementos de una PCollection. Cada transformación crea una nueva PCollection de salida, a la que puedes aplicar transformaciones adicionales hasta que se complete el procesamiento.
  • Escribe o entrega como resultado las PCollection finales ya transformadas.
  • Ejecuta la canalización

Consulta la canalización simple de ejemplo para obtener un ejemplo completo que demuestra cada paso general.

Crea el objeto de canalización

Un programa de Dataflow comienza, por lo general, con la creación de un objeto Pipeline.

En los SDK de Dataflow, cada canalización está representada por un objeto explícito del tipo Pipeline. Cada objeto Pipeline es una entidad independiente que encapsula los datos con los que opera la canalización y las transformaciones que se aplican a esos datos.

Java

Para crear una canalización, declara un objeto Pipeline y pasa algunas opciones de configuración. Para pasar las opciones de configuración, crea un objeto del tipo PipelineOptions, que puedes compilar mediante el método estático PipelineOptionsFactory.create().

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

Configura opciones de canalización

Usa las opciones de canalización para configurar diferentes aspectos de tu canalización. Estas pueden incluir los siguientes aspectos:

  • Dónde se ejecuta tu canalización
  • Dónde tu trabajo de canalización habilita los archivos por etapa
  • Con qué proyecto de Cloud Platform se asocia tu canalización
  • Cuántas instancias de Compute Engine usa tu canalización como trabajadores

Las propiedades de opción de canalización incluyen información sobre tu proyecto de Cloud Dataflow que requiere el servicio de Cloud Dataflow, como tu ID del proyecto y las ubicaciones de las etapas de pruebas de Cloud Storage. Las opciones de canalización también te permiten controlar cuántos trabajadores debe asignar el servicio de Dataflow a tu trabajo de canalización y a dónde dirigir los mensajes de estado de tu trabajo de canalización.

El ejecutor de canalización es una propiedad clave en las opciones de canalización que determina dónde se ejecuta tu canalización (en el servicio de Cloud Dataflow o de forma local). La propiedad de ejecutor de canalización también especifica si la ejecución de tu canalización debe ser asíncrona o de bloqueo.

Java

Si bien puedes establecer las propiedades del objeto PipelineOptions directamente en tu canalización mediante métodos set (PipelineOptions.set[OptionName]), se recomienda pasar los valores por medio de las opciones de línea de comandos. El SDK de Dataflow para Java proporciona una clase PipelineOptionsFactory que analiza y valida las opciones de línea de comandos que se pasan a la canalización. Si usas opciones de línea de comandos para determinar PipelineRunner y otros campos en PipelineOptions en el entorno de ejecución, puedes usar el mismo código para construir y ejecutar tu canalización de manera local y en la nube.

Consulta Especifica los parámetros de ejecución a fin de obtener más información sobre cómo establecer las opciones de canalización de manera programática para el modo de ejecución local o en la nube. En la canalización WordCount de ejemplo, también se muestra cómo definir las opciones de canalización en el entorno de ejecución mediante las opciones de línea de comandos.

Lee datos en la canalización

A fin de crear la PCollection inicial de la canalización, aplica una transformación raíz al objeto de canalización. Una transformación raíz crea una PCollection a partir de una fuente de datos externa o de algunos datos locales que especifiques.

Java

Existen dos tipos de transformaciones raíz en el SDK de Dataflow para Java: Read y Create. Las transformaciones Read leen datos de una fuente externa, como BigQuery o un archivo de texto en Google Cloud Storage. Las transformaciones Create crean una PCollection a partir de una java.util.Collection en memoria.

En el siguiente ejemplo, se muestra cómo apply una transformación raíz a TextIO.Read para leer datos de un archivo de texto en Google Cloud Storage. La transformación se aplica a un objeto Pipeline p y muestra un conjunto de datos de canalización en el formato de una PCollection<String>:

PCollection<String> lines = p.apply(
  TextIO.Read.named("ReadMyFile").from("gs://some/inputData.txt"));

Aplica transformaciones para procesar datos de canalización

Si deseas usar transformaciones en tu canalización, las debes aplicar a la PCollection que deseas transformar.

Java

Para aplicar una transformación, debes llamar al método apply en cada PCollection que deseas procesar y pasar el objeto de transformación que quieres como un argumento.

Los SDK de Dataflow contienen varias transformaciones diferentes que puedes aplicar a las PCollection de la canalización. Dentro de ellas, se incluyen las transformaciones principales de uso general, como ParDo o Combine. También existen transformaciones compuestas predefinidas incluidas en el SDK, que combinan una o más de las transformaciones principales en un patrón de procesamiento útil, como contar o combinar elementos en una colección. También puedes definir tus propias transformaciones compuestas más complejas, de modo que se adapten al caso práctico de tu canalización.

Java

En el SDK de Dataflow para Java, cada transformación es una subclase de la clase principal PTransform. Cuando llamas a apply en una PCollection, pasas la PTransform que quieres usar como argumento.

En el siguiente código, se muestra cómo apply una transformación a una PCollection de strings. Esta es una transformación personalizada que define el usuario. Con ella, se revierten los contenidos de cada string, y el resultado es una PCollection nueva que contiene las strings revertidas.

La entrada es una PCollection<String> llamada words; el código pasa una instancia de un objeto PTransform llamado ReverseWords para apply, y guarda el valor de retorno como la PCollection<String> llamada reversedWords.

PCollection<String> words = ...;

PCollection<String> reversedWords = words.apply(new ReverseWords());

Escribe o entrega los resultados de los datos de canalización finales

Una vez que tu canalización aplicó todas sus transformaciones, por lo general, necesitarás entregar los resultados. Para entregar como resultado las PCollection finales de la canalización, aplica una transformación Write a esa PCollection. Las transformaciones Write pueden enviar los elementos de una PCollection a un receptor de datos externo, como un archivo en Google Cloud Storage o una tabla de BigQuery. Puedes usar Write para entregar una PCollection en cualquier momento de la canalización, aunque lo usual es que escribas los datos al final de la canalización.

Java

En el siguiente código de ejemplo, se muestra cómo apply una transformación TextIO.Write para escribir una PCollection de String en un archivo de texto en Google Cloud Storage:

PCollection<String> filteredWords = ...;
filteredWords.apply(TextIO.Write.named("WriteMyFile").to("gs://some/outputData.txt"));

Ejecuta la canalización

Una vez construida la canalización, usa el método run para ejecutarla. Las canalizaciones se ejecutan de forma asíncrona: el programa que creas envía una especificación de tu canalización a un ejecutor de canalización, que luego construye y ejecuta la serie real de operaciones de canalización. Puedes especificar el lugar en el que deseas que se ejecute la canalización: puede ser de forma local con fines de prueba y depuración, o en el servicio administrado de Cloud Dataflow. Consulta Especifica los parámetros de ejecución para obtener más información sobre los ejecutores de canalizaciones, la configuración de las opciones de canalización, y la ejecución local y en la nube.

En los SDK de Dataflow, especifica un PipelineRunner en las opciones de tu canalización cuando crees el objeto Pipeline. Cuando termines de construir la canalización, invoca a run en el objeto de canalización de la siguiente manera:

Java

p.run();