Criar modelos do Dataflow clássico

Neste documento, vai aprender a criar um modelo clássico personalizado a partir do código do pipeline do Dataflow. Os modelos clássicos agrupam pipelines do Dataflow existentes para criar modelos reutilizáveis que pode personalizar para cada tarefa alterando parâmetros específicos do pipeline. Em vez de escrever o modelo, usa um comando para gerar o modelo a partir de um pipeline existente.

Segue-se uma breve vista geral do processo. Os detalhes deste processo são fornecidos nas secções seguintes.

  1. No código do pipeline, use a interface ValueProvider para todas as opções do pipeline que quer definir ou usar no tempo de execução. Use objetos DoFn que aceitem parâmetros de tempo de execução.
  2. Estenda o seu modelo com metadados adicionais para que os parâmetros personalizados sejam validados quando o modelo clássico é executado. Alguns exemplos destes metadados incluem o nome do seu modelo clássico personalizado e parâmetros opcionais.
  3. Verifique se os conetores de E/S do pipeline suportam objetos ValueProvider e faça as alterações necessárias.
  4. Crie e prepare o modelo clássico personalizado.
  5. Execute o modelo clássico personalizado.

Para saber mais sobre os diferentes tipos de modelos do Dataflow, as respetivas vantagens e quando escolher um modelo clássico, consulte o artigo Modelos do Dataflow.

Autorizações necessárias para executar um modelo clássico

As autorizações de que precisa para executar o modelo clássico do Dataflow dependem de onde executa o modelo e se a origem e o destino do pipeline estão noutro projeto.

Para mais informações sobre a execução de pipelines do Dataflow localmente ou através da Google Cloud Platform, consulte o artigo Segurança e autorizações do Dataflow.

Para ver uma lista das funções e autorizações do Dataflow, consulte o artigo Controlo de acesso do Dataflow.

Limitações

  • A seguinte opção de pipeline não é suportada com modelos clássicos. Se precisar de controlar o número de threads de arneses de trabalho, use modelos flexíveis.

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • O executor do Dataflow não suporta as opções ValueProvider para parâmetros de subscrição e tópicos do Pub/Sub. Se precisar de opções do Pub/Sub nos parâmetros de tempo de execução, use modelos flexíveis.

Acerca dos parâmetros de tempo de execução e da interface ValueProvider

A interface ValueProvider permite que os pipelines aceitem parâmetros de tempo de execução. O Apache Beam oferece três tipos de objetos ValueProvider.

Nome Descrição
RuntimeValueProvider

RuntimeValueProvider é o tipo de ValueProvider predefinido. RuntimeValueProvider permite que o seu pipeline aceite um valor que só está disponível durante a execução do pipeline. O valor não está disponível durante a construção do pipeline, pelo que não pode usar o valor para alterar o gráfico de fluxo de trabalho do pipeline.

Pode usar isAccessible() para verificar se o valor de um ValueProvider está disponível. Se chamar get() antes da execução do pipeline, o Apache Beam devolve um erro:
Value only available at runtime, but accessed from a non-runtime context.

Use RuntimeValueProvider quando não souber o valor antecipadamente. Para alterar os valores dos parâmetros em tempo de execução, não defina valores para os parâmetros no modelo. Defina os valores dos parâmetros quando criar tarefas a partir do modelo.

StaticValueProvider

StaticValueProvider permite-lhe fornecer um valor estático ao seu pipeline. O valor está disponível durante a construção do pipeline, pelo que pode usá-lo para alterar o gráfico do fluxo de trabalho do pipeline.

Use StaticValueProvider quando souber o valor antecipadamente. Consulte a secção StaticValueProviderpara ver exemplos.

NestedValueProvider

NestedValueProvider permite-lhe calcular um valor a partir de outro objeto ValueProvider. NestedValueProvider envolve um ValueProvider e o tipo do ValueProvider envolvido determina se o valor é acessível durante a criação do pipeline.

Use NestedValueProvider quando quiser usar o valor para calcular outro valor no tempo de execução. Consulte a secção NestedValueProviderpara ver exemplos.

Use parâmetros de tempo de execução no código do pipeline

Esta secção explica como usar ValueProvider, StaticValueProvider e NestedValueProvider.

Use ValueProvider nas opções do pipeline

