Crea plantillas clásicas de Dataflow

Con este documento, aprenderás a crear una plantilla clásica personalizada a partir de tu código de canalización de Dataflow. Las plantillas clásicas empaquetan las canalizaciones de Dataflow existentes a fin de crear plantillas reutilizables que puedas personalizar para cada trabajo mediante el cambio de parámetros de canalización específicos. En lugar de escribir la plantilla, debes usar un comando para generar la plantilla a partir de una canalización existente.

A continuación, se presenta una breve descripción general del proceso. Los detalles de este proceso se proporcionan en secciones posteriores.

  1. En tu código de canalización, usa la interfaz ValueProvider para todas las opciones de canalización que deseas configurar o usar en el entorno de ejecución. Usa objetos DoFn que acepten parámetros de entorno de ejecución.
  2. Amplía tu plantilla con metadatos adicionales para que los parámetros personalizados se validen cuando se ejecute la plantilla clásica. Algunos ejemplos de estos metadatos son el nombre de la plantilla clásica personalizada y los parámetros opcionales.
  3. Comprueba si los conectores de E/S de la canalización admiten objetos ValueProvider y realiza los cambios necesarios.
  4. Crea la plantilla clásica personalizada y habilítala por etapas.
  5. Ejecuta la plantilla clásica personalizada.

Para obtener información sobre los diferentes tipos de plantillas de Dataflow, sus beneficios y cuándo elegir una plantilla clásica, consulta Plantillas de Dataflow.

Permisos necesarios para ejecutar una plantilla clásica

Los permisos que necesitas a fin de ejecutar la plantilla clásica de Dataflow dependen del lugar en el que ejecutas la plantilla y de si la fuente y el receptor de la canalización están en otro proyecto.

Para obtener más información sobre cómo ejecutar canalizaciones de Dataflow de forma local o mediante Google Cloud, consulta Seguridad y permisos de Dataflow.

Para obtener una lista de las funciones y los permisos de Dataflow, consulta Control de acceso de Dataflow.

Limitaciones

  • La siguiente opción de canalización no es compatible con las plantillas clásicas. Si necesitas controlar la cantidad de subprocesos de aprovechamiento de trabajadores, usa las plantillas flexibles.

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • El ejecutor de Dataflow no admite las opciones de ValueProvider para los temas de Pub/Sub y los parámetros de suscripción. Si necesitas opciones de Pub/Sub en los parámetros del entorno de ejecución, usa las plantillas de Flex.

Parámetros de entorno de ejecución y la interfaz de ValueProvider

La interfaz de ValueProvider permite que las canalizaciones acepten parámetros de entorno de ejecución. Apache Beam proporciona tres tipos de objetos ValueProvider.

Nombre Descripción
RuntimeValueProvider

RuntimeValueProvider es el tipo de ValueProvider predeterminado. RuntimeValueProvider permite que tu canalización acepte un valor que solo está disponible durante la ejecución de la canalización. El valor no está disponible durante la construcción de la canalización, por lo que no puedes usarlo para cambiar el grafo de flujo de trabajo de tu canalización.

Puedes usar isAccessible() para verificar si el valor de un objeto ValueProvider está disponible. Si llamas a get() antes de la ejecución de la canalización, Apache Beam muestra un error:
Value only available at runtime, but accessed from a non-runtime context.

Usa RuntimeValueProvider cuando no conozcas el valor de antemano. Para cambiar los valores de los parámetros en el entorno de ejecución, no establezcas valores para los parámetros en la plantilla. Establece los valores para los parámetros cuando crees trabajos a partir de la plantilla.

StaticValueProvider

StaticValueProvider te permite proporcionar un valor estático a la canalización. El valor está disponible durante la construcción de la canalización, por lo que puedes usarlo para cambiar el grafo de flujo de trabajo de tu canalización.

Usa StaticValueProvider cuando conozcas el valor de antemano. Consulta la sección de StaticValueProvider para ver ejemplos.

NestedValueProvider

NestedValueProvider te permite procesar un valor de otro objeto ValueProvider. NestedValueProvider une un ValueProvider, y el tipo del ValueProvider unido determina si se puede acceder al valor durante la creación de la canalización.

Utiliza NestedValueProvider cuando desees usar el valor para procesar otro valor en el entorno de ejecución. Consulta la sección de NestedValueProvider para obtener ejemplos.

Usa parámetros de entorno de ejecución en tu código de canalización

