In questa pagina viene spiegato come impostare opzioni della pipeline per il tuo di job Dataflow. Queste opzioni della pipeline configurano come e dove le esecuzioni della pipeline e le risorse che utilizza.
L'esecuzione della pipeline è separata da Apache Beam dell'esecuzione del programma. Il programma Apache Beam che hai scritto costruisce una pipeline per l'esecuzione differita. Ciò significa che il programma genera serie di passi eseguibili da qualsiasi runner Apache Beam supportato. I runner compatibili sono quelli di Dataflow Google Cloud e il runner diretto che esegue la pipeline direttamente in dell'ambiente locale.
Puoi passare i parametri in un job Dataflow in fase di runtime. Per ulteriori informazioni sull'impostazione delle opzioni di pipeline in fase di esecuzione, consulta Configurare le opzioni di pipeline.
Usa le opzioni della pipeline con gli SDK Apache Beam
Puoi usare i seguenti SDK per impostare le opzioni della pipeline per i job Dataflow:
- SDK Apache Beam per Python
- SDK Apache Beam per Java
- SDK Apache Beam per Go
Per utilizzare gli SDK, imposta il runner della pipeline e altri parametri di esecuzione
utilizzando la classe dell'SDK Apache Beam PipelineOptions
.
Esistono due metodi per specificare le opzioni della pipeline:
- Imposta le opzioni della pipeline in modo programmatico fornendo un elenco di opzioni per la pipeline.
- Imposta le opzioni della pipeline direttamente sulla riga di comando quando esegui il codice della pipeline.
Impostare le opzioni della pipeline in modo programmatico
Puoi impostare le opzioni della pipeline in modo programmatico creando e modificando
PipelineOptions
oggetto.
Java
Costruire un
PipelineOptions
utilizzando il metodo PipelineOptionsFactory.fromArgs
.
Per un esempio, vedi Esempio di lancio di Dataflow in questa pagina.
Python
Crea un oggetto
PipelineOptions
.
Per un esempio, consulta la sezione Esempio di lancio su Dataflow in questa pagina.
Vai
L'impostazione delle opzioni di pipeline in modo programmatico utilizzando PipelineOptions
non è supportata nell'SDK Apache Beam per Go. Utilizza gli argomenti della riga di comando Go.
Per un esempio, vedi Esempio di lancio di Dataflow in questa pagina.
Imposta opzioni pipeline sulla riga di comando
Puoi impostare le opzioni della pipeline utilizzando gli argomenti della riga di comando.
Java
La seguente sintassi di esempio proviene dalla pipeline WordCount
in
Guida rapida di Java.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progetto Google CloudBUCKET_NAME
: il nome del tuo bucket Cloud StorageREGION
: una regione Dataflow,us-central1
Python
La sintassi dell'esempio seguente è tratta dalla pipeline WordCount
nel
quickstart di Python.
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
Sostituisci quanto segue:
DATAFLOW_REGION
: il regione in cui vuoi eseguire il deployment del job Dataflow, ad esempioeurope-west1
Il flag
--region
sostituisce la regione predefinita che è impostato nel server dei metadati, nel client locale o nell'ambiente come la codifica one-hot delle variabili categoriche.STORAGE_BUCKET
: il Nome bucket Cloud StoragePROJECT_ID
: l'ID del progetto Google Cloud
Vai
La seguente sintassi di esempio proviene dalla pipeline WordCount
in
Guida rapida di Go.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Sostituisci quanto segue:
BUCKET_NAME
: il nome del bucket Cloud StoragePROJECT_ID
: l'ID progetto Google CloudDATAFLOW_REGION
: il region in cui vuoi eseguire il deployment del job Dataflow. Ad esempio:europe-west1
. Il flag--region
sostituisce la regione predefinita impostata nei metadati del server, del client locale o delle variabili di ambiente.
Imposta opzioni sperimentali della pipeline
Negli SDK Java, Python e Go, il experiments
opzione pipeline
abilita le funzionalità Dataflow sperimentali o pre-GA.
Imposta in modo programmatico
Per impostare l'opzione experiments
in modo programmatico, utilizza la seguente sintassi.
Java
Nel tuo
PipelineOptions
includi l'opzione experiments
utilizzando la seguente sintassi.
In questo esempio la dimensione del disco di avvio viene impostata su 80 GB con il flag dell'esperimento.
options.setExperiments("streaming_boot_disk_size_gb=80")
Per un esempio che mostra come creare l'oggetto PipelineOptions
, consulta le
Esempio di lancio di Dataflow
in questa pagina.
Python
Nel tuo
PipelineOptions
includi l'opzione experiments
utilizzando la seguente sintassi.
In questo esempio la dimensione del disco di avvio viene impostata su 80 GB con il flag dell'esperimento.
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
Per un esempio che mostra come creare l'oggetto PipelineOptions
, consulta le
Esempio di lancio di Dataflow
in questa pagina.
Vai
L'impostazione programmatica delle opzioni della pipeline utilizzando PipelineOptions
non è
supportato nell'SDK Apache Beam per Go. Utilizzare gli argomenti della riga di comando Go.
Impostato sulla riga di comando
Per impostare l'opzione experiments
nella riga di comando, utilizza la seguente sintassi.
Java
In questo esempio la dimensione del disco di avvio viene impostata su 80 GB con il flag dell'esperimento.
--experiments=streaming_boot_disk_size_gb=80
Python
In questo esempio la dimensione del disco di avvio viene impostata su 80 GB con il flag dell'esperimento.
--experiments=streaming_boot_disk_size_gb=80
Vai
In questo esempio la dimensione del disco di avvio viene impostata su 80 GB con il flag dell'esperimento.
--experiments=streaming_boot_disk_size_gb=80
Imposta in un modello
ad abilitare una funzionalità sperimentale durante l'esecuzione di un Dataflow
usa il flag --additional-experiments
.
Modello classico
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
modello flessibile
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Accedi all'oggetto opzioni della pipeline
Quando crei l'oggetto Pipeline
nel programma Apache Beam, passa
PipelineOptions
. Quando viene eseguito il servizio Dataflow
dalla tua pipeline, invia una copia di PipelineOptions
a ogni worker.
Java
Accedi a PipelineOptions
all'interno di qualsiasi istanza DoFn
della trasformazione ParDo
utilizzando il metodo ProcessContext.getPipelineOptions
.
Python
Questa funzionalità non è supportata nell'SDK Apache Beam per Python.
Vai
Accedi alle opzioni della pipeline utilizzando beam.PipelineOptions
.
Lancio su Dataflow
Esegui il tuo job su risorse Google Cloud gestite utilizzando Servizio runner Dataflow. Esegui la pipeline con Dataflow crea un job Dataflow, che utilizza Risorse Compute Engine e Cloud Storage nel tuo Google Cloud progetto. Per informazioni sulle autorizzazioni di Dataflow, consulta Sicurezza e autorizzazioni di Dataflow.
I job Dataflow utilizzano Cloud Storage per archiviare i file temporanei durante l'esecuzione della pipeline. Per evitare che ti vengano addebitati costi di archiviazione non necessari, disattiva la funzionalità di eliminazione temporanea sui bucket I job Dataflow vengono utilizzati per l'archiviazione temporanea. Per ulteriori informazioni, consulta Rimuovere un criterio di eliminazione temporanea da un bucket.
Imposta le opzioni richieste
Per eseguire la pipeline utilizzando Dataflow, imposta quanto segue delle opzioni della pipeline:
Java
project
: l'ID del tuo progetto Google Cloud.runner
: il runner della pipeline che esegue la pipeline. Per Esecuzione Google Cloud, deve essereDataflowRunner
.gcpTempLocation
: un percorso Cloud Storage per Dataflow per organizzare la maggior parte dei file temporanei. Se vuoi se specifichi un bucket, devi crearlo in anticipo. In caso contrario impostagcpTempLocation
, puoi impostare l'opzione pipelinetempLocation
e alloragcpTempLocation
è impostato sul valoretempLocation
. Se nessuna delle due opzioni vengono specificati, viene creato un valoregcpTempLocation
predefinito.stagingLocation
: un percorso Cloud Storage per Dataflow per organizzare i file binari. Se utilizzi l'SDK Apache Beam 2.28 o versioni successive, non impostare questa opzione. Per Apache Beam SDK 2.28 o versioni precedenti. Se non imposti questa opzione, viene specificato pertempLocation
viene usato per la posizione temporanea.Viene creato un
gcpTempLocation
predefinito se né questo nétempLocation
sono specificati. SetempLocation
è specificato egcpTempLocation
non lo è,tempLocation
deve essere un percorso Cloud Storage egcpTempLocation
assume questo valore per impostazione predefinita. SetempLocation
non è specificato egcpTempLocation
lo è,tempLocation
non viene compilato.
Python
project
: l'ID del tuo progetto Google Cloud.region
: la regione per il tuo job Dataflow.runner
: l'esecutore della pipeline che esegue la pipeline. Per Esecuzione Google Cloud, deve essereDataflowRunner
.temp_location
: un percorso Cloud Storage per Dataflow per organizzare i file di job temporanei creati durante dell'esecuzione della pipeline.
Vai
project
: l'ID del tuo progetto Google Cloud.region
: la regione per il tuo job Dataflow.runner
: l'esecutore della pipeline che esegue la pipeline. Per l'esecuzione su Google Cloud, deve esseredataflow
.staging_location
: un percorso Cloud Storage per Dataflow per organizzare i file di job temporanei creati durante dell'esecuzione della pipeline.
Imposta le opzioni della pipeline in modo programmatico
Il codice di esempio seguente mostra come costruire una pipeline mediante impostando in modo programmatico il runner e le altre opzioni richieste per eseguire pipeline utilizzando Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Vai
L'SDK Apache Beam per Go utilizza argomenti della riga di comando Go. Utilizza le funzionalità di
flag.Set()
per impostare i valori del flag.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
Dopo aver creato la pipeline, specifica tutte le letture, trasforma, scrive ed esegue la pipeline.
Usa le opzioni della pipeline dalla riga di comando
L'esempio seguente mostra come utilizzare le opzioni della pipeline specificate la riga di comando. Questo esempio non imposta le opzioni della pipeline in modo programmatico.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
Utilizza la modulo Python argparse per analizzare le opzioni della riga di comando.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Vai
Utilizza il pacchetto Go flag
per analizzare
le opzioni della riga di comando. Devi analizzare le opzioni prima di chiamare
beam.Init()
. In questo esempio, output
è un'opzione della riga di comando.
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
Dopo aver costruito la pipeline, specifica tutte le letture, le trasformazioni e le scritture della pipeline, quindi eseguila.
Controlla le modalità di esecuzione
Quando un programma Apache Beam esegue una pipeline su un servizio come Dataflow, il programma può eseguire la pipeline in modo asincrono, o bloccarlo fino al completamento della pipeline. Puoi modificare questo comportamento utilizzando le seguenti indicazioni.
Java
Quando un programma Java Apache Beam esegue una pipeline su un servizio come
Dataflow, viene generalmente eseguito in modo asincrono. Per eseguire un
e attendi il completamento del job, imposta DataflowRunner
come
runner della pipeline e richiama esplicitamente pipeline.run().waitUntilFinish()
.
Quando usi DataflowRunner
e chiami il numero waitUntilFinish()
sul
Oggetto PipelineResult
restituito da pipeline.run()
, la pipeline esegue
su Google Cloud, ma il codice locale attende il completamento del job cloud
restituiscono l'oggetto DataflowPipelineJob
finale. Durante l'esecuzione del job,
Il servizio Dataflow stampa gli aggiornamenti sullo stato del job e i messaggi della console
mentre attende.
Python
Quando un programma Python Apache Beam esegue una pipeline su un servizio come
Dataflow, viene generalmente eseguito in modo asincrono. Per bloccare il processo fino al completamento della pipeline, utilizza il metodo wait_until_finish()
dell'oggetto PipelineResult
, restituito dal metodo run()
del runner.
Vai
Quando un programma Apache Beam Go esegue una pipeline su Dataflow, è sincrona per impostazione predefinita e si blocca fino al completamento della pipeline. Se non vuoi bloccare, ci sono due opzioni:
Avvia il job in una routine Go.
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.
Utilizza il flag
--async
della riga di comando, che si trova nel pacchettojobopts
.
Per visualizzare i dettagli dell'esecuzione, monitorare l'avanzamento e verificare lo stato di completamento del job, utilizza la Interfaccia di monitoraggio di Dataflow o il Interfaccia a riga di comando di Dataflow.
Usa origini di flussi di dati
Java
Se la pipeline legge da un'origine dati illimitata, come Pub/Sub, la pipeline viene eseguita automaticamente in modalità flusso.
Python
Se la tua pipeline utilizza un'origine dati illimitata, come Pub/Sub,
devi impostare l'opzione streaming
su true.
Vai
Se la pipeline legge da un'origine dati illimitata, come Pub/Sub, la pipeline viene eseguita automaticamente in modalità flusso.
Per impostazione predefinita, i job di streaming utilizzano un tipo di macchina di Compute Engine di n1-standard-2
o superiore.
Lancia a livello locale
Anziché eseguire la pipeline su risorse cloud gestite, puoi scegliere di eseguire la pipeline localmente. L'esecuzione locale presenta alcuni vantaggi per il test, il debug o l'esecuzione della pipeline su piccoli set di dati. Ad esempio: l'esecuzione locale rimuove la dipendenza dall'ambiente Dataflow e il progetto Google Cloud associato.
Quando utilizzi l'esecuzione locale, devi eseguire la pipeline con set di dati di dimensioni ridotte
abbastanza da essere nella memoria locale. Puoi creare un modello di archiviazione
utilizzando una trasformazione Create
oppure puoi utilizzare una trasformazione Read
lavorare con file locali o remoti di piccole dimensioni. L'esecuzione locale in genere offre un modo più rapido e semplice per eseguire test e debug con meno dipendenze esterne, ma è limitata dalla memoria disponibile nell'ambiente locale.
Il codice di esempio seguente mostra come costruire una pipeline in esecuzione dell'ambiente locale.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Vai
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
Dopo aver creato la pipeline, eseguila.
Crea opzioni di pipeline personalizzate
Oltre alle opzioni standard, puoi aggiungere le tue opzioni personalizzate
PipelineOptions
. La riga di comando di Apache Beam può anche analizzare
utilizzando argomenti della riga di comando specificati nello stesso formato.
Java
Per aggiungere opzioni personalizzate, definisci un'interfaccia con i metodi getter e setter per ciascuna opzione, come nell'esempio seguente:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
Per aggiungere le tue opzioni, utilizza il metodo add_argument()
(che si comporta
esattamente come lo standard Python
modulo argparse),
come nell'esempio seguente:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Vai
Per aggiungere le tue opzioni, utilizza il pacchetto di flag Go come mostrato nell' esempio seguente:
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
Puoi anche specificare una descrizione, che viene visualizzata quando un utente passa --help
come argomento della riga di comando, e un valore predefinito.
Java
Puoi impostare la descrizione e il valore predefinito utilizzando le annotazioni, come indicato di seguito:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Ti consigliamo di registrare l'interfaccia con PipelineOptionsFactory
e poi di passarla quando crei l'oggetto PipelineOptions
. Quando
registri la tua interfaccia con PipelineOptionsFactory
, il --help
può
trova l'interfaccia delle opzioni personalizzate e aggiungila all'output di --help
. PipelineOptionsFactory
verifica che le tue opzioni personalizzate siano
compatibili con tutte le altre opzioni registrate.
Il seguente codice di esempio mostra come registrare l'interfaccia delle opzioni personalizzate
con PipelineOptionsFactory
:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Ora la tua pipeline può accettare --myCustomOption=value
come riga di comando
.
Python
Imposta la descrizione e il valore predefinito come segue:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Vai
Imposta la descrizione e il valore predefinito come segue:
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)