Crear plantillas clásicas de Dataflow

En este documento, se explica cómo crear una plantilla clásica personalizada a partir del código de tu canalización de Dataflow. Las plantillas clásicas empaquetan las canalizaciones de Dataflow para crear plantillas reutilizables que puedes personalizar para cada trabajo cambiando parámetros específicos de la canalización. En lugar de escribir la plantilla, usa un comando para generarla a partir de una canalización.

A continuación, se incluye un breve resumen del proceso. En las secciones siguientes se ofrecen detalles sobre este proceso.

  1. En el código de la canalización, usa la interfaz ValueProvider para todas las opciones de la canalización que quieras definir o usar en el tiempo de ejecución. Usa objetos DoFn que acepten parámetros de tiempo de ejecución.
  2. Amplía tu plantilla con metadatos adicionales para que los parámetros personalizados se validen cuando se ejecute la plantilla clásica. Entre los ejemplos de estos metadatos se incluyen el nombre de tu plantilla clásica personalizada y los parámetros opcionales.
  3. Comprueba si los conectores de entrada y salida de la canalización admiten objetos ValueProvider y haz los cambios necesarios.
  4. Crea y prepara la plantilla clásica personalizada.
  5. Ejecuta la plantilla clásica personalizada.

Para obtener información sobre los distintos tipos de plantillas de Dataflow, sus ventajas y cuándo elegir una plantilla clásica, consulta Plantillas de Dataflow.

Permisos necesarios para ejecutar una plantilla clásica

Los permisos que necesitas para ejecutar la plantilla clásica de Dataflow dependen de dónde la ejecutes y de si el origen y el receptor de la canalización están en otro proyecto.

Para obtener más información sobre cómo ejecutar canalizaciones de Dataflow de forma local o mediante Google Cloud Platform, consulta Seguridad y permisos de Dataflow.

Para ver una lista de los roles y permisos de Dataflow, consulta Control de acceso de Dataflow.

Limitaciones

  • La siguiente opción de canalización no se admite con las plantillas clásicas. Si necesitas controlar el número de hilos de la herramienta de trabajo, usa plantillas Flex.

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • El runner de Dataflow no admite las opciones ValueProvider para los temas de Pub/Sub y los parámetros de suscripción. Si necesitas opciones de Pub/Sub en tus parámetros de entorno de ejecución, usa plantillas Flex.

Acerca de los parámetros de tiempo de ejecución y la interfaz ValueProvider

La interfaz ValueProvider permite que las canalizaciones acepten parámetros de tiempo de ejecución. Apache Beam proporciona tres tipos de objetos ValueProvider.

Nombre Descripción
RuntimeValueProvider

RuntimeValueProvider es el tipo de ValueProvider predeterminado. RuntimeValueProvider permite que tu canalización acepte un valor que solo está disponible durante la ejecución de la canalización. El valor no está disponible durante la creación de la canalización, por lo que no puedes usarlo para cambiar el gráfico de flujo de trabajo de la canalización.

Puedes usar isAccessible() para comprobar si el valor de a ValueProvider está disponible. Si llamas a get() antes de ejecutar la canalización, Apache Beam devuelve un error:
Value only available at runtime, but accessed from a non-runtime context.

Usa RuntimeValueProvider cuando no sepas el valor con antelación. Para cambiar los valores de los parámetros en el tiempo de ejecución, no les asigne valores en la plantilla. Define los valores de los parámetros al crear tareas a partir de la plantilla.

StaticValueProvider

StaticValueProvider te permite proporcionar un valor estático a tu canalización. El valor está disponible durante la creación de la canalización, por lo que puedes usarlo para cambiar el gráfico de flujo de trabajo de la canalización.

Usa StaticValueProvider cuando sepas el valor con antelación. Consulta la sección StaticValueProvider para ver ejemplos.

NestedValueProvider

NestedValueProvider te permite calcular un valor a partir de otro objeto ValueProvider. NestedValueProvider envuelve un ValueProvider, y el tipo del ValueProvider envuelto determina si se puede acceder al valor durante la creación de la canalización.

Usa NestedValueProvider cuando quieras usar el valor para calcular otro valor en el tiempo de ejecución. Consulta la sección NestedValueProvider para ver algunos ejemplos.

Usar parámetros de tiempo de ejecución en el código de la canalización

En esta sección se explica cómo usar ValueProvider, StaticValueProvider y NestedValueProvider.

Usar ValueProvider en las opciones de la canalización