En esta sección, se explica cómo utilizar ValueProvider, StaticValueProvider y NestedValueProvider.

Usa ValueProvider en las opciones de canalización

Usa ValueProvider para todas las opciones de canalización que deseas configurar o usar en el entorno de ejecución.

Por ejemplo, el siguiente fragmento de código de WordCount no admite parámetros de entorno de ejecución. El código agrega una opción de archivo de entrada, crea una canalización y lee líneas del archivo de entrada:

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Si deseas agregar la compatibilidad con parámetros de entorno de ejecución, modifica la opción de archivo de entrada para usar ValueProvider.

Java

Usa ValueProvider<String> en lugar de String como el tipo de la opción de archivo de entrada.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Reemplaza add_argument por add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Usa ValueProvider en tus funciones

Si deseas usar los valores de los parámetros de entorno de ejecución en tus propias funciones, actualízalas para usar los parámetros de ValueProvider.

El siguiente ejemplo contiene una opción de ValueProvider que es un número entero, además de una función simple que suma un número entero. La función depende del número entero ValueProvider. Durante la ejecución, la canalización aplica MySumFn a cada número entero en una PCollection que contenga [1, 2, 3]. Si el valor del entorno de ejecución es 10, la PCollection resultante contendrá [11, 12, 13].

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Usa StaticValueProvider

Para proporcionar un valor estático a tu canalización, usa StaticValueProvider.

En este ejemplo, se usa MySumFn, que es un DoFn que toma un valor ValueProvider<Integer>. Si conoces el valor del parámetro con anticipación, puedes usar StaticValueProvider para especificar tu valor estático como un ValueProvider.

Java

Este código obtiene el valor en el entorno de ejecución de la canalización:

  .apply(ParDo.of(new MySumFn(options.getInt())))

En su lugar, puedes usar StaticValueProvider con un valor estático:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Este código obtiene el valor en el entorno de ejecución de la canalización:

  beam.ParDo(MySumFn(user_options.templated_int))

En su lugar, puedes usar StaticValueProvider con un valor estático:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

También puedes usar StaticValueProvider cuando implementes un módulo E/S que admita parámetros normales y de entorno de ejecución. StaticValueProvider reduce la duplicación de código que genera la implementación de dos métodos similares.

Java

El código fuente para este ejemplo es de TextIO.java de Apache Beam en GitHub.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

En este ejemplo, hay un único constructor que acepta un argumento de string o de ValueProvider. Si el argumento es una string, se convierte en un StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Usa NestedStaticValueProvider

Para procesar un valor de otro objeto ValueProvider, usa NestedValueProvider.

NestedValueProvider toma un ValueProvider y un traductor SerializableFunction como entrada. Cuando llamas a .get() en un NestedValueProvider, el traductor crea un nuevo valor basado en el valor ValueProvider. Esta traducción te permite usar un valor ValueProvider para crear el valor final que deseas.

En el siguiente ejemplo, el usuario proporciona el nombre de archivo file.txt. La transformación antepone la ruta de acceso gs://directory_name/ al nombre del archivo. Si se llama a .get(), se muestra gs://directory_name/file.txt.

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Usa metadatos en tu código de canalización

Puedes ampliar tu plantilla con metadatos adicionales para que los parámetros personalizados se validen cuando se ejecute la plantilla. Si deseas crear metadatos para tu plantilla, sigue estos pasos:

  1. Crea un archivo con formato JSON que se llame TEMPLATE_NAME_metadata con los parámetros de Parámetros de metadatos y el formato en Archivo de metadatos de ejemplo. Reemplaza TEMPLATE_NAME con el nombre de tu plantilla.

    Asegúrate de que el archivo de metadatos no tenga una extensión de nombre de archivo. Por ejemplo, si el nombre de tu plantilla es myTemplate, su archivo de metadatos debe ser myTemplate_metadata.

  2. Almacena el archivo de metadatos en Cloud Storage en la misma carpeta que la plantilla.

Parámetros de metadatos

