Os modelos do Dataflow usam parâmetros do ambiente de execução para aceitar valores que estão disponíveis somente durante a execução do pipeline. Para personalizar a execução de um pipeline de modelo, é possível transmitir esses parâmetros para funções executadas dentro do pipeline (como uma DoFn
).
Para criar um modelo do pipeline do Apache Beam, modifique seu código de canal para aceitar parâmetros de ambiente de execução:
- Use
ValueProvider
para todas as opções de pipeline que você quer definir ou usar no ambiente de execução. - Chame métodos de E/S que aceitam parâmetros de ambiente de execução sempre que você quiser parametrizar seu canal.
- Use objetos
DoFn
que aceitam parâmetros de ambiente de execução.
Em seguida, crie e organize seu modelo.
Parâmetros de ambiente de execução e a interface ValueProvider
A interface ValueProvider
permite que os pipelines aceitem parâmetros de ambiente de execução. O Apache Beam fornece três tipos de objetos ValueProvider
.
Nome | Descrição |
---|---|
RuntimeValueProvider |
Use Use |
StaticValueProvider |
Use |
NestedValueProvider |
Use Observação: o SDK do Apache Beam para Python não é compatível com |
Como modificar o código para usar parâmetros de ambiente de execução
Esta seção mostra como usar ValueProvider
, StaticValueProvider
e NestedValueProvider
.
Como usar o ValueProvider nas opções do pipeline
Use ValueProvider
para todas as opções de pipeline que você quer definir ou usar no ambiente de execução.
Por exemplo, o snippet de código WordCount
a seguir não é compatível com parâmetros de ambiente de execução. Esse código adiciona uma opção de arquivo de entrada, cria um pipeline e lê linhas do arquivo de entrada:
Java: SDK 2.x
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Java: SDK 1.x
Para adicionar suporte ao parâmetro de ambiente de execução, modifique a opção do arquivo de entrada para usar ValueProvider
.
Java: SDK 2.x
Use ValueProvider<String>
em vez de String
para o tipo da opção de arquivo de entrada.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
Substitua add_argument
por add_value_provider_argument
.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Java: SDK 1.x
Como usar ValueProvider nas funções
Para usar valores de parâmetro de ambiente de execução em suas próprias funções, atualize as funções para usar os parâmetros ValueProvider
.
O exemplo a seguir contém uma opção ValueProvider
de número inteiro e uma função simples que adiciona um número inteiro. A função depende do número inteiro ValueProvider
. Durante a execução, o pipeline aplica MySumFn
para todo número inteiro em uma PCollection
que contém [1, 2, 3]
. Se o valor do ambiente de execução for 10, o resultado PCollection
contém [11, 12, 13]
.
Java: SDK 2.x
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
Java: SDK 1.x
Como usar StaticValueProvider
Para fornecer um valor estático ao seu pipeline, use StaticValueProvider
.
Este exemplo usa MySumFn
, que é uma DoFn
que leva uma ValueProvider<Integer>
. Se você souber o valor do parâmetro com antecedência, use StaticValueProvider
para especificar seu valor estático como uma ValueProvider
.
Java: SDK 2.x
Este código recebe o valor no ambiente de execução do pipeline:
.apply(ParDo.of(new MySumFn(options.getInt())))
Em vez disso, use StaticValueProvider
com um valor estático:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
Este código recebe o valor no ambiente de execução do pipeline:
beam.ParDo(MySumFn(user_options.templated_int))
Em vez disso, use StaticValueProvider
com um valor estático:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
Java: SDK 1.x
Também é possível usar StaticValueProvider
ao implementar um módulo de E/S que seja compatível com parâmetros regulares e parâmetros de ambiente de execução.
StaticValueProvider
reduz a duplicação de código ao implementar dois métodos semelhantes.
Java: SDK 2.x
O código-fonte deste exemplo é do TextIO.java do Apache Beam no GitHub.
// Create a StaticValueProviderfrom a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
Neste exemplo, há um único construtor que aceita argumentos string
ou ValueProvider
. Se o argumento for um string
, ele será convertido em StaticValueProvider
.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, basestring): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
Java: SDK 1.x
Como usar NestedValueProvider
Observação: o SDK do Apache Beam para Python não é compatível com NestedValueProvider
.
Para computar um valor de outro objeto ValueProvider
, use NestedValueProvider
.
NestedValueProvider
leva uma ValueProvider
e uma SerializableFunction
como entrada. Quando você chamar .get()
em uma NestedValueProvider
, o tradutor cria um novo valor com base no valor ValueProvider
. Esta tradução permite que você use um valor ValueProvider
para criar o valor final desejado:
- Exemplo 1: o usuário fornece um nome de arquivo
file.txt
. A transformação precede o caminho do arquivogs://directory_name/
ao nome do arquivo. Chamar.get()
retornags://directory_name/file.txt
. - Exemplo 2: o usuário fornece uma substring para uma consulta do BigQuery, como uma data específica. A transformação usa a substring para criar a consulta completa. Chamar
.get()
retorna a consulta completa.
Observação: NestedValueProvider
aceita apenas uma entrada de valor. Não é possível usar um NestedValueProvider
para combinar dois valores diferentes.
O código a seguir usa NestedValueProvider
para implementar o primeiro exemplo: o usuário fornece um nome de arquivo e a transformação precede o caminho do arquivo para o nome do arquivo.
Java: SDK 2.x
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
Python
O SDK do Apache Beam para Python não é compatível com NestedValueProvider
.
Java: SDK 1.x
Metadados
Amplie seus modelos com metadados adicionais para que os parâmetros personalizados sejam validados quando o modelo for executado. Se quiser criar metadados para seu modelo:
- Crie um arquivo formatado em JSON com o nome
<template-name>_metadata
usando os parâmetros da tabela abaixo.Observação: não nomeie o arquivo que você criou de
<template-name>_metadata.json
. Embora o arquivo contenha JSON, ele não pode terminar com a extensão de arquivo.json
. - Armazene o arquivo JSON no Cloud Storage na mesma pasta do modelo.
Observação: o modelo precisa ser armazenado em
<template-name>
e os metadados precisam ser armazenados em<template-name>_metadata
.
Parâmetros de metadados
Chave de parâmetro | Obrigatório | Descrição do valor | |
---|---|---|---|
name |
Sim | O nome do seu modelo. | |
description |
Não | Um parágrafo curto descrevendo os modelos. | |
parameters |
Não. Padronizado como uma matriz vazia. | Uma matriz de parâmetros adicionais que serão utilizados pelo modelo. | |
name |
Sim | O nome do parâmetro usado no seu modelo. | |
label |
Sim | Um rótulo legível que será usado na IU para rotular o parâmetro. | |
helpText |
Sim | Um parágrafo curto descrevendo o parâmetro. | |
isOptional |
Não. Padronizado como falso. | false se o parâmetro for obrigatório e true se o parâmetro for opcional. |
|
regexes |
Não. Padronizado como uma matriz vazia. | Uma matriz de expressões regulares POSIX-egrep em formato de string que será usada para validar o valor do parâmetro. Por exemplo: ["^[a-zA-Z][a-zA-Z0-9]+"] é uma expressão regular única que valida que o valor comece com uma letra e tenha um ou mais caracteres. |
Exemplo de arquivo de metadados
O serviço do Dataflow usa os seguintes metadados para validar os parâmetros personalizados do modelo do WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
É possível fazer o download desse arquivo de metadados no diretório de modelos do Dataflow.
E/S e parâmetros de ambiente de execução do pipeline
Java: SDK 2.x
Alguns conectores de E/S contêm métodos que aceitam objetos ValueProvider
. Para determinar a compatibilidade com um conector e método específico, consulte a documentação de referência da API para o conector de E/S. Os métodos compatíveis têm uma sobrecarga com ValueProvider
. Se um método não tiver uma sobrecarga, ele não suportará parâmetros de ambiente de execução. Os seguintes conectores de E/S têm compatibilidade pelo menos parcial com ValueProvider
:
- E/S com base em arquivo:
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(requer o SDK 2.3.0 ou posterior)PubSubIO
SpannerIO
* Observação: se você quiser executar um pipeline em lote que lê do BigQuery, precisa usar .withTemplateCompatibility()
em todas as leituras do BigQuery.
Python
Alguns conectores de E/S contêm métodos que aceitam objetos ValueProvider
. Para determinar o suporte para conectores de E/S e os métodos deles, consulte a documentação de referência da API para o conector. Os seguintes conectores de E/S aceitam parâmetros de ambiente de execução.
- E/S com base em arquivo:
textio
,avroio
,tfrecordio
.
Java: SDK 1.x
Como criar e organizar modelos
Depois de gravar o canal, crie e organize seu arquivo de modelo. Use o comando para sua versão do SDK.
Observação: depois de criar e organizar um modelo, o local de teste conterá arquivos adicionais necessários para executar seu modelo. Caso o local de organização seja excluído, a execução do modelo falhará.
Java: SDK 2.x
Este comando do Maven cria e organiza um modelo no local do Cloud Storage especificado com --templateLocation
.
Substitua
YOUR_PROJECT_ID
pelo ID do projeto.Substitua
YOUR_BUCKET_NAME
pelo nome do bucket no Cloud Storage.Substitua
YOUR_TEMPLATE_NAME
pelo nome do seu modelo.Substitua
com.example.myclass
pela sua classe Java.Verifique se o caminho
templateLocation
está correto.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=YOUR_PROJECT_ID \ --stagingLocation=gs://YOUR_BUCKET_NAME/staging \ --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
Python
Este comando do Python cria e organiza um modelo no local do Cloud Storage especificado com --template_location
. Faça as seguintes alterações no comando:
Substitua
YOUR_PROJECT_ID
pelo ID do projeto.Substitua
YOUR_BUCKET_NAME
pelo nome do bucket no Cloud Storage.Substitua
YOUR_TEMPLATE_NAME
pelo nome do seu modelo.Substitua
examples.mymodule
pelo seu módulo Python.Verifique se o caminho
template_location
está correto.
python -m examples.mymodule \ --runner DataflowRunner \ --project YOUR_PROJECT_ID \ --staging_location gs://YOUR_BUCKET_NAME/staging \ --temp_location gs://YOUR_BUCKET_NAME/temp \ --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
Java: SDK 1.x
Depois de criar e organizar o modelo, a próxima etapa será executar o modelo.