Las plantillas de Dataflow usan parámetros de entorno de ejecución para aceptar valores que solo están disponibles durante la ejecución de la canalización. Para personalizar la ejecución de una canalización con plantilla, puedes pasar estos parámetros a las funciones que se ejecutan dentro de la canalización (como un DoFn
).
Para crear una plantilla desde tu canalización de Apache Beam, debes modificar el código de la canalización de forma que admita los parámetros de entorno de ejecución:
- Usa
ValueProvider
para todas las opciones de canalización que deseas configurar o usar en el entorno de ejecución. - Llama a los métodos de E/S que aceptan parámetros de entorno de ejecución donde desees parametrizar tu canalización.
- Usa objetos
DoFn
que acepten parámetros de entorno de ejecución.
Luego, crea tu plantilla y habilítala por etapas.
Parámetros de entorno de ejecución y la interfaz de ValueProvider
La interfaz de ValueProvider
permite que las canalizaciones acepten parámetros de entorno de ejecución. Apache Beam proporciona tres tipos de objetos ValueProvider
.
Nombre | Descripción |
---|---|
RuntimeValueProvider |
Puedes usar Usa |
StaticValueProvider |
Usa |
NestedValueProvider |
Utiliza Nota: El SDK de Apache Beam para Python no admite |
Modifica tu código para usar parámetros de entorno de ejecución
En esta sección, se explica cómo utilizar ValueProvider
, StaticValueProvider
y NestedValueProvider
.
Usa ValueProvider en tus opciones de canalización
Usa ValueProvider
para todas las opciones de canalización que deseas configurar o usar en el entorno de ejecución.
Por ejemplo, el siguiente fragmento de código de WordCount
no admite parámetros de entorno de ejecución. El código agrega una opción de archivo de entrada, crea una canalización y lee líneas del archivo de entrada:
Java: SDK 2.x
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Java: SDK 1.x
Si deseas agregar la compatibilidad con parámetros de entorno de ejecución, modifica la opción de archivo de entrada para usar ValueProvider
.
Java: SDK 2.x
Usa ValueProvider<String>
en lugar de String
como el tipo de la opción de archivo de entrada.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
Reemplaza add_argument
por add_value_provider_argument
.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Java: SDK 1.x
Usa ValueProvider en tus funciones
Si deseas usar los valores de los parámetros de entorno de ejecución en tus propias funciones, actualízalas para usar los parámetros de ValueProvider
.
El siguiente ejemplo contiene una opción de ValueProvider
que es un número entero, además de una función simple que suma un número entero. La función depende del número entero ValueProvider
. Durante la ejecución, la canalización aplica MySumFn
a cada número entero en una PCollection
que contenga [1, 2, 3]
. Si el valor del entorno de ejecución es 10, la PCollection
resultante contendrá [11, 12, 13]
.
Java: SDK 2.x
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
Java: SDK 1.x
Usa StaticValueProvider
Para proporcionar un valor estático a tu canalización, usa StaticValueProvider
.
En este ejemplo, se usa MySumFn
, que es un DoFn
que toma un valor ValueProvider<Integer>
. Si conoces el valor del parámetro con anticipación, puedes usar StaticValueProvider
para especificar tu valor estático como un ValueProvider
.
Java: SDK 2.x
Este código obtiene el valor en el entorno de ejecución de la canalización:
.apply(ParDo.of(new MySumFn(options.getInt())))
En su lugar, puedes usar StaticValueProvider
con un valor estático:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
Este código obtiene el valor en el entorno de ejecución de la canalización:
beam.ParDo(MySumFn(user_options.templated_int))
En su lugar, puedes usar StaticValueProvider
con un valor estático:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
Java: SDK 1.x
También puedes usar StaticValueProvider
cuando implementes un módulo E/S que admita parámetros normales y de entorno de ejecución.
StaticValueProvider
reduce la duplicación de código que se genera a causa de la implementación de dos métodos similares.
Java: SDK 2.x
El código fuente para este ejemplo es de TextIO.java de Apache Beam en GitHub.
// Create a StaticValueProviderfrom a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
En este ejemplo, hay un único constructor que acepta un argumento de string
o de ValueProvider
. Si el argumento es una string
, se convierte en un StaticValueProvider
.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, basestring): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
Java: SDK 1.x
Usa NestedValueProvider
Nota: El SDK de Apache Beam para Python no admite NestedValueProvider
.
Para procesar un valor de otro objeto ValueProvider
, usa NestedValueProvider
.
NestedValueProvider
toma un ValueProvider
y un traductor SerializableFunction
como entrada. Cuando llamas a .get()
en un NestedValueProvider
, el traductor crea un nuevo valor basado en el valor ValueProvider
. Esta traducción te permite usar un valor ValueProvider
para crear el valor final que deseas:
- Ejemplo 1: El usuario proporciona el nombre de archivo
file.txt
. La transformación antepone la ruta de accesogs://directory_name/
al nombre del archivo. Si se llama a.get()
, se muestrags://directory_name/file.txt
. - Ejemplo 2: El usuario proporciona una substring para una consulta de BigQuery, por ejemplo, una fecha específica. La transformación usa la substring para crear la consulta completa. Si se llama a
.get()
, se muestra la consulta completa.
Nota: NestedValueProvider
solo acepta una entrada de valor. No puedes usar NestedValueProvider
para combinar dos valores diferentes.
En el siguiente código, se usa NestedValueProvider
para implementar el primer ejemplo: el usuario proporciona el nombre de un archivo, y la transformación antepone la ruta de acceso al nombre del archivo.
Java: SDK 2.x
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
Python
El SDK de Apache Beam para Python no admite NestedValueProvider
.
Java: SDK 1.x
Metadatos
Puedes ampliar tus plantillas con metadatos adicionales para que los parámetros personalizados se validen cuando se ejecute la plantilla. Si deseas crear metadatos para tu plantilla, debes seguir estos pasos:
- Crea un archivo con formato JSON que se llame
<template-name>_metadata
con los parámetros de la tabla que aparece a continuación.Nota: No uses el nombre
<template-name>_metadata.json
para el archivo que creas. Si bien el archivo contiene JSON, no puede terminar en la extensión de archivo.json
. - Almacena el archivo JSON en Cloud Storage en la misma carpeta que la plantilla.
Nota: La plantilla debe almacenarse en
<template-name>
, y los metadatos, en<template-name>_metadata
.
Parámetros de metadatos
Clave de parámetro | Obligatorio | Descripción del valor | |
---|---|---|---|
name |
Sí | El nombre de tu plantilla | |
description |
No | Un breve párrafo de texto que describe las plantillas | |
parameters |
No. El valor predeterminado es un arreglo vacío | Un arreglo de parámetros adicionales que usará la plantilla | |
name |
Sí | El nombre del parámetro que se usa en tu plantilla | |
label |
Sí | Una etiqueta legible que se usará en la IU para etiquetar el parámetro | |
helpText |
Sí | Un breve párrafo de texto que describe el parámetro | |
isOptional |
No. El valor predeterminado es falso | false si el parámetro es obligatorio y true si el parámetro es opcional |
|
regexes |
No. El valor predeterminado es un arreglo vacío | Un arreglo de expresiones regulares POSIX-egrep en formato de string que se usará para validar el valor del parámetro Por ejemplo: ["^[a-zA-Z][a-zA-Z0-9]+"] es una expresión regular individual que valida que el valor comienza con una letra y, luego, tiene uno o más caracteres |
Ejemplo de archivo de metadatos
El servicio de Dataflow usa los siguientes metadatos para validar los parámetros personalizados de la plantilla de WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Puedes descargar este archivo de metadatos desde el directorio de plantillas de Dataflow.
Parámetros de entorno de ejecución y E/S de canalización
Java: SDK 2.x
Algunos conectores de E/S contienen métodos que aceptan objetos ValueProvider
. A fin de determinar la compatibilidad con un conector y un método específicos, consulta la documentación de referencia de la API para el conector de E/S. Los métodos compatibles tienen una sobrecarga con un ValueProvider
. Si un método no tiene una sobrecarga, no admite parámetros de entorno de ejecución. Los siguientes conectores de E/S tienen, al menos, compatibilidad parcial con ValueProvider
:
- E/S que se basan en archivos:
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(requiere el SDK 2.3.0 o versiones posteriores)PubSubIO
SpannerIO
* Nota: Si deseas ejecutar una canalización por lotes que se lea desde BigQuery, debes usar .withTemplateCompatibility()
en todas las operaciones de lectura de BigQuery.
Python
Algunos conectores de E/S contienen métodos que aceptan objetos ValueProvider
. A fin de determinar la compatibilidad con los conectores de E/S y sus métodos, consulta la documentación de referencia de la API para el conector. Los siguientes conectores de E/S aceptan parámetros de entorno de ejecución:
- E/S que se basan en archivos:
textio
,avroio
,tfrecordio
Java: SDK 1.x
Crea plantillas y habilítalas por etapas
Después de escribir tu canalización, debes crear tu archivo de plantilla y habilitarlo por etapas. Usa el comando para tu versión del SDK.
Nota: Después de crear una plantilla y habilitarla por etapas, la ubicación de etapa de pruebas contendrá archivos adicionales que son necesarios para ejecutar tu plantilla. Si borras la ubicación de etapa de pruebas, la ejecución de la plantilla fallará.
Java: SDK 2.x
Este comando de Maven crea una plantilla y la habilita por etapas en la ubicación de Cloud Storage especificada con --templateLocation
.
Reemplaza
YOUR_PROJECT_ID
por el ID del proyecto.Reemplaza
YOUR_BUCKET_NAME
con el nombre del bucket de Cloud Storage.Reemplaza
YOUR_TEMPLATE_NAME
con el nombre de tu plantilla.Reemplaza
com.example.myclass
con la clase de Java.Verifica que la ruta de
templateLocation
sea correcta.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=YOUR_PROJECT_ID \ --stagingLocation=gs://YOUR_BUCKET_NAME/staging \ --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
Python
Este comando de Python crea una plantilla y la habilita por etapas en la ubicación de Cloud Storage especificada con --template_location
. Realiza los siguientes cambios en el comando:
Reemplaza
YOUR_PROJECT_ID
por el ID del proyecto.Reemplaza
YOUR_BUCKET_NAME
con el nombre del bucket de Cloud Storage.Reemplaza
YOUR_TEMPLATE_NAME
con el nombre de tu plantilla.Reemplaza
examples.mymodule
con tu módulo de Python.Verifica que la ruta de
template_location
sea correcta.
python -m examples.mymodule \ --runner DataflowRunner \ --project YOUR_PROJECT_ID \ --staging_location gs://YOUR_BUCKET_NAME/staging \ --temp_location gs://YOUR_BUCKET_NAME/temp \ --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
Java: SDK 1.x
Después de crear tu plantilla y habilitarla por etapas, el siguiente paso es ejecutarla.