Tu programa de Dataflow expresa una canalización de procesamiento de datos de principio a fin. En esta sección, se explican los mecanismos del uso de las clases en el SDK de Dataflow para compilar una canalización. A fin de construir una canalización con las clases en los SDK de Dataflow, tu programa necesitará que realices los siguientes pasos generales:
- Crea un objeto
Pipeline
. - Usa una transformación Read o Create a fin de crear una o más
PCollection
para tus datos de canalización - Aplica transformaciones a cada
PCollection
. Las transformaciones pueden cambiar, filtrar, agrupar, analizar o procesar los elementos de unaPCollection
. Cada transformación crea una nuevaPCollection
de salida, a la que puedes aplicar transformaciones adicionales hasta que se complete el procesamiento. - Escribe o entrega como resultado las
PCollection
finales ya transformadas. - Ejecuta la canalización
Consulta la canalización simple de ejemplo para obtener un ejemplo completo que demuestra cada paso general.
Crea el objeto de canalización
Un programa de Dataflow comienza, por lo general, con la creación de un objeto Pipeline
.
En los SDK de Dataflow, cada canalización está representada por un objeto explícito del tipo Pipeline
. Cada objeto Pipeline
es una entidad independiente que encapsula los datos con los que opera la canalización y las transformaciones que se aplican a esos datos.
Java
Para crear una canalización, declara un objeto Pipeline
y pasa algunas opciones de configuración. Para pasar las opciones de configuración, crea un objeto del tipo PipelineOptions
, que puedes compilar mediante el método estático PipelineOptionsFactory.create()
.
// Start by defining the options for the pipeline. PipelineOptions options = PipelineOptionsFactory.create(); // Then create the pipeline. Pipeline p = Pipeline.create(options);
Configura opciones de canalización
Usa las opciones de canalización para configurar diferentes aspectos de tu canalización. Estas pueden incluir los siguientes aspectos:
- Dónde se ejecuta tu canalización
- Dónde tu trabajo de canalización habilita los archivos por etapa
- Con qué proyecto de Cloud Platform se asocia tu canalización
- Cuántas instancias de Compute Engine usa tu canalización como trabajadores
Las propiedades de opción de canalización incluyen información sobre tu proyecto de Cloud Dataflow que requiere el servicio de Cloud Dataflow, como tu ID del proyecto y las ubicaciones de las etapas de pruebas de Cloud Storage. Las opciones de canalización también te permiten controlar cuántos trabajadores debe asignar el servicio de Dataflow a tu trabajo de canalización y a dónde dirigir los mensajes de estado de tu trabajo de canalización.
El ejecutor de canalización es una propiedad clave en las opciones de canalización que determina dónde se ejecuta tu canalización (en el servicio de Cloud Dataflow o de forma local). La propiedad de ejecutor de canalización también especifica si la ejecución de tu canalización debe ser asíncrona o de bloqueo.
Java
Si bien puedes establecer las propiedades del objeto PipelineOptions
directamente en tu canalización mediante métodos set (PipelineOptions.set[OptionName]
), se recomienda pasar los valores por medio de las opciones de línea de comandos.
El SDK de Dataflow para Java proporciona una clase PipelineOptionsFactory
que analiza y valida las opciones de línea de comandos que se pasan a la canalización. Si usas opciones de línea de comandos para determinar PipelineRunner
y otros campos en PipelineOptions
en el entorno de ejecución, puedes usar el mismo código para construir y ejecutar tu canalización de manera local y en la nube.
Consulta Especifica los parámetros de ejecución a fin de obtener más información sobre cómo establecer las opciones de canalización de manera programática para el modo de ejecución local o en la nube. En la canalización WordCount de ejemplo, también se muestra cómo definir las opciones de canalización en el entorno de ejecución mediante las opciones de línea de comandos.
Lee datos en la canalización
A fin de crear la PCollection
inicial de la canalización, aplica una transformación raíz al objeto de canalización. Una transformación raíz crea una PCollection
a partir de una fuente de datos externa o de algunos datos locales que especifiques.
Java
Existen dos tipos de transformaciones raíz en el SDK de Dataflow para Java: Read
y Create
. Las transformaciones Read
leen datos de una fuente externa, como BigQuery o un archivo de texto en Google Cloud Storage. Las transformaciones Create
crean una PCollection
a partir de una java.util.Collection
en memoria.
En el siguiente ejemplo, se muestra cómo apply
una transformación raíz a TextIO.Read
para leer datos de un archivo de texto en Google Cloud Storage. La transformación se aplica a un objeto Pipeline
p
y muestra un conjunto de datos de canalización en el formato de una PCollection<String>
:
PCollection<String> lines = p.apply( TextIO.Read.named("ReadMyFile").from("gs://some/inputData.txt"));
Aplica transformaciones para procesar datos de canalización
Si deseas usar transformaciones en tu canalización, las debes aplicar a la PCollection
que deseas transformar.
Java
Para aplicar una transformación, debes llamar al método apply
en cada PCollection
que deseas procesar y pasar el objeto de transformación que quieres como un argumento.
Los SDK de Dataflow contienen varias transformaciones diferentes que puedes aplicar a las PCollection
de la canalización. Dentro de ellas, se incluyen las transformaciones principales de uso general, como ParDo o Combine. También existen transformaciones compuestas predefinidas incluidas en el SDK, que combinan una o más de las transformaciones principales en un patrón de procesamiento útil, como contar o combinar elementos en una colección. También puedes definir tus propias transformaciones compuestas más complejas, de modo que se adapten al caso práctico de tu canalización.
Java
En el SDK de Dataflow para Java, cada transformación es una subclase de la clase principal PTransform
.
Cuando llamas a apply
en una PCollection
, pasas la PTransform
que quieres usar como argumento.
En el siguiente código, se muestra cómo apply
una transformación a una PCollection
de strings. Esta es una transformación personalizada que define el usuario. Con ella, se revierten los contenidos de cada string, y el resultado es una PCollection
nueva que contiene las strings revertidas.
La entrada es una PCollection<String>
llamada words
; el código pasa una instancia de un objeto PTransform
llamado ReverseWords
para apply
, y guarda el valor de retorno como la PCollection<String>
llamada reversedWords
.
PCollection<String> words = ...; PCollection<String> reversedWords = words.apply(new ReverseWords());
Escribe o entrega los resultados de los datos de canalización finales
Una vez que tu canalización aplicó todas sus transformaciones, por lo general, necesitarás entregar los resultados.
Para entregar como resultado las PCollection
finales de la canalización, aplica una transformación Write
a esa PCollection
. Las transformaciones Write
pueden enviar los elementos de una PCollection
a un receptor de datos externo, como un archivo en Google Cloud Storage o una tabla de BigQuery. Puedes usar Write
para entregar una PCollection
en cualquier momento de la canalización, aunque lo usual es que escribas los datos al final de la canalización.
Java
En el siguiente código de ejemplo, se muestra cómo apply
una transformación TextIO.Write
para escribir una PCollection
de String
en un archivo de texto en Google Cloud Storage:
PCollection<String> filteredWords = ...; filteredWords.apply(TextIO.Write.named("WriteMyFile").to("gs://some/outputData.txt"));
Ejecuta la canalización
Una vez construida la canalización, usa el método run
para ejecutarla. Las canalizaciones se ejecutan de forma asíncrona: el programa que creas envía una especificación de tu canalización a un ejecutor de canalización, que luego construye y ejecuta la serie real de operaciones de canalización. Puedes especificar el lugar en el que deseas que se ejecute la canalización: puede ser de forma local con fines de prueba y depuración, o en el servicio administrado de Cloud Dataflow. Consulta Especifica los parámetros de ejecución para obtener más información sobre los ejecutores de canalizaciones, la configuración de las opciones de canalización, y la ejecución local y en la nube.
En los SDK de Dataflow, especifica un PipelineRunner
en las opciones de tu canalización cuando crees el objeto Pipeline
. Cuando termines de construir la canalización, invoca a run
en el objeto de canalización de la siguiente manera:
Java
p.run();