Como criar modelos clássicos

Os modelos do Dataflow usam parâmetros do ambiente de execução para aceitar valores que estão disponíveis somente durante a execução do pipeline. Para personalizar a execução de um pipeline de modelo, é possível transmitir esses parâmetros para funções executadas dentro do pipeline (como uma DoFn).

Para criar um modelo do pipeline do Apache Beam, modifique seu código de canal para aceitar parâmetros de ambiente de execução:

Em seguida, crie e organize seu modelo.

Parâmetros de ambiente de execução e a interface ValueProvider

A interface ValueProvider permite que os pipelines aceitem parâmetros de ambiente de execução. O Apache Beam fornece três tipos de objetos ValueProvider.

Nome Descrição
RuntimeValueProvider

RuntimeValueProvider é o tipo de ValueProvider padrão. RuntimeValueProvider permite que seu pipeline aceite um valor que esteja disponível somente durante a execução do pipeline. O valor não está disponível durante a construção do pipeline. Portanto, não é possível usá-lo para alterar o gráfico do fluxo de trabalho do pipeline.

Use isAccessible() para verificar se o valor de ValueProvider está disponível. Se você chamar get() antes da execução do pipeline, o Apache Beam retornará um erro:
Value only available at runtime, but accessed from a non-runtime context.

Use RuntimeValueProvider quando você não souber o valor antecipadamente.

StaticValueProvider

StaticValueProvider permite fornecer um valor estático para seu pipeline. O valor está disponível durante a construção do canal, portanto, você pode usá-lo para alterar o gráfico do fluxo de trabalho do canal.

Use StaticValueProvider quando souber o valor antecipadamente. Consulte a seção StaticValueProvider para ver exemplos.

NestedValueProvider

NestedValueProvider permite calcular um valor de outro objeto ValueProvider. NestedValueProvider encapsula uma ValueProvider, e o tipo do encapsulamento ValueProvider determina se o valor é acessível durante a construção do pipeline.

Use NestedValueProvider quando quiser usar o valor para computar outro valor no ambiente de execução. Consulte a seção NestedValueProvider para ver exemplos.

O executor do Dataflow não é compatível com opções ValueProvider para tópicos e parâmetros de assinatura do Pub/Sub. Se você precisar de opções do Pub/Sub nos parâmetros de ambiente de execução, alterne para o uso de Flex Templates.

Como modificar o código para usar parâmetros de tempo de execução

Esta seção mostra como usar ValueProvider, StaticValueProvider e NestedValueProvider.

Como usar o ValueProvider nas opções do pipeline

Use ValueProvider para todas as opções de pipeline que você quer definir ou usar no ambiente de execução.

Por exemplo, o snippet de código WordCount a seguir não é compatível com parâmetros de ambiente de execução. Esse código adiciona uma opção de arquivo de entrada, cria um pipeline e lê linhas do arquivo de entrada:

Java: SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

Para adicionar suporte ao parâmetro de ambiente de execução, modifique a opção do arquivo de entrada para usar ValueProvider.

Java: SDK 2.x

Use ValueProvider<String> em vez de String para o tipo da opção de arquivo de entrada.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Substitua add_argument por add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

Como usar ValueProvider nas funções

Para usar valores de parâmetro de ambiente de execução em suas próprias funções, atualize as funções para usar os parâmetros ValueProvider.

O exemplo a seguir contém uma opção ValueProvider de número inteiro e uma função simples que adiciona um número inteiro. A função depende do número inteiro ValueProvider. Durante a execução, o pipeline aplica MySumFn para todo número inteiro em uma PCollection que contém [1, 2, 3]. Se o valor do ambiente de execução for 10, o resultado PCollection contém [11, 12, 13].

Java: SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Java: SDK 1.x

Como usar StaticValueProvider

Para fornecer um valor estático ao seu pipeline, use StaticValueProvider.

Este exemplo usa MySumFn, que é uma DoFn que leva uma ValueProvider<Integer>. Se você souber o valor do parâmetro com antecedência, use StaticValueProvider para especificar seu valor estático como uma ValueProvider.

Java: SDK 2.x

Este código recebe o valor no ambiente de execução do pipeline:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Em vez disso, use StaticValueProvider com um valor estático:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Este código recebe o valor no ambiente de execução do pipeline:

  beam.ParDo(MySumFn(user_options.templated_int))

Em vez disso, use StaticValueProvider com um valor estático:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Java: SDK 1.x

Também é possível usar StaticValueProvider ao implementar um módulo de E/S que seja compatível com parâmetros regulares e parâmetros de ambiente de execução. StaticValueProvider reduz a duplicação de código ao implementar dois métodos semelhantes.

Java: SDK 2.x

O código-fonte deste exemplo é do TextIO.java do Apache Beam no GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

