Construye tu canalización

Tu programa de Dataflow expresa una canalización de procesamiento de datos de principio a fin. En esta sección, se explican los mecanismos del uso de las clases en el SDK de Dataflow para compilar una canalización. A fin de construir una canalización con las clases en el SDK de Dataflow, tu programa necesitará realizar los siguientes pasos generales:

  • Crear un objeto Pipeline
  • Usar una transformación de lectura o creación a fin de crear una o más PCollection para tus datos de canalización
  • Aplicar transformaciones a cada PCollection Las transformaciones pueden cambiar, filtrar, agrupar, analizar o procesar los elementos en una PCollection Cada transformación crea una nueva PCollection de salida, a la que puedes aplicarle transformaciones adicionales hasta que se complete el procesamiento
  • Escribir o entregar como resultado las PCollection finales transformadas
  • Ejecutar la canalización

Consulta la canalización simple de ejemplo para obtener un ejemplo completo que demuestra cada paso general.

Crea tu objeto de canalización

Un programa de Dataflow comienza, por lo general, con la creación de un objeto Pipeline.

En los SDK de Dataflow, cada canalización se representa con un objeto explícito del tipo Pipeline. Cada objeto Pipeline es una entidad independiente que encapsula los datos sobre los que opera la canalización y las transformaciones que se aplican a esos datos.

Java

A fin de crear una canalización, declara un objeto Pipeline y pasa algunas opciones de configuración. Las opciones de configuración se pasan cuando creas un objeto de tipo PipelineOptions, que puedes compilar con el método estático PipelineOptionsFactory.create().

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

Configura opciones de canalización

Usa las opciones de canalización para configurar diferentes aspectos de tu canalización. Estas pueden incluir los siguientes aspectos:

  • Dónde se ejecuta tu canalización
  • Dónde tu trabajo de canalización habilita los archivos por etapa
  • Con qué proyecto de Cloud Platform se asocia tu canalización
  • Cuántas instancias de Compute Engine usa tu canalización como trabajadores

Las propiedades de opción de canalización incluyen información sobre tu proyecto de Cloud Dataflow que requiere el servicio de Cloud Dataflow, como tu ID del proyecto y las ubicaciones de las etapas de pruebas de Cloud Storage. Las opciones de canalización también te permiten controlar cuántos trabajadores debe asignar el servicio de Dataflow a tu trabajo de canalización y a dónde dirigir los mensajes de estado de tu trabajo de canalización.

El ejecutor de canalización es una propiedad clave en las opciones de canalización que determina dónde se ejecuta tu canalización (en el servicio de Cloud Dataflow o de forma local). La propiedad de ejecutor de canalización también especifica si la ejecución de tu canalización debe ser asíncrona o de bloqueo.

Java

Si bien puedes establecer las propiedades del objeto PipelineOptions de forma directa dentro de tu programa de canalización con métodos definidores (PipelineOptions.set[OptionName]), se recomienda pasar los valores con las opciones de línea de comandos. El SDK de Dataflow para Java proporciona una clase PipelineOptionsFactory que analiza y valida las opciones de línea de comandos pasadas en tu canalización. Con el uso de las opciones de línea de comandos para determinar el PipelineRunner y otros campos en PipelineOptions en el entorno de ejecución, puedes usar el mismo código para construir y ejecutar tu canalización de forma local y en la nube.

Consulta Especifica los parámetros de ejecución a fin de obtener más información sobre cómo establecer las opciones de canalización de manera programática para el modo de ejecución local o en la nube. La canalización de ejemplo de WordCount también muestra cómo definir las opciones de canalización en el entorno de ejecución con las opciones de línea de comandos.

Lee datos en tu canalización

A fin de crear la PCollection inicial de tu canalización, aplica una transformación de raíz a tu objeto de canalización. Una transformación de raíz crea una PCollection a partir de una fuente de datos externa o algunos datos locales que especifiques.

Java

