In questo documento scoprirai come creare un modello classico personalizzato dal codice della pipeline Dataflow. I modelli classici impacchettano le pipeline Dataflow esistenti per creare modelli riutilizzabili che puoi personalizzare per ogni job modificando parametri specifici della pipeline. Anziché scrivere il modello, utilizzi un comando per generarlo da una pipeline esistente.
Di seguito è riportata una breve panoramica della procedura. I dettagli di questa procedura sono riportati nelle sezioni successive.
- Nel codice della pipeline, utilizza l'interfaccia
ValueProvider
per tutte le opzioni della pipeline che vuoi impostare o utilizzare in fase di esecuzione. Utilizza oggettiDoFn
che accettano parametri di runtime. - Espandi il modello con metadati aggiuntivi in modo che i parametri personalizzati vengano convalidati quando viene eseguito il modello classico. Alcuni esempi di questi metadati sono il nome del modello classico personalizzato e i parametri facoltativi.
- Verifica se i connettori I/O della pipeline supportano gli oggetti
ValueProvider
e apporta le modifiche necessarie. - Crea e esegui il deployment del modello classico personalizzato.
- Esegui il modello classico personalizzato.
Per scoprire i diversi tipi di modelli Dataflow, i relativi vantaggi e quando scegliere un modello classico, consulta Modelli Dataflow.
Autorizzazioni necessarie per l'esecuzione di un modello classico
Le autorizzazioni necessarie per eseguire il modello Dataflow classico dipendono da dove lo esegui e se l'origine e la destinazione della pipeline si trovano in un altro progetto.
Per ulteriori informazioni sull'esecuzione delle pipeline Dataflow localmente o utilizzando Google Cloud, consulta Sicurezza e autorizzazioni di Dataflow.
Per un elenco dei ruoli e delle autorizzazioni di Dataflow, consulta Controllo dell'accesso a Dataflow.
Limitazioni
- La seguente
opzione di pipeline
non è supportata con i modelli classici. Se devi controllare il numero di thread del cablaggio dei worker, utilizza i modelli flessibili.
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- Il programma di esecuzione Dataflow non supporta le opzioni
ValueProvider
per gli argomenti Pub/Sub e i parametri di sottoscrizione. Se hai bisogno di opzioni Pub/Sub nei parametri di runtime, utilizza i modelli flessibili.
Informazioni sui parametri di runtime e sull'interfaccia ValueProvider
L'interfaccia ValueProvider
consente alle pipeline di accettare parametri di runtime. Apache Beam fornisce tre tipi di oggetti ValueProvider
.
Nome | Descrizione |
---|---|
RuntimeValueProvider |
Puoi utilizzare Utilizza |
StaticValueProvider |
Utilizza |
NestedValueProvider |
Utilizza |
Utilizzare i parametri di runtime nel codice della pipeline
Questa sezione illustra come utilizzare ValueProvider
,
StaticValueProvider
e NestedValueProvider
.
Utilizzare ValueProvider
nelle opzioni della pipeline
Utilizza ValueProvider
per tutte le opzioni della pipeline che vuoi impostare o utilizzare in fase di esecuzione.
Ad esempio, lo snippet di codice WordCount
seguente non supporta i parametri di runtime. Il codice aggiunge un'opzione di file di input, crea una pipeline e legge le righe dal file di input:
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Per aggiungere il supporto dei parametri di runtime, modifica l'opzione del file di input da utilizzare
ValueProvider
.
Java
Utilizza ValueProvider<String>
anziché String
per il tipo di opzione del file di input.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
Sostituisci add_argument
con add_value_provider_argument
.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Utilizzare ValueProvider
nelle funzioni
Per utilizzare i valori dei parametri di runtime nelle tue funzioni, aggiorna le funzioni
in modo che utilizzino i parametri ValueProvider
.
L'esempio seguente contiene un'opzione ValueProvider
di tipo intero e una semplice funzione che aggiunge un intero. La funzione dipende dall'intero ValueProvider
. Durante l'esecuzione, la pipeline
applica MySumFn
a ogni numero intero in un PCollection
che contiene [1, 2, 3]
. Se il valore di runtime è 10, il valore risultante
PCollection
contiene [11, 12, 13]
.
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
Utilizza StaticValueProvider
Per fornire un valore statico alla pipeline, utilizza
StaticValueProvider
.
Questo esempio utilizza MySumFn
, che è un DoFn
che accetta un ValueProvider<Integer>
. Se conosci il valore del parametro in anticipo, puoi utilizzare StaticValueProvider
per specificare il valore statico come ValueProvider
.
Java
Questo codice recupera il valore al momento dell'esecuzione della pipeline:
.apply(ParDo.of(new MySumFn(options.getInt())))
In alternativa, puoi utilizzare StaticValueProvider
con un valore statico:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
Questo codice recupera il valore al momento dell'esecuzione della pipeline:
beam.ParDo(MySumFn(user_options.templated_int))
In alternativa, puoi utilizzare StaticValueProvider
con un valore statico:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
Puoi utilizzare StaticValueProvider
anche quando implementi un modulo I/O che supporta sia i parametri regolari sia i parametri di runtime.
StaticValueProvider
riduce la duplicazione del codice dovuta all'implementazione di due metodi simili.
Java
Il codice sorgente di questo esempio proviene da TextIO.java su GitHub di Apache Beam.
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
In questo esempio, è presente un singolo costruttore che accetta sia un argomento string
che un ValueProvider
. Se l'argomento è un string
, viene convertito in un StaticValueProvider
.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
Utilizza NestedStaticValueProvider
Per calcolare un valore da un altro oggetto ValueProvider
, utilizza
NestedValueProvider
.
NestedValueProvider
accetta come input un traduttore ValueProvider
e un traduttore SerializableFunction
. Quando chiami
.get()
su un NestedValueProvider
, il traduttore
crea un nuovo valore in base al valore ValueProvider
. Questa traduzione consente di utilizzare un valore ValueProvider
per creare il valore finale desiderato.
Nell'esempio seguente, l'utente fornisce il nome file file.txt
. La
trasformazione antepone il percorso gs://directory_name/
al
nome del file. La chiamata a .get()
restituisce
gs://directory_name/file.txt
.
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
Utilizzare i metadati nel codice della pipeline
Puoi estendere il modello con metadati aggiuntivi in modo che i parametri personalizzati vengano convalidati quando il modello viene eseguito. Se vuoi creare metadati per il modello, segui questi passaggi:
- Crea un file in formato JSON denominato
TEMPLATE_NAME_metadata
utilizzando i parametri in Parametri dei metadati e il formato in File di metadati di esempio. SostituisciTEMPLATE_NAME
con il nome del modello.Assicurati che il file dei metadati non abbia un'estensione del nome. Ad esempio, se il nome del modello è
myTemplate
, il file dei metadati deve esseremyTemplate_metadata
. - Archivia il file di metadati in Cloud Storage nella stessa cartella del modello.
Parametri dei metadati
Chiave parametro | Obbligatorio | Descrizione del valore | |
---|---|---|---|
name |
Sì | Il nome del modello. | |
description |
No | Un breve paragrafo di testo che descrive il modello. | |
streaming |
No | Se true , questo modello supporta lo streaming. Il valore predefinito è
false . |
|
supportsAtLeastOnce |
No | Se true , questo modello supporta l'elaborazione almeno una volta. Il valore predefinito è false . Imposta questo parametro su true se il modello è progettato per funzionare con la modalità flusso di dati Almeno una volta.
|
|
supportsExactlyOnce |
No | Se true , questo modello supporta
l'elaborazione exactly-once. Il valore predefinito è true . |
|
defaultStreamingMode |
No | La modalità di streaming predefinita per i modelli che supportano sia la modalità almeno una volta sia la modalità esattamente una volta. Utilizza uno dei seguenti valori: "AT_LEAST_ONCE" ,
"EXACTLY_ONCE" . Se non specificato, la modalità di streaming predefinita è esattamente una volta.
|
|
parameters |
No | Un array di parametri aggiuntivi utilizzati dal modello. Per impostazione predefinita viene utilizzato un array vuoto. | |
name |
Sì | Il nome del parametro utilizzato nel modello. | |
label |
Sì | Una stringa leggibile che viene utilizzata nella console Google Cloud per etichettare il parametro. | |
helpText |
Sì | Un breve paragrafo di testo che descrive il parametro. | |
isOptional |
No | false se il parametro è obbligatorio e true se è facoltativo. A meno che non venga impostato un valore, il valore predefinito di isOptional è false .
Se non includi questa chiave di parametro per i metadati, questi diventano un parametro obbligatorio. |
|
regexes |
No | Un array di espressioni regolari POSIX-egrep in formato stringa utilizzato per convalidare il valore del parametro. Ad esempio, ["^[a-zA-Z][a-zA-Z0-9]+"] è un'unica expressione regolare che convalida che il valore inizi con una lettera e contenga uno o più caratteri. Per impostazione predefinita viene utilizzato un array vuoto. |
File di metadati di esempio
Java
Il servizio Dataflow utilizza i seguenti metadati per convalidare i parametri personalizzati del modello WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Il servizio Dataflow utilizza i seguenti metadati per convalidare i parametri personalizzati del modello WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Puoi scaricare i file di metadati per i modelli forniti da Google dalla directory dei modelli di Dataflow.
Connettori I/O della pipeline e ValueProvider
supportati
Java
Alcuni connettori I/O contengono metodi che accettano oggetti ValueProvider
. Per determinare il supporto di un connettore e un metodo specifici, consulta la documentazione di riferimento dell'API per il connettore I/O. I metodi supportati hanno un sovraccarico con un
ValueProvider
. Se un metodo non ha un sovraccarico, non supporta i parametri di runtime. I seguenti connettori I/O hanno un supporto ValueProvider
almeno parziale:
- I/O basati su file:
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(richiede SDK 2.3.0 o versioni successive)PubSubIO
SpannerIO
Python
Alcuni connettori I/O contengono metodi che accettano oggetti ValueProvider
. Per determinare il supporto dei connettori I/O
e dei relativi metodi, consulta la documentazione di riferimento dell'API
per il connettore. I seguenti connettori di I/O accettano parametri di runtime:
- I/O basati su file:
textio
,avroio
,tfrecordio
Creare e eseguire il deployment di un modello classico
Dopo aver scritto la pipeline, devi creare e archiviare il file del modello. Quando crei e gestisci temporaneamente un modello, la posizione di gestione temporanea contiene file aggiuntivi necessari per l'esecuzione del modello. Se elimini la posizione di staging, l'esecuzione del modello non va a buon fine. Il job Dataflow non viene eseguito immediatamente dopo l'archiviazione del modello. Per eseguire un job Dataflow basato su un modello personalizzato, puoi utilizzare la console , l'API REST Dataflow o l'interfaccia a riga di comando gcloud.
L'esempio seguente mostra come eseguire il commit di un file modello:
Java
Questo comando Maven crea e gestisce un modello nella posizione Cloud Storage
specificata con --templateLocation
.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
Verifica che il percorso templateLocation
sia corretto. Sostituisci quanto segue:
com.example.myclass
: la tua classe JavaPROJECT_ID
: il tuo ID progettoBUCKET_NAME
: il nome del bucket Cloud StorageTEMPLATE_NAME
: il nome del modelloREGION
: la regione in cui eseguire il deployment del job Dataflow
Python
Questo comando Python crea e gestisce un modello nella posizione Cloud Storage specificata con --template_location
.
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
Verifica che il percorso template_location
sia corretto. Sostituisci quanto segue:
examples.mymodule
: il tuo modulo PythonPROJECT_ID
: il tuo ID progettoBUCKET_NAME
: il nome del bucket Cloud StorageTEMPLATE_NAME
: il nome del modelloREGION
: la regione in cui eseguire il deployment del job Dataflow
Dopo aver creato e archiviato il modello, il passaggio successivo consiste nell'eseguire il modello.