Neste documento, você aprenderá a criar um modelo clássico personalizado usando o código do pipeline do Dataflow. Os modelos clássicos empacotam os pipelines atuais do Dataflow para criar modelos reutilizáveis que podem ser personalizados para cada job alterando os parâmetros específicos do pipeline. Em vez de gravar o modelo, use um comando para gerá-lo de um pipeline atual.
A seguir, há uma breve visão geral do processo. Os detalhes desse processo são fornecidos nas seções a seguir.
- No código do pipeline, use a interface
ValueProvider
para todas as opções de pipeline que você quer definir ou usar no ambiente de execução. Use objetosDoFn
que aceitam parâmetros de ambiente de execução. - Estenda seu modelo com metadados adicionais para que os parâmetros personalizados sejam validados quando o modelo clássico for executado. Exemplos desses metadados incluem o nome do seu modelo clássico personalizado e parâmetros opcionais.
- Verifique se os conectores de E/S do pipeline aceitam objetos
ValueProvider
e faça as alterações conforme necessário. - Crie e organize o modelo clássico personalizado.
- Executar o modelo clássico personalizado.
Para saber mais sobre os diferentes tipos de modelos do Dataflow, os benefícios deles e quando escolher um modelo clássico, consulte Modelos do Dataflow.
Permissões necessárias para executar um modelo clássico
As permissões necessárias para executar o modelo clássico do Dataflow dependem de onde o modelo é executado e se a origem e o coletor do pipeline estão em outro projeto.
Para mais informações sobre como executar pipelines do Dataflow localmente ou usando o Google Cloud, consulte Segurança e permissões do Dataflow.
Para uma lista de papéis e permissões do Dataflow, consulte Controle de acesso do Dataflow.
Limitações
- A
opção de pipeline
a seguir não é compatível com os modelos clássicos. Se você precisar controlar o número de
linhas de execução de arcabouços de workers, use
modelos Flex.
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- O executor do Dataflow não oferece suporte às opções
ValueProvider
para tópicos e parâmetros de assinatura do Pub/Sub. Se você precisar de opções do Pub/Sub nos seus parâmetros de ambiente de execução, use modelos flexíveis.
Sobre os parâmetros de ambiente de execução e a interface ValueProvider
A interface ValueProvider
permite que os pipelines aceitem parâmetros de ambiente de execução. O Apache Beam fornece três tipos de objetos ValueProvider
.
Nome | Descrição |
---|---|
RuntimeValueProvider |
Use Use |
StaticValueProvider |
Use |
NestedValueProvider |
Use |
Use parâmetros do ambiente de execução no código do pipeline
Esta seção mostra como usar ValueProvider
, StaticValueProvider
e NestedValueProvider
.
Use ValueProvider
nas opções do pipeline
Use ValueProvider
para todas as opções de pipeline que você quer definir ou usar no ambiente de execução.
Por exemplo, o snippet de código WordCount
a seguir não é compatível com parâmetros de ambiente de execução. Esse código adiciona uma opção de arquivo de entrada, cria um canal e lê linhas do arquivo de entrada:
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Para adicionar suporte ao parâmetro de ambiente de execução, modifique a opção do arquivo de entrada para usar ValueProvider
.
Java
Use ValueProvider<String>
em vez de String
para o tipo da opção de arquivo de entrada.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
Substitua add_argument
por add_value_provider_argument
.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Use ValueProvider
nas suas funções
Para usar valores de parâmetro de ambiente de execução em suas próprias funções, atualize as funções para usar os parâmetros ValueProvider
.
O exemplo a seguir contém uma opção ValueProvider
de número inteiro e uma função simples que adiciona um número inteiro. A função depende do número inteiro ValueProvider
. Durante a execução, o pipeline aplica MySumFn
para todo número inteiro em uma PCollection
que contém [1, 2, 3]
. Se o valor do ambiente de execução for 10, o resultado PCollection
contém [11, 12, 13]
.
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
Usar StaticValueProvider
Para fornecer um valor estático ao seu pipeline, use StaticValueProvider
.
Este exemplo usa MySumFn
, que é uma DoFn
que leva uma ValueProvider<Integer>
. Se você souber o valor do parâmetro com antecedência, use StaticValueProvider
para especificar seu valor estático como uma ValueProvider
.
Java
Este código recebe o valor no ambiente de execução do canal:
.apply(ParDo.of(new MySumFn(options.getInt())))
Em vez disso, use StaticValueProvider
com um valor estático:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
Este código recebe o valor no ambiente de execução do pipeline:
beam.ParDo(MySumFn(user_options.templated_int))
Em vez disso, use StaticValueProvider
com um valor estático:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
Também é possível usar StaticValueProvider
ao implementar um módulo de E/S que seja compatível com parâmetros regulares e parâmetros de ambiente de execução.
StaticValueProvider
reduz a duplicação de código ao implementar dois métodos semelhantes.
Java
O código-fonte deste exemplo é do TextIO.java do Apache Beam no GitHub.
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
Neste exemplo, há um único construtor que aceita argumentos string
ou ValueProvider
. Se o argumento for um string
, ele será convertido em StaticValueProvider
.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
Usar NestedStaticValueProvider
Para computar um valor de outro objeto ValueProvider
, use NestedValueProvider
.
NestedValueProvider
leva uma ValueProvider
e uma SerializableFunction
como entrada. Quando você chamar .get()
em uma NestedValueProvider
, o tradutor cria um novo valor com base no valor ValueProvider
. Essa
tradução permite usar um valor ValueProvider
para criar
o valor final desejado.
No exemplo a seguir, o usuário fornece o nome de arquivo file.txt
. A
transformação precede o caminho gs://directory_name/
ao
nome do arquivo. Chamar .get()
retorna gs://directory_name/file.txt
.
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
Use metadados no código do pipeline
É possível ampliar o modelo com metadados adicionais para que os parâmetros personalizados sejam validados quando o modelo for executado. Se você quiser criar metadados para seu modelo, siga estas etapas:
- Crie um arquivo no formato JSON chamado
TEMPLATE_NAME_metadata
usando os parâmetros em Parâmetros de metadados e o formato em Exemplo de arquivo de metadados. SubstituaTEMPLATE_NAME
pelo nome do seu modelo.Verifique se o arquivo de metadados não tem uma extensão de nome de arquivo. Por exemplo, se o nome do modelo for
myTemplate
, o arquivo de metadados dele precisará sermyTemplate_metadata
. - Armazene o arquivo JSON no Cloud Storage na mesma pasta do modelo.
Parâmetros de metadados
Chave de parâmetro | Obrigatório | Descrição do valor | |
---|---|---|---|
name |
Sim | O nome do seu modelo. | |
description |
Não | Um parágrafo curto descrevendo o parâmetro. | |
streaming |
Não | Se true , o modelo é compatível com streaming. O valor padrão é
false . |
|
supportsAtLeastOnce |
Não | Se true , o modelo é compatível com o processamento "Pelo menos uma vez". O valor padrão é false . Defina esse parâmetro como true se o modelo for projetado para funcionar com o modo de streaming "Pelo menos uma vez".
|
|
supportsExactlyOnce |
Não | Se true , o modelo é compatível com o processamento "Exatamente uma vez". O valor padrão é true . |
|
defaultStreamingMode |
Não | O modo de streaming padrão, para modelos compatíveis com os modos "pelo menos uma vez" e
"exatamente uma". Use um dos seguintes valores: "AT_LEAST_ONCE" , "EXACTLY_ONCE" . Se não for especificado, o modo de streaming padrão será exatamente uma vez.
|
|
parameters |
Não | Uma matriz de parâmetros adicionais que o modelo usa. Uma matriz vazia é usada por padrão. | |
name |
Sim | O nome do parâmetro usado no seu modelo. | |
label |
Sim | Uma string legível que é usada no Console do Google Cloud para rotular o parâmetro. | |
helpText |
Sim | Um parágrafo curto que descreve o parâmetro. | |
isOptional |
Não | false se o parâmetro for obrigatório e true se o parâmetro for opcional. A menos que definido com um valor, isOptional assume como padrão false .
Se você não incluir essa chave de parâmetro nos metadados, eles se tornarão um parâmetro
obrigatório. |
|
regexes |
Não | Uma matriz de expressões regulares POSIX-egrep em formato de string que será usada para validar o
valor do parâmetro. Por exemplo: ["^[a-zA-Z][a-zA-Z0-9]+"] é uma
expressão regular única que valida que o valor comece com uma letra e tenha um ou
mais caracteres. Uma matriz vazia é usada por padrão. |
Exemplo de arquivo de metadados
Java
O serviço do Dataflow usa os seguintes metadados para validar os parâmetros personalizados do modelo do WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
O serviço do Dataflow usa os seguintes metadados para validar os parâmetros personalizados do modelo do WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
É possível fazer o download de arquivos de metadados para os modelos fornecidos pelo Google no diretório de modelos do Dataflow.
Conectores de E/S de pipeline compatíveis e ValueProvider
Java
Alguns conectores de E/S contêm métodos que aceitam objetos ValueProvider
. Para determinar a compatibilidade com um conector e método específico, consulte a documentação de referência da API para o conector de E/S. Os métodos compatíveis têm uma sobrecarga com ValueProvider
. Se um método não tiver uma sobrecarga, ele não suportará parâmetros de ambiente de execução. Os seguintes conectores de E/S têm compatibilidade pelo menos parcial com ValueProvider
:
- E/S com base em arquivo:
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(requer o SDK 2.3.0 ou posterior)PubSubIO
SpannerIO
Python
Alguns conectores de E/S contêm métodos que aceitam objetos ValueProvider
. Para determinar o suporte para conectores de E/S e os métodos deles, consulte a documentação de referência da API para o conector. Os seguintes conectores de E/S aceitam parâmetros de ambiente de execução.
- E/S com base em arquivo:
textio
,avroio
,tfrecordio
.
Crie e organize um modelo clássico
Depois de gravar o canal, crie e organize seu arquivo de modelo. Depois de criar e organizar um modelo, o local de teste conterá arquivos adicionais necessários para executar seu modelo. Se você excluir o local de organização, o modelo não será executado. O job do Dataflow não é executado imediatamente após o modelo ser organizado. Para executar um job personalizado do Dataflow com base em modelo, use o Console do Google Cloud, a API REST do Dataflow ou a CLI gcloud.
O exemplo a seguir mostra como organizar um arquivo de modelo:
Java
Este comando do Maven cria e organiza um modelo no local do Cloud Storage especificado com --templateLocation
.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
Verifique se o caminho templateLocation
está correto. Substitua:
com.example.myclass
: sua classe JavaPROJECT_ID
: ID do projetoBUCKET_NAME
: o nome do bucket do Cloud StorageTEMPLATE_NAME
: o nome do seu modelo.REGION
: a região onde o job do Dataflow será implantado
Python
Este comando do Python cria e organiza um modelo no local do Cloud Storage especificado com --template_location
.
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
Verifique se o caminho template_location
está correto. Substitua:
examples.mymodule
: seu módulo PythonPROJECT_ID
: ID do projetoBUCKET_NAME
: o nome do bucket do Cloud StorageTEMPLATE_NAME
: o nome do seu modelo.REGION
: a região onde o job do Dataflow será implantado
Depois de criar e organizar o modelo, a próxima etapa será executar o modelo.