Existen dos tipos de transformaciones de raíz en el SDK de Dataflow para Java: Read y Create. Las transformaciones Read leen datos desde una fuente externa, como BigQuery o un archivo de texto en Google Cloud Storage. Las transformaciones Create crean una PCollection a partir de una java.util.Collection en memoria.

El siguiente código de ejemplo muestra cómo apply una transformación de raíz TextIO.Read para leer datos desde un archivo de texto en Google Cloud Storage. La transformación se aplica a un objeto p de Pipeline y muestra un conjunto de datos de canalización en el formato de una PCollection<String>.

PCollection<String> lines = p.apply(
  TextIO.Read.named("ReadMyFile").from("gs://some/inputData.txt"));

Aplica transformaciones para procesar datos de canalización

Para usar transformaciones en tu canalización, las aplicas a la PCollection que deseas transformar.

Java

Para aplicar una transformación, llama al método apply en cada PCollection que deseas procesar y pasa cada objeto de transformación deseado como un argumento.

El SDK de Dataflow contiene un número de transformaciones diferentes que puedes aplicar a las PCollection de tu canalización. Incluyen transformaciones principales de propósito general, como ParDo o Combine. También existen transformaciones compuestas predefinidas incluidas en el SDK, que combinan una o más de las transformaciones principales en un patrón de procesamiento útil, como contar o combinar elementos en una colección. También puedes definir tus propias transformaciones compuestas más complejas para adaptarlas al caso práctico de tu canalización.

Java

En el SDK de Dataflow para Java, cada transformación es una subclase de la clase base PTransform. Cuando llamas a apply en una PCollection, pasas la PTransform que deseas usar como argumento.

El siguiente código muestra cómo apply una transformación a una PCollection de strings. Esta transformación es una personalizada definida por el usuario que revierte los contenidos de cada string y da como resultado una PCollection nueva que contiene las strings revertidas.

La entrada es una PCollection<String> llamada words. El código pasa una instancia de un objeto PTransform llamado ReverseWords para apply y guarda el valor de resultado como la PCollection<String> llamada reversedWords.

PCollection<String> words = ...;

PCollection<String> reversedWords = words.apply(new ReverseWords());

Escribe o entrega los resultados de tus datos de canalización finales

Una vez que tu canalización aplicó todas sus transformaciones, por lo general, necesitarás entregar los resultados. Para entregar los resultados de las PCollection finales de tu canalización, aplica una transformación Write a esa PCollection. Las transformaciones Write pueden entregar los elementos de una PCollection a un receptor de datos externos, como un archivo en Google Cloud Storage o una tabla de BigQuery. Puedes usar Write para entregar una PCollection en cualquier momento en tu canalización, aunque, por lo general, escribes datos al final de tu canalización.

Java

El siguiente código de ejemplo muestra cómo apply una transformación TextIO.Write para escribir una PCollection de String a un archivo de texto en Google Cloud Storage:

PCollection<String> filteredWords = ...;
filteredWords.apply(TextIO.Write.named("WriteMyFile").to("gs://some/outputData.txt"));

Ejecuta tu canalización

Una vez que construiste tu canalización, usa el método run para ejecutar la canalización. Las canalizaciones se ejecutan de forma asíncrona: el programa que creas envía una especificación de tu canalización a un ejecutor de canalización, que luego construye y ejecuta la serie de operaciones de canalización. Puedes especificar el lugar en que se ejecuta tu canalización: puede ser de forma local por motivos de prueba y depuración, o en el servicio administrado de Cloud Dataflow. Consulta Especifica los parámetros de ejecución para obtener más información sobre los ejecutores de canalización, la configuración de las opciones de canalización y la ejecución local y en la nube.

En los SDK de Dataflow, especifica un PipelineRunner en tus opciones de canalización cuando crees tu objeto Pipeline. Cuando terminaste de construir tu canalización, invoca run en tu objeto de canalización de la siguiente manera:

Java

p.run();
¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.