Clave de parámetro Obligatorio Descripción del valor
name El nombre de tu plantilla
description No Un breve párrafo de texto que describe la plantilla
streaming No Si es true, esta plantilla admite transmisión. El valor predeterminado es false.
supportsAtLeastOnce No Si es true, esta plantilla admite el procesamiento al menos una vez. El valor predeterminado es false. Establece este parámetro en true si la plantilla está diseñada para funcionar con el modo de transmisión al menos una vez.
supportsExactlyOnce No Si es true, esta plantilla admite el procesamiento de tipo “exactamente una vez”. El valor predeterminado es true.
defaultStreamingMode No El modo de transmisión predeterminado, para plantillas que admiten el modo al menos una vez y el modo "exactamente una vez". Usa uno de los siguientes valores: "AT_LEAST_ONCE", "EXACTLY_ONCE". Si no se especifica, el modo de transmisión predeterminado es “exactamente una vez”.
parameters No Un array de parámetros adicionales que usa la plantilla De forma predeterminada, se usa un array vacío.
name El nombre del parámetro que se usa en tu plantilla.
label Una string legible que se usa en la consola de Google Cloud para etiquetar el parámetro.
helpText Un breve párrafo de texto que describe el parámetro.
isOptional No false si el parámetro es obligatorio y true si el parámetro es opcional A menos que se configure con un valor, isOptional se establece de forma predeterminada en false. Si no incluyes esta clave de parámetro en tus metadatos, estos se convierten en un parámetro obligatorio.
regexes No Un array de expresiones regulares POSIX-egrep en formato de string que se usará para validar el valor del parámetro. Por ejemplo, ["^[a-zA-Z][a-zA-Z0-9]+"] es una expresión regular individual que valida que el valor comienza con una letra y, luego, tiene uno o más caracteres. De forma predeterminada, se usa un array vacío.

Ejemplo de archivo de metadatos

Java

El servicio de Dataflow usa los siguientes metadatos para validar los parámetros personalizados de la plantilla de WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

El servicio de Dataflow usa los siguientes metadatos para validar los parámetros personalizados de la plantilla de WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Puedes descargar archivos de metadatos para las plantillas proporcionadas por Google desde el directorio de plantillas de Dataflow.

Conectores de E/S de canalización compatibles y ValueProvider

Java

Algunos conectores de E/S contienen métodos que aceptan objetos ValueProvider. A fin de determinar la compatibilidad con un conector y un método específicos, consulta la documentación de referencia de la API para el conector de E/S. Los métodos compatibles tienen una sobrecarga con un ValueProvider. Si un método no tiene una sobrecarga, no admite parámetros de entorno de ejecución. Los siguientes conectores de E/S tienen, al menos, compatibilidad parcial con ValueProvider:

  • E/S que se basan en archivos: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (requiere el SDK 2.3.0 o posterior)
  • PubSubIO
  • SpannerIO

Python

Algunos conectores de E/S contienen métodos que aceptan objetos ValueProvider. A fin de determinar la compatibilidad con los conectores de E/S y sus métodos, consulta la documentación de referencia de la API para el conector. Los siguientes conectores de E/S aceptan parámetros de entorno de ejecución:

  • E/S basadas en archivos: textio, avroio, tfrecordio

Crea una plantilla clásica y almacénala en etapa intermedia

Después de escribir tu canalización, debes crear tu archivo de plantilla y habilitarlo por etapas. Después de crear una plantilla y habilitarla por etapas, la ubicación de etapa de pruebas contendrá archivos adicionales que son necesarios para ejecutar tu plantilla. Si borras la ubicación de etapa de pruebas, la plantilla no se ejecutará. El trabajo de Dataflow no se ejecuta de inmediato después de habilitar por etapas la plantilla. Para ejecutar un trabajo de Dataflow personalizado basado en plantillas, puedes usar la consola de Google Cloud, la API de REST de Dataflow o la CLI de gcloud.

En el siguiente ejemplo, se muestra cómo almacenar en etapa intermedia un archivo de plantilla:

Java

Este comando de Maven crea una plantilla y la habilita por etapas en la ubicación de Cloud Storage especificada con --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

Verifica que la ruta de templateLocation sea correcta. Reemplaza lo siguiente:

  • com.example.myclass: Tu clase Java
  • PROJECT_ID: El ID de tu proyecto
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • TEMPLATE_NAME: El nombre de tu plantilla
  • REGION: la región en la que se implementará tu trabajo de Dataflow

Python

Este comando de Python crea una plantilla y la habilita por etapas en la ubicación de Cloud Storage especificada con --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Verifica que la ruta de template_location sea correcta. Reemplaza lo siguiente:

  • examples.mymodule: Tu módulo de Python
  • PROJECT_ID: El ID de tu proyecto
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • TEMPLATE_NAME: El nombre de tu plantilla
  • REGION: la región en la que se implementará tu trabajo de Dataflow

Después de crear tu plantilla y habilitarla por etapas, el siguiente paso es ejecutarla.