Usa ValueProvider para todas las opciones de la canalización que quieras definir o usar en el tiempo de ejecución.

Por ejemplo, el siguiente fragmento de código de WordCount no admite parámetros de tiempo de ejecución. El código añade una opción de archivo de entrada, crea una canalización y lee líneas del archivo de entrada:

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Para añadir compatibilidad con parámetros de tiempo de ejecución, modifique la opción del archivo de entrada para usar ValueProvider.

Java

Usa ValueProvider<String> en lugar de String para el tipo de opción de archivo de entrada.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Reemplaza add_argument por add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Usar ValueProvider en tus funciones

Para usar valores de parámetros de tiempo de ejecución en tus propias funciones, actualiza las funciones para que usen parámetros ValueProvider.

El siguiente ejemplo contiene una opción ValueProvider de tipo entero y una función sencilla que añade un entero. La función depende del entero ValueProvider. Durante la ejecución, la canalización aplica MySumFn a cada número entero de un PCollection que contiene [1, 2, 3]. Si el valor de tiempo de ejecución es 10, el PCollection resultante contiene [11, 12, 13].

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Usar StaticValueProvider

Para proporcionar un valor estático a tu canalización, usa StaticValueProvider.

En este ejemplo se usa MySumFn, que es un DoFn que toma un ValueProvider<Integer>. Si conoces el valor del parámetro de antemano, puedes usar StaticValueProvider para especificar tu valor estático como ValueProvider.

Java

Este código obtiene el valor en el tiempo de ejecución de la canalización:

  .apply(ParDo.of(new MySumFn(options.getInt())))

En su lugar, puedes usar StaticValueProvider con un valor estático:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Este código obtiene el valor en el tiempo de ejecución de la canalización:

  beam.ParDo(MySumFn(user_options.templated_int))

En su lugar, puedes usar StaticValueProvider con un valor estático:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

También puedes usar StaticValueProvider cuando implementes un módulo de entrada/salida que admita tanto parámetros normales como parámetros de tiempo de ejecución. StaticValueProvider reduce la duplicación de código al implementar dos métodos similares.

Java

El código fuente de este ejemplo procede de TextIO.java en GitHub de Apache Beam.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

En este ejemplo, hay un solo constructor que acepta un argumento string o ValueProvider. Si el argumento es un string, se convierte en un StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Usar NestedStaticValueProvider

Para calcular un valor a partir de otro objeto ValueProvider, usa NestedValueProvider.

NestedValueProvider toma un ValueProvider y un traductor SerializableFunction como entrada. Cuando llamas a .get() en un NestedValueProvider, el traductor crea un nuevo valor basado en el valor de ValueProvider. Esta traducción te permite usar un valor ValueProvider para crear el valor final que quieras.

En el siguiente ejemplo, el usuario proporciona el nombre de archivo file.txt. La transformación antepone la ruta gs://directory_name/ al nombre del archivo. Llamar a .get() devuelve gs://directory_name/file.txt.

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Usar metadatos en el código de la canalización

Puedes ampliar tu plantilla con metadatos adicionales para que los parámetros personalizados se validen cuando se ejecute la plantilla. Si quieres crear metadatos para tu plantilla, sigue estos pasos:

  1. Crea un archivo con formato JSON llamado TEMPLATE_NAME_metadata con los parámetros de Parámetros de metadatos y el formato de Archivo de metadatos de ejemplo. Sustituye TEMPLATE_NAME por el nombre de tu plantilla.

    Asegúrate de que el archivo de metadatos no tenga una extensión de nombre de archivo. Por ejemplo, si el nombre de la plantilla es myTemplate, el archivo de metadatos debe ser myTemplate_metadata.

  2. Almacena el archivo de metadatos en Cloud Storage, en la misma carpeta que la plantilla.

Parámetros de metadatos

