Crea plantillas

Las plantillas de Cloud Dataflow usan parámetros de entorno de ejecución para aceptar valores que solo están disponibles durante la ejecución de la canalización. Para personalizar la ejecución de una canalización con plantilla, puedes pasar estos parámetros a las funciones que se ejecutan dentro de la canalización (como un DoFn).

Para crear una plantilla desde tu canalización de Apache Beam, debes modificar tu código de canalización de forma que admita los parámetros de entorno de ejecución:

Luego, crea tu plantilla y habilítala por etapas.

Parámetros de entorno de ejecución y la interfaz de ValueProvider

La interfaz de ValueProvider permite que las canalizaciones acepten parámetros de entorno de ejecución. Apache Beam proporciona tres tipos de objetos ValueProvider.

Nombre Descripción
RuntimeValueProvider

RuntimeValueProvider es el tipo predeterminado de ValueProvider. RuntimeValueProvider permite que tu canalización acepte un valor que solo está disponible durante la ejecución de la canalización. El valor no está disponible durante la construcción de la canalización, por lo que no puedes usarlo para cambiar el grafo de flujo de trabajo de tu canalización.

Puedes usar isAccessible() para verificar si el valor de un ValueProvider está disponible. Si llamas a get() antes de la ejecución de la canalización, Apache Beam mostrará un error:
Value only available at runtime, but accessed from a non-runtime context.

Usa RuntimeValueProvider cuando no conozcas el valor de antemano.

StaticValueProvider

StaticValueProvider te permite proporcionar un valor estático a tu canalización. El valor está disponible durante la construcción de la canalización, por lo que puedes usarlo para cambiar el grafo de flujo de trabajo de tu canalización.

Usa StaticValueProvider cuando conozcas el valor de antemano. Consulta la sección de StaticValueProvider para obtener ejemplos.

NestedValueProvider

NestedValueProvider te permite procesar un valor de otro objeto ValueProvider. NestedValueProvider une un ValueProvider, y el tipo del ValueProvider unido determina si se puede acceder al valor durante la construcción de la canalización.

Usa NestedValueProvider cuando desees usar el valor para procesar otro valor en el entorno de ejecución. Consulta la sección de NestedValueProvider para obtener ejemplos.

Nota: El SDK de Apache Beam para Python no admite NestedValueProvider.

Modifica tu código para usar parámetros de entorno de ejecución

En esta sección, se explica cómo usar ValueProvider, StaticValueProvider y NestedValueProvider.

Usa ValueProvider en tus opciones de canalización

Usa ValueProvider para todas las opciones de canalización que deseas configurar o usar en el entorno de ejecución.

Por ejemplo, el siguiente fragmento de código de WordCount no admite parámetros de entorno de ejecución. El código agrega una opción de archivo de entrada, crea una canalización y lee líneas del archivo de entrada:

Java: SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()));
    ...

Si deseas agregar la compatibilidad con parámetros de entorno de ejecución, modifica la opción de archivo de entrada para usar ValueProvider.

Java: SDK 2.x

Usa ValueProvider<String> en lugar de String como el tipo de la opción de archivo de entrada.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Reemplaza add_argument por add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

Usa ValueProvider<String> en lugar de String como el tipo de la opción de archivo de entrada.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    // Add .withoutValidation() when you use a RuntimeValueProvider with SDK 1.x.
    // The value may not be available at validation time.
    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation());
    ...

Usa ValueProvider en tus funciones

Si deseas usar los valores de los parámetros de entorno de ejecución en tus propias funciones, actualízalas para usar los parámetros de ValueProvider.

El siguiente ejemplo contiene una opción de ValueProvider que es un número entero, además de una función simple que suma un número entero. La función depende del número entero ValueProvider. Durante la ejecución, la canalización aplica MySumFn a cada número entero en una PCollection que contenga [1, 2, 3]. Si el valor de entorno de ejecución es 10, la PCollection resultante contendrá [11, 12, 13].

Java: SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.utils.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Java: SDK 1.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply(MapElements.via(
        new SimpleFunction<Integer, String>() {
          public String apply(Integer i) {
            return i.toString();
          }
        }))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Usa StaticValueProvider

Para proporcionar un valor estático a tu canalización, usa StaticValueProvider.

En este ejemplo, se usa MySumFn, que es un DoFn que toma un valor ValueProvider<Integer>. Si conoces el valor del parámetro de antemano, puedes usar StaticValueProvider para especificar tu valor estático como un ValueProvider.