Neste exemplo, há um único construtor que aceita argumentos string ou ValueProvider. Se o argumento for um string, ele será convertido em StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Java: SDK 1.x

Como usar NestedValueProvider

Para computar um valor de outro objeto ValueProvider, use NestedValueProvider.

NestedValueProvider leva uma ValueProvider e uma SerializableFunction como entrada. Quando você chamar .get() em uma NestedValueProvider, o tradutor cria um novo valor com base no valor ValueProvider. Esta tradução permite que você use um valor ValueProvider para criar o valor final desejado.

No exemplo a seguir, o usuário fornece o nome de arquivo file.txt. A transformação precede o caminho gs://directory_name/ ao nome do arquivo. Chamar .get() retorna gs://directory_name/file.txt.

Java: SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Java: SDK 1.x

Metadados

Amplie seus modelos com metadados adicionais para que os parâmetros personalizados sejam validados quando o modelo for executado. Se quiser criar metadados para seu modelo:

  1. Crie um arquivo formatado em JSON com o nome <template-name>_metadata usando os parâmetros da tabela abaixo.
  2. Armazene o arquivo no formato JSON no Cloud Storage na mesma pasta do modelo.

Parâmetros de metadados

Chave de parâmetro Obrigatório Descrição do valor
name Sim O nome do seu modelo.
description Não Um parágrafo curto descrevendo os modelos.
parameters Não. Padronizado como uma matriz vazia. Uma matriz de parâmetros adicionais que serão utilizados pelo modelo.
name Sim O nome do parâmetro usado no seu modelo.
label Sim Um rótulo legível que será usado na IU para rotular o parâmetro.
helpText Sim Um parágrafo curto descrevendo o parâmetro.
isOptional Não. Padronizado como falso. false se o parâmetro for obrigatório e true se o parâmetro for opcional. Se você não incluir essa chave de parâmetro nos metadados, eles se tornarão um parâmetro obrigatório.
regexes Não. Padronizado como uma matriz vazia. Uma matriz de expressões regulares POSIX-egrep em formato de string que será usada para validar o valor do parâmetro. Por exemplo: ["^[a-zA-Z][a-zA-Z0-9]+"] é uma expressão regular única que valida que o valor comece com uma letra e tenha um ou mais caracteres.

Exemplo de arquivo de metadados

Java

O serviço do Dataflow usa os seguintes metadados para validar os parâmetros personalizados do modelo do WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

O serviço do Dataflow usa os seguintes metadados para validar os parâmetros personalizados do modelo do wordcount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

É possível fazer o download de arquivos de metadados para os modelos fornecidos pelo Google no diretório de modelos do Dataflow.

E/S e parâmetros de tempo de execução do canal

Java: SDK 2.x

Alguns conectores de E/S contêm métodos que aceitam objetos ValueProvider. Para determinar a compatibilidade com um conector e método específico, consulte a documentação de referência da API para o conector de E/S. Os métodos compatíveis têm uma sobrecarga com ValueProvider. Se um método não tiver uma sobrecarga, ele não suportará parâmetros de ambiente de execução. Os seguintes conectores de E/S têm compatibilidade pelo menos parcial com ValueProvider:

  • E/S com base em arquivo: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (requer o SDK 2.3.0 ou posterior)
  • PubSubIO
  • SpannerIO

Python

Alguns conectores de E/S contêm métodos que aceitam objetos ValueProvider. Para determinar o suporte para conectores de E/S e os métodos deles, consulte a documentação de referência da API para o conector. Os seguintes conectores de E/S aceitam parâmetros de ambiente de execução.

  • E/S com base em arquivo: textio, avroio, tfrecordio.

Java: SDK 1.x

Como criar e organizar modelos

Depois de gravar o canal, crie e organize seu arquivo de modelo.

Veja os exemplos a seguir sobre como organizar um arquivo de modelo:

Java: SDK 2.x

Este comando do Maven cria e organiza um modelo no local do Cloud Storage especificado com --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME
                  --region=REGION"
    

Verifique se o caminho templateLocation está correto. Substitua:

  • com.example.myclass: sua classe Java
  • PROJECT_ID: ID do projeto
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • TEMPLATE_NAME: o nome do seu modelo.
  • REGION: o endpoint regional para implantar seu job do Dataflow

Python

Este comando do Python cria e organiza um modelo no local do Cloud Storage especificado com --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Verifique se o caminho template_location está correto. Substitua:

  • examples.mymodule: seu módulo Python
  • PROJECT_ID: ID do projeto
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • TEMPLATE_NAME: o nome do seu modelo.
  • REGION: o endpoint regional para implantar seu job do Dataflow

Java: SDK 1.x

Depois de criar e organizar o modelo, a próxima etapa será executar o modelo.