Clave de parámetro Obligatorio Descripción del valor
name El nombre de la plantilla.
description No Un breve párrafo de texto que describe la plantilla.
streaming No Si tiene el valor true, esta plantilla admite el streaming. El valor predeterminado es false.
supportsAtLeastOnce No Si true, esta plantilla admite el procesamiento al menos una vez. El valor predeterminado es false. Asigna el valor true a este parámetro si la plantilla se ha diseñado para que funcione con el modo de streaming al menos una vez.
supportsExactlyOnce No Si true, esta plantilla admite el procesamiento una sola vez. El valor predeterminado es true.
defaultStreamingMode No Es el modo de streaming predeterminado para las plantillas que admiten tanto el modo "al menos una vez" como el modo "exactamente una vez". Usa uno de los siguientes valores: "AT_LEAST_ONCE", "EXACTLY_ONCE". Si no se especifica, el modo de transmisión predeterminado es exactamente una vez.
parameters No Una matriz de parámetros adicionales que usa la plantilla. De forma predeterminada, se usa un array vacío.
name El nombre del parámetro que se usa en la plantilla.
label Cadena legible por humanos que se usa en la consola para etiquetar el parámetro. Google Cloud
helpText Un breve párrafo de texto que describe el parámetro.
isOptional No false si el parámetro es obligatorio y true si es opcional. Si no se le asigna ningún valor, isOptional tiene el valor predeterminado false. Si no incluye esta clave de parámetro en los metadatos, estos se convertirán en un parámetro obligatorio.
regexes No Matriz de expresiones regulares POSIX-egrep en formato de cadena que se usa para validar el valor del parámetro. Por ejemplo, ["^[a-zA-Z][a-zA-Z0-9]+"] es una única expresión regular que valida que el valor empieza por una letra y, después, tiene uno o más caracteres. Se usa un array vacío de forma predeterminada.

Archivo de metadatos de ejemplo

Java

El servicio Dataflow usa los siguientes metadatos para validar los parámetros personalizados de la plantilla WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

El servicio Dataflow usa los siguientes metadatos para validar los parámetros personalizados de la plantilla WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Puedes descargar archivos de metadatos de las plantillas proporcionadas por Google desde el directorio de plantillas de Dataflow .

Conectores de entrada y salida de canalizaciones admitidos y ValueProvider

Java

Algunos conectores de E/S contienen métodos que aceptan objetos ValueProvider. Para determinar si se admite un conector y un método específicos, consulta la documentación de referencia de la API del conector de E/S. Los métodos admitidos tienen una sobrecarga con un objeto ValueProvider. Si un método no tiene una sobrecarga, no admite parámetros de tiempo de ejecución. Los siguientes conectores de E/S tienen al menos compatibilidad parcial con ValueProvider:

  • Órdenes de inserción basadas en archivos: TextIO, AvroIO, FileIO, TFRecordIO y XmlIO
  • BigQueryIO*
  • BigtableIO (requiere la versión 2.3.0 del SDK o una posterior)
  • PubSubIO
  • SpannerIO

Python

Algunos conectores de E/S contienen métodos que aceptan objetos ValueProvider. Para determinar si se admiten conectores de entrada/salida y sus métodos, consulta la documentación de referencia de la API del conector. Los siguientes conectores de entrada/salida aceptan parámetros de tiempo de ejecución:

  • IOs basados en archivos: textio, avroio y tfrecordio

Crear y poner en fase de pruebas una plantilla clásica

Después de escribir tu canal, debes crear y organizar tu archivo de plantilla. Cuando creas y almacenas en área de stage una plantilla, la ubicación de almacenamiento en área de stage contiene archivos adicionales que son necesarios para ejecutar la plantilla. Si eliminas la ubicación de almacenamiento provisional, la plantilla no se podrá ejecutar. El trabajo de Dataflow no se ejecuta inmediatamente después de organizar la plantilla. Para ejecutar un trabajo de Dataflow basado en una plantilla personalizada, puedes usar la Google Cloud consola, la API REST de Dataflow o la CLI de gcloud.

En el siguiente ejemplo se muestra cómo organizar un archivo de plantilla:

Java

Este comando de Maven crea y pone en el área de stage una plantilla en la ubicación de Cloud Storage especificada con --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

Comprueba que la ruta de templateLocation sea correcta. Haz los cambios siguientes:

  • com.example.myclass: tu clase de Java
  • PROJECT_ID: tu ID de proyecto
  • BUCKET_NAME: el nombre de tu segmento de Cloud Storage
  • TEMPLATE_NAME: el nombre de tu plantilla
  • REGION: la región en la que desplegar la tarea de Dataflow

Python

Este comando de Python crea y pone en el área de stage una plantilla en la ubicación de Cloud Storage especificada con --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Comprueba que la ruta de template_location sea correcta. Haz los cambios siguientes:

  • examples.mymodule: tu módulo de Python
  • PROJECT_ID: tu ID de proyecto
  • BUCKET_NAME: el nombre de tu segmento de Cloud Storage
  • TEMPLATE_NAME: el nombre de tu plantilla
  • REGION: la región en la que desplegar la tarea de Dataflow

Después de crear y preparar la plantilla, el siguiente paso es ejecutarla.