Imposta le opzioni della pipeline Dataflow

Questa pagina spiega come impostare le opzioni della pipeline per i job Dataflow. Queste opzioni configurano come e dove viene eseguita la pipeline e quali risorse utilizza.

L'esecuzione della pipeline è separata da quella del programma Apache Beam. Il programma Apache Beam che hai scritto crea una pipeline per l'esecuzione differita. Ciò significa che il programma genera una serie di passaggi che qualsiasi runner Apache Beam supportato può eseguire. I runner compatibili includono l'esecutore di Dataflow su Google Cloud e l'esecutore diretto che esegue la pipeline direttamente in un ambiente locale.

Puoi passare i parametri in un job Dataflow in fase di runtime. Per ulteriori informazioni sull'impostazione delle opzioni della pipeline in fase di runtime, consulta Configurazione delle opzioni della pipeline.

Utilizza le opzioni della pipeline con gli SDK Apache Beam

Puoi usare i seguenti SDK per impostare le opzioni della pipeline per i job Dataflow:

  • SDK Apache Beam per Python
  • SDK Apache Beam per Java
  • SDK Apache Beam per Go

Per utilizzare gli SDK, devi impostare il runner della pipeline e altri parametri di esecuzione utilizzando la classe dell'SDK Apache Beam PipelineOptions.

Esistono due metodi per specificare le opzioni della pipeline:

  • Imposta le opzioni della pipeline in modo programmatico fornendo un elenco di opzioni della pipeline.
  • Imposta le opzioni della pipeline direttamente dalla riga di comando quando esegui il codice della pipeline.

Imposta le opzioni della pipeline in modo programmatico

Puoi impostare le opzioni della pipeline in modo programmatico creando e modificando un oggetto PipelineOptions.

Java

Crea un oggetto PipelineOptions utilizzando il metodo PipelineOptionsFactory.fromArgs.

Per un esempio, consulta la sezione Avvio su Dataflow di esempio in questa pagina.

Python

Crea un oggetto PipelineOptions.

Per un esempio, consulta la sezione Avvio su Dataflow di esempio in questa pagina.

Go

L'impostazione programmatica delle opzioni della pipeline mediante PipelineOptions non è supportata nell'SDK Apache Beam per Go. Utilizza gli argomenti della riga di comando Go.

Per un esempio, consulta la sezione Avvio su Dataflow di esempio in questa pagina.

Imposta le opzioni della pipeline dalla riga di comando

Puoi impostare le opzioni della pipeline utilizzando gli argomenti della riga di comando.

Java

La sintassi di esempio seguente proviene dalla pipeline WordCount nella guida rapida di Java.

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del tuo progetto Google Cloud
  • BUCKET_NAME: il nome del bucket Cloud Storage
  • REGION: una regione Dataflow, us-central1

Python

La sintassi di esempio seguente è della pipeline WordCount nella guida rapida di Python.

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

Sostituisci quanto segue:

  • DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job di Dataflow, ad esempio europe-west1

    Il flag --region sostituisce la regione predefinita impostata nel server di metadati, nel client locale o nelle variabili di ambiente.

  • STORAGE_BUCKET: nome del bucket Cloud Storage

  • PROJECT_ID: l'ID del progetto Google Cloud

Go

La sintassi di esempio seguente proviene dalla pipeline WordCount nella guida rapida di Go.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

Sostituisci quanto segue:

  • BUCKET_NAME: nome del bucket Cloud Storage

  • PROJECT_ID: l'ID del progetto Google Cloud

  • DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job Dataflow. Ad esempio: europe-west1. Il flag --region sostituisce la regione predefinita impostata nel server di metadati, nel client locale o nelle variabili di ambiente.

Imposta opzioni pipeline sperimentali

Negli SDK Java, Python e Go, l'opzione della pipeline experiments abilita funzionalità di Dataflow sperimentali o pre-GA.

Imposta in modo programmatico

Per impostare l'opzione experiments in modo programmatico, utilizza la sintassi seguente.

Java