Use ValueProvider para todas as opções de pipeline que quer definir ou usar no tempo de execução.

Por exemplo, o seguinte fragmento do código WordCount não suporta parâmetros de tempo de execução. O código adiciona uma opção de ficheiro de entrada, cria um pipeline e lê linhas do ficheiro de entrada:

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Para adicionar suporte de parâmetros de tempo de execução, modifique a opção de ficheiro de entrada para usar ValueProvider.

Java

Use ValueProvider<String> em vez de String para o tipo da opção de ficheiro de entrada.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Substituir add_argument por add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Use ValueProvider nas suas funções

Para usar valores de parâmetros de tempo de execução nas suas próprias funções, atualize as funções para usar parâmetros ValueProvider.

O exemplo seguinte contém uma opção ValueProvider de número inteiro e uma função simples que adiciona um número inteiro. A função depende do número inteiro ValueProvider. Durante a execução, o pipeline aplica MySumFn a todos os números inteiros num PCollection que contenha [1, 2, 3]. Se o valor de tempo de execução for 10, o elemento PCollection resultante contém [11, 12, 13].

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Usar StaticValueProvider

Para fornecer um valor estático ao seu pipeline, use StaticValueProvider.

Este exemplo usa MySumFn, que é um DoFn que usa um ValueProvider<Integer>. Se souber o valor do parâmetro antecipadamente, pode usar StaticValueProvider para especificar o valor estático como ValueProvider.

Java

Este código obtém o valor no tempo de execução do pipeline:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Em alternativa, pode usar StaticValueProvider com um valor estático:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Este código obtém o valor no tempo de execução do pipeline:

  beam.ParDo(MySumFn(user_options.templated_int))

Em alternativa, pode usar StaticValueProvider com um valor estático:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Também pode usar StaticValueProvider quando implementa um módulo de E/S que suporta parâmetros normais e parâmetros de tempo de execução. StaticValueProvider reduz a duplicação de código da implementação de dois métodos semelhantes.

Java

O código-fonte deste exemplo é do ficheiro TextIO.java do Apache Beam no GitHub.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

Neste exemplo, existe um único construtor que aceita um argumento string ou ValueProvider. Se o argumento for um string, é convertido num StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Usar NestedStaticValueProvider

Para calcular um valor a partir de outro objeto ValueProvider, use NestedValueProvider.

NestedValueProvider aceita um ValueProvider e um tradutor SerializableFunction como entrada. Quando chama .get() numa NestedValueProvider, o tradutor cria um novo valor com base no valor ValueProvider. Esta tradução permite-lhe usar um valor ValueProvider para criar o valor final que quer.

No exemplo seguinte, o utilizador indica o nome do ficheiro file.txt. A transformação antepõe o caminho gs://directory_name/ ao nome do ficheiro. A ligar para .get() devolve gs://directory_name/file.txt.

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Use metadados no código do pipeline

Pode expandir o modelo com metadados adicionais para que os parâmetros personalizados sejam validados quando o modelo é executado. Se quiser criar metadados para o seu modelo, siga estes passos:

  1. Crie um ficheiro formatado em JSON denominado TEMPLATE_NAME_metadata usando os parâmetros em Parâmetros de metadados e o formato em Ficheiro de metadados de exemplo. Substitua TEMPLATE_NAME pelo nome do seu modelo.

    Certifique-se de que o ficheiro de metadados não tem uma extensão de nome de ficheiro. Por exemplo, se o nome do modelo for myTemplate, o respetivo ficheiro de metadados tem de ser myTemplate_metadata.

  2. Armazene o ficheiro de metadados no Cloud Storage na mesma pasta que o modelo.

Parâmetros de metadados