Java: SDK 2.x

Este código obtiene el valor en el entorno de ejecución de la canalización:

  .apply(ParDo.of(new MySumFn(options.getInt())))

En su lugar, puedes usar StaticValueProvider con un valor estático:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Este código obtiene el valor en el entorno de ejecución de la canalización:

  beam.ParDo(MySumFn(user_options.templated_int))

En su lugar, puedes usar StaticValueProvider con un valor estático:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Java: SDK 1.x

Este código obtiene el valor en el entorno de ejecución de la canalización:

  .apply(ParDo.of(new MySumFn(options.getInt())))

En su lugar, puedes usar StaticValueProvider con un valor estático:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

También puedes usar StaticValueProvider cuando implementes un módulo de E/S que admite parámetros regulares y parámetros de entorno de ejecución. StaticValueProvider reduce la duplicación de código que genera la implementación de dos métodos similares.

Java: SDK 2.x

El código fuente para este ejemplo es de TextIO.java de Apache Beam en GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

En este ejemplo, hay un único constructor que acepta un argumento de string o de ValueProvider. Si el argumento es una string, se convierte en un StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Java: SDK 1.x

El código fuente para este ejemplo es de TextIO.java de Apache Beam en GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Usa NestedValueProvider

Nota: El SDK de Apache Beam para Python no admite NestedValueProvider.

Para procesar un valor de otro objeto ValueProvider, usa NestedValueProvider.

NestedValueProvider toma un ValueProvider y un traductor SerializableFunction como entrada. Cuando llamas a .get() en un NestedValueProvider, el traductor crea un nuevo valor basado en el valor de ValueProvider. Esta traducción te permite usar un valor de ValueProvider para crear el valor final que deseas:

  • Ejemplo 1: El usuario proporciona un nombre de archivo file.txt. La transformación antepone la ruta de acceso gs://directory_name/ al nombre del archivo. Llamar a .get() muestra gs://directory_name/file.txt.
  • Ejemplo 2: El usuario proporciona una substring para una consulta de BigQuery, por ejemplo, una fecha específica. La transformación usa la substring para crear la consulta completa. Llamar a .get() muestra la consulta completa.

Nota: NestedValueProvider solo acepta una entrada de valor. No puedes usar NestedValueProvider para combinar dos valores diferentes.

El siguiente código usa NestedValueProvider para implementar el primer ejemplo: el usuario proporciona un nombre de archivo y la transformación le antepone la ruta de acceso al nombre del archivo.

Java: SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Python

El SDK de Apache Beam para Python no admite NestedValueProvider.