Nell'oggetto PipelineOptions, includi l'opzione experiments utilizzando la seguente sintassi. In questo esempio, le dimensioni del disco di avvio vengono impostate su 80 GB con il flag dell'esperimento.

options.setExperiments("streaming_boot_disk_size_gb=80")

Per un esempio che mostra come creare l'oggetto PipelineOptions, consulta la sezione Avvia su Dataflow di esempio in questa pagina.

Python

Nell'oggetto PipelineOptions, includi l'opzione experiments utilizzando la seguente sintassi. In questo esempio, le dimensioni del disco di avvio vengono impostate su 80 GB con il flag dell'esperimento.

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

Per un esempio che mostra come creare l'oggetto PipelineOptions, consulta la sezione Avvia su Dataflow di esempio in questa pagina.

Go

L'impostazione programmatica delle opzioni della pipeline mediante PipelineOptions non è supportata nell'SDK Apache Beam per Go. Utilizza gli argomenti della riga di comando Go.

Imposta dalla riga di comando

Per impostare l'opzione experiments nella riga di comando, utilizza la seguente sintassi.

Java

In questo esempio, le dimensioni del disco di avvio vengono impostate su 80 GB con il flag dell'esperimento.

--experiments=streaming_boot_disk_size_gb=80

Python

In questo esempio, le dimensioni del disco di avvio vengono impostate su 80 GB con il flag dell'esperimento.

--experiments=streaming_boot_disk_size_gb=80

Go

In questo esempio, le dimensioni del disco di avvio vengono impostate su 80 GB con il flag dell'esperimento.

--experiments=streaming_boot_disk_size_gb=80

Imposta in un modello

Per abilitare una funzionalità sperimentale durante l'esecuzione di un modello Dataflow, utilizza il flag --additional-experiments.

Modello classico

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

modello flessibile

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Accedi all'oggetto opzioni pipeline

Quando crei l'oggetto Pipeline nel programma Apache Beam, supera PipelineOptions. Quando il servizio Dataflow esegue la pipeline, invia una copia di PipelineOptions a ogni worker.

Java

Accedi a PipelineOptions all'interno di qualsiasi istanza DoFn della trasformazione ParDo utilizzando il metodo ProcessContext.getPipelineOptions.

Python

Questa funzionalità non è supportata nell'SDK Apache Beam per Python.

Go

Accedi alle opzioni della pipeline utilizzando beam.PipelineOptions.

Avvia su Dataflow

Esegui il job su risorse Google Cloud gestite utilizzando il servizio runner Dataflow. L'esecuzione della tua pipeline con Dataflow crea un job Dataflow che utilizza le risorse di Compute Engine e Cloud Storage nel tuo progetto Google Cloud. Per informazioni sulle autorizzazioni Dataflow, consulta Sicurezza e autorizzazioni di Dataflow.

I job Dataflow utilizzano Cloud Storage per archiviare i file temporanei durante l'esecuzione della pipeline. Per evitare che ti vengano addebitati costi di archiviazione non necessari, disattiva la funzionalità di eliminazione temporanea sui bucket utilizzati dai job Dataflow per l'archiviazione temporanea. Per maggiori informazioni, consulta Rimuovere un criterio di eliminazione temporanea da un bucket.

Imposta le opzioni richieste

Per eseguire la pipeline utilizzando Dataflow, imposta le seguenti opzioni della pipeline:

Java

  • project: l'ID del tuo progetto Google Cloud.
  • runner: l'esecutore della pipeline che esegue la pipeline. Per l'esecuzione di Google Cloud, deve essere DataflowRunner.
  • gcpTempLocation: un percorso Cloud Storage per consentire a Dataflow di archiviare la maggior parte dei file temporanei. Se vuoi specificare un bucket, devi crearlo in anticipo. Se non imposti gcpTempLocation, puoi impostare l'opzione della pipeline tempLocation, quindi gcpTempLocation viene impostato sul valore di tempLocation. Se nessuno dei due viene specificato, viene creato un valore predefinito di gcpTempLocation.
  • stagingLocation: un percorso Cloud Storage per Dataflow in cui inserire temporaneamente i file binari. Se utilizzi l'SDK Apache Beam 2.28 o versioni successive, non impostare questa opzione. Per l'SDK Apache Beam 2.28 o versioni precedenti, se non imposti questa opzione, per la posizione temporanea viene utilizzato ciò che hai specificato per tempLocation.

    Viene creato un valore gcpTempLocation predefinito se non è specificato né questo valore né tempLocation. Se tempLocation è specificato, ma gcpTempLocation non lo è, tempLocation deve essere un percorso Cloud Storage e il valore predefinito è gcpTempLocation. Se tempLocation non è specificato e gcpTempLocation è, tempLocation non viene compilato.

Python

  • project: il tuo ID progetto Google Cloud.
  • region: la regione per il job Dataflow.
  • runner: l'esecutore della pipeline che esegue la pipeline. Per l'esecuzione di Google Cloud, deve essere DataflowRunner.
  • temp_location: un percorso Cloud Storage per consentire a Dataflow di inserire in un'area intermedia i file temporanei del job creati durante l'esecuzione della pipeline.

Go

  • project: il tuo ID progetto Google Cloud.
  • region: la regione per il job Dataflow.
  • runner: l'esecutore della pipeline che esegue la pipeline. Per l'esecuzione di Google Cloud, deve essere dataflow.
  • staging_location: un percorso Cloud Storage per consentire a Dataflow di inserire in un'area intermedia i file temporanei del job creati durante l'esecuzione della pipeline.

Imposta le opzioni della pipeline in modo programmatico

Il codice di esempio seguente mostra come creare una pipeline impostando in modo programmatico il runner e altre opzioni obbligatorie per l'esecuzione della pipeline mediante Dataflow.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

L'SDK Apache Beam per Go utilizza gli argomenti della riga di comando Go. Utilizza flag.Set() per impostare i valori dei flag.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

Dopo aver creato la pipeline, specifica tutte le operazioni di lettura, trasformazione e scrittura della pipeline, poi eseguila.

Utilizzare le opzioni della pipeline dalla riga di comando

L'esempio seguente mostra come utilizzare le opzioni della pipeline specificate nella riga di comando. Questo esempio non imposta le opzioni della pipeline in modo programmatico.

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Utilizza il modulo Python argparse per analizzare le opzioni della riga di comando.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Utilizza il pacchetto Go flag per analizzare le opzioni della riga di comando. Devi analizzare le opzioni prima di chiamare beam.Init(). In questo esempio, output è un'opzione della riga di comando.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

Dopo aver creato la pipeline, specifica tutte le operazioni di lettura, trasformazione e scrittura della pipeline, quindi l'esecuzione.

Controlla le modalità di esecuzione

Quando un programma Apache Beam esegue una pipeline su un servizio come Dataflow, può eseguire la pipeline in modo asincrono o bloccare fino al completamento della pipeline. Puoi modificare questo comportamento utilizzando le indicazioni seguenti.

Java

Quando un programma Java Apache Beam esegue una pipeline su un servizio come Dataflow, viene generalmente eseguito in modo asincrono. Per eseguire una pipeline e attendere il completamento del job, imposta DataflowRunner come runner e chiama esplicitamente pipeline.run().waitUntilFinish().

Quando utilizzi DataflowRunner e chiami waitUntilFinish() sull'oggetto PipelineResult restituito da pipeline.run(), la pipeline viene eseguita su Google Cloud, ma il codice locale attende il completamento del job cloud e restituisce l'oggetto DataflowPipelineJob finale. Durante l'esecuzione del job, il servizio Dataflow stampa gli aggiornamenti dello stato del job e i messaggi della console mentre è in attesa.

Python

Quando un programma Python Apache Beam esegue una pipeline su un servizio come Dataflow, viene generalmente eseguito in modo asincrono. Per bloccare fino al completamento della pipeline, usa il metodo wait_until_finish() dell'oggetto PipelineResult, restituito dal metodo run() dell'esecuzione.

Go

Quando un programma Apache Beam Go esegue una pipeline su Dataflow, è sincrono per impostazione predefinita e si blocca fino al completamento della pipeline. Se non vuoi bloccare, hai due opzioni:

  1. Avvia il job in una routine Go.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. Utilizza il flag della riga di comando --async, che si trova nel pacchetto jobopts.

Per visualizzare i dettagli dell'esecuzione, monitorare l'avanzamento e verificare lo stato di completamento del job, utilizza l'interfaccia di monitoraggio di Dataflow o l'interfaccia a riga di comando di Dataflow.

Usa origini di flussi di dati

Java

Se la pipeline legge da un'origine dati non limitata, come Pub/Sub, viene eseguita automaticamente in modalità flusso.

Python

Se la pipeline utilizza un'origine dati non limitata, come Pub/Sub, devi impostare l'opzione streaming su true.

Go

Se la pipeline legge da un'origine dati non limitata, come Pub/Sub, viene eseguita automaticamente in modalità flusso.

I job di flussi di dati utilizzano un tipo di macchina di Compute Engine pari o superiore a n1-standard-2 per impostazione predefinita.

Avvia localmente

Anziché eseguire la pipeline su risorse cloud gestite, puoi scegliere di eseguirla localmente. L'esecuzione locale presenta determinati vantaggi per i test, il debug o l'esecuzione della pipeline su set di dati di dimensioni ridotte. Ad esempio, l'esecuzione locale rimuove la dipendenza dal servizio Dataflow remoto e dal progetto Google Cloud associato.

Quando utilizzi l'esecuzione locale, devi eseguire la pipeline con set di dati di dimensioni ridotte sufficientemente alla memoria locale. Puoi creare un set di dati in memoria di piccole dimensioni utilizzando una trasformazione Create oppure utilizzare una trasformazione Read per lavorare con piccoli file locali o remoti. L'esecuzione locale offre in genere un modo più rapido e semplice per eseguire test e debug con meno dipendenze esterne, ma è limitata dalla memoria disponibile nel tuo ambiente locale.

Il codice di esempio seguente mostra come costruire una pipeline che viene eseguita nel tuo ambiente locale.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

Dopo aver creato la pipeline, eseguila.

Crea opzioni di pipeline personalizzate

Puoi aggiungere le tue opzioni personalizzate oltre a quelle PipelineOptions standard. La riga di comando di Apache Beam può anche analizzare opzioni personalizzate utilizzando argomenti della riga di comando specificati nello stesso formato.

Java

Per aggiungere opzioni personalizzate, definisci un'interfaccia con metodi getter e setter per ogni opzione, come nell'esempio seguente:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

Per aggiungere le tue opzioni, usa il metodo add_argument() (che si comporta esattamente come il modulo argparse standard di Python), come nell'esempio seguente:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

Per aggiungere opzioni personalizzate, utilizza il pacchetto di flag Go come mostrato nel seguente esempio:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

Puoi anche specificare una descrizione, che viene visualizzata quando un utente passa --help come argomento della riga di comando, nonché un valore predefinito.

Java

Puoi impostare la descrizione e il valore predefinito utilizzando le annotazioni, come indicato di seguito:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Ti consigliamo di registrare la tua interfaccia con PipelineOptionsFactory e poi passare l'interfaccia durante la creazione dell'oggetto PipelineOptions. Quando registri la tua interfaccia con PipelineOptionsFactory, --help può trovare la tua interfaccia delle opzioni personalizzate e aggiungerla all'output del comando --help. PipelineOptionsFactory verifica che le opzioni personalizzate siano compatibili con tutte le altre opzioni registrate.

Il codice di esempio seguente mostra come registrare la tua interfaccia delle opzioni personalizzate con PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

Ora la tua pipeline può accettare --myCustomOption=value come argomento della riga di comando.

Python

Puoi impostare la descrizione e il valore predefinito come segue:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

Puoi impostare la descrizione e il valore predefinito come segue:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)