Chave do parâmetro Obrigatória Descrição do valor
name Sim O nome do modelo.
description Não Um breve parágrafo de texto que descreve o modelo.
streaming Não Se for true, este modelo suporta streaming. O valor predefinido é false.
supportsAtLeastOnce Não Se true, este modelo suporta o processamento pelo menos uma vez. O valor predefinido é false. Defina este parâmetro como true se o modelo for concebido para funcionar com o modo de streaming, pelo menos, uma vez.
supportsExactlyOnce Não Se true, este modelo suporta o processamento exatamente uma vez. O valor predefinido é true.
defaultStreamingMode Não O modo de streaming predefinido para modelos que suportam o modo pelo menos uma vez e o modo exatamente uma vez. Use um dos seguintes valores: "AT_LEAST_ONCE", "EXACTLY_ONCE". Se não for especificado, o modo de streaming predefinido é exatamente uma vez.
parameters Não Uma matriz de parâmetros adicionais que o modelo usa. Por predefinição, é usado um array vazio.
name Sim O nome do parâmetro usado no seu modelo.
label Sim Uma string legível que é usada na Google Cloud consola para etiquetar o parâmetro.
helpText Sim Um breve parágrafo de texto que descreve o parâmetro.
isOptional Não false se o parâmetro for obrigatório e true se o parâmetro for opcional. A menos que seja definido com um valor, isOptional tem como predefinição false. Se não incluir esta chave de parâmetro para os metadados, os metadados tornam-se um parâmetro obrigatório.
regexes Não Uma matriz de expressões regulares POSIX-egrep em formato de string que é usada para validar o valor do parâmetro. Por exemplo, ["^[a-zA-Z][a-zA-Z0-9]+"] é uma única expressão regular que valida se o valor começa com uma letra e, em seguida, tem um ou mais carateres. Por predefinição, é usado um conjunto vazio.

Exemplo de ficheiro de metadados

Java

O serviço Dataflow usa os seguintes metadados para validar os parâmetros personalizados do modelo WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

O serviço Dataflow usa os seguintes metadados para validar os parâmetros personalizados do modelo WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Pode transferir ficheiros de metadados para os modelos fornecidos pela Google a partir do Dataflow diretório de modelos.

Conetores de E/S de pipeline suportados e ValueProvider

Java

Alguns conetores de E/S contêm métodos que aceitam objetos ValueProvider. Para determinar o suporte de um conetor e um método específicos, consulte a documentação de referência da API do conetor de E/S. Os métodos suportados têm uma sobrecarga com um ValueProvider. Se um método não tiver uma sobrecarga, o método não suporta parâmetros de tempo de execução. Os seguintes conetores de E/S têm, pelo menos, suporte ValueProvider parcial:

  • IOs baseadas em ficheiros: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (requer o SDK 2.3.0 ou posterior)
  • PubSubIO
  • SpannerIO

Python

Alguns conetores de E/S contêm métodos que aceitam objetos ValueProvider. Para determinar o suporte para conectores de E/S e os respetivos métodos, consulte a documentação de referência da API do conector. Os seguintes conetores de E/S aceitam parâmetros de tempo de execução:

  • OIs baseadas em ficheiros: textio, avroio e tfrecordio

Crie e prepare um modelo clássico

Depois de escrever o pipeline, tem de criar e preparar o ficheiro de modelo. Quando cria e prepara um modelo, a localização de preparação contém ficheiros adicionais necessários para executar o seu modelo. Se eliminar a localização de preparação, o modelo não é executado. A tarefa do Dataflow não é executada imediatamente após a preparação do modelo. Para executar uma tarefa do Dataflow personalizada baseada em modelos, pode usar a Google Cloud consola, a API REST do Dataflow ou a CLI gcloud.

O exemplo seguinte mostra como preparar um ficheiro de modelo:

Java

Este comando Maven cria e prepara um modelo na localização do Cloud Storage especificada com --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

Verifique se o caminho templateLocation está correto. Substitua o seguinte:

  • com.example.myclass: a sua aula de Java
  • PROJECT_ID: o ID do seu projeto
  • BUCKET_NAME: o nome do seu contentor do Cloud Storage
  • TEMPLATE_NAME: o nome do seu modelo
  • REGION: a região para implementar a sua tarefa do Dataflow

Python

Este comando Python cria e prepara um modelo na localização do Cloud Storage especificada com --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Verifique se o caminho template_location está correto. Substitua o seguinte:

  • examples.mymodule: o seu módulo Python
  • PROJECT_ID: o ID do seu projeto
  • BUCKET_NAME: o nome do seu contentor do Cloud Storage
  • TEMPLATE_NAME: o nome do seu modelo
  • REGION: a região para implementar a sua tarefa do Dataflow

Depois de criar e preparar o modelo, o passo seguinte é executar o modelo.