Java: SDK 1.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply(TextIO.Write.named("OutputNums").to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Metadatos

Puedes ampliar tus plantillas con metadatos adicionales para que los parámetros personalizados se validen cuando se ejecute la plantilla. Si deseas crear metadatos para tu plantilla, debes seguir estos pasos:

  1. Crea un archivo con formato JSON llamado <template-name>_metadata con los parámetros de la tabla a continuación.

    Nota: No uses el nombre <template-name>_metadata.json para el archivo que creas. Si bien el archivo contiene JSON, no puede terminar en la extensión de archivo .json.

  2. Almacena el archivo JSON en Cloud Storage en la misma carpeta que la plantilla.

    Nota: La plantilla debe almacenarse en <template-name> y los metadatos en <template-name>_metadata.

Parámetros de metadatos

Clave de parámetro Obligatorio Descripción del valor
name El nombre de tu plantilla
description No Un breve párrafo de texto que describe las plantillas
parameters No. El valor predeterminado es un arreglo vacío Un arreglo de parámetros adicionales que usará la plantilla
name El nombre del parámetro que se usa en tu plantilla
label Una etiqueta legible que se usará en la IU para etiquetar el parámetro
help_text Un breve párrafo de texto que describe el parámetro
is_optional No. El valor predeterminado es falso true si el parámetro es obligatorio y false si es opcional
regexes No. El valor predeterminado es un arreglo vacío Un arreglo de expresiones regulares POSIX-egrep en formato de string que se usará para validar el valor del parámetro. Por ejemplo: ["^[a-zA-Z][a-zA-Z0-9]+"] es una expresión regular individual que valida que el valor comienza con una letra y luego tiene uno o más caracteres

Ejemplo de archivo de metadatos

{
  "name": "WordCount",
  "description": "An example pipeline that counts words in the input file.",
  "parameters": [{
    "name": "inputFile",
    "label": "Input Cloud Storage File(s)",
    "help_text": "Path of the file pattern glob to read from.",
    "regexes": ["^gs:\/\/[^\n\r]+$"],
    "is_optional": true
  },
  {
    "name": "output",
    "label": "Output Cloud Storage File Prefix",
    "help_text": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
    "regexes": ["^gs:\/\/[^\n\r]+$"]
  }]
}

Parámetros de entorno de ejecución y E/S de canalización

Java: SDK 2.x

Algunos conectores de E/S contienen métodos que aceptan objetos ValueProvider. A fin de determinar la compatibilidad con un conector y un método específicos, consulta la documentación de referencia de la API para el conector de E/S. Los métodos compatibles tienen una sobrecarga con un ValueProvider. Si un método no tiene una sobrecarga, no admite parámetros de entorno de ejecución. Los siguientes conectores de E/S tienen al menos compatibilidad parcial con ValueProvider:

  • IO basados en archivos: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (requiere el SDK 2.3.0 o posterior)
  • PubSubIO
  • SpannerIO

* Nota: Si deseas ejecutar una canalización por lotes que lea desde BigQuery, debes usar .withTemplateCompatibility() en todas las operaciones de lectura de BigQuery.

Python

Algunos conectores de E/S contienen métodos que aceptan objetos ValueProvider. A fin de determinar la compatibilidad con los conectores de E/S y sus métodos, consulta la documentación de referencia de la API para el conector. Los siguientes conectores de E/S aceptan parámetros de entorno de ejecución:

  • IO basados en archivos: textio, avroio, tfrecordio

Java: SDK 1.x

La siguiente tabla contiene la lista completa de métodos que aceptan parámetros de entorno de ejecución.

E/S Método
BigQuery* BigQueryIO.Read.from()*
BigQueryIO.Read.fromQuery()*
BigQueryIO.Write.to()*
BigQueryIO.Write.withSchema()*
Cloud Pub/Sub PubsubIO.Read.subscription()
PubsubIO.Read.topic()
PubsubIO.Write.topic()
TextIO TextIO.Read.from()
TextIO.Write.to()

* Solo puedes ejecutar las plantillas de canalización por lotes de BigQuery una vez, ya que el ID de trabajo de BigQuery se establece en el momento de la creación de la plantilla.

Crea plantillas y habilítalas por etapas

Después de escribir tu canalización, debes crear tu archivo de plantilla y habilitarlo por etapas. Usa el comando para tu versión del SDK.

Nota: Después de crear una plantilla y habilitarla por etapas, la ubicación de etapa de pruebas contendrá archivos adicionales que son necesarios para ejecutar tu plantilla. Si borras la ubicación de etapa de pruebas, la ejecución de la plantilla fallará.

Java: SDK 2.x

Este comando de Maven crea una plantilla y la habilita por etapas en la ubicación de Cloud Storage especificada con --templateLocation.

Reemplaza [YOUR_PROJECT_ID] por tu ID del proyecto y [YOUR_BUCKET_NAME] por el nombre de tu depósito de Cloud Storage.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[YOUR_PROJECT_ID] \
                  --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
                  --templateLocation=gs://[YOUR_BUCKET_NAME]/templates/MyTemplate"
    

Python

Este comando de Python crea una plantilla y la habilita por etapas en la ubicación de Cloud Storage especificada con --template_location.

Reemplaza [YOUR_PROJECT_ID] por tu ID del proyecto y [YOUR_BUCKET_NAME] por el nombre de tu depósito de Cloud Storage.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project [YOUR_PROJECT_ID] \
    --staging_location gs://[YOUR_BUCKET_NAME]/staging \
    --temp_location gs://[YOUR_BUCKET_NAME]/temp \
    --template_location gs://[YOUR_BUCKET_NAME]/templates/mytemplate

Java: SDK 1.x

Este comando de Maven crea una plantilla y la habilita por etapas en la ubicación de Cloud Storage especificada con --dataflowJobFile.

Reemplaza [YOUR_PROJECT_ID] por tu ID del proyecto y [YOUR_BUCKET_NAME] por el nombre de tu depósito de Cloud Storage.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=TemplatingDataflowPipelineRunner \
                  --project=[YOUR_PROJECT_ID] \
                  --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
                  --dataflowJobFile=gs://[YOUR_BUCKET_NAME]/templates/MyTemplate"
    

Después de crear tu plantilla y habilitarla por etapas, el siguiente paso es ejecutarla.

¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.