Impostazione delle opzioni pipeline

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Questa pagina spiega come impostare le opzioni della pipeline per i job Dataflow. Queste opzioni di pipeline configurano come e dove viene eseguita la tua pipeline e quali risorse utilizza.

L'esecuzione della pipeline è separata dall'esecuzione del programma Apache Beam. Il programma Apache Beam scritto da te costituisce una pipeline per l'esecuzione differita. Ciò significa che il programma genera una serie di passaggi che possono essere eseguiti da qualsiasi runner Apache Beam supportato. I runner compatibili includono il runner Dataflow su Google Cloud e il runner diretto che esegue la pipeline direttamente in un ambiente locale.

Utilizzo delle opzioni della pipeline

Puoi impostare il runner della pipeline e altri parametri di esecuzione usando la classe dell'SDK Apache Beam PipelineOptions.

Esistono due metodi per specificare le opzioni della pipeline:

  • Impostali in modo programmatico fornendo un elenco di opzioni di pipeline.
  • Impostali direttamente sulla riga di comando quando esegui il codice della pipeline.

Impostazione delle opzioni di pipeline in modo programmatico

Puoi impostare le opzioni della pipeline in modo programmatico mediante la creazione e la modifica di un oggetto PipelineOptions.

Java

Crea un oggetto PipelineOptions utilizzando il metodo PipelineOptionsFactory.fromArgs.

Per un esempio, consulta l'esempio di lancio su Dataflow.

Python

Crea un oggetto PipelineOptions.

Per un esempio, consulta l'esempio di lancio su Dataflow.

Go

L'impostazione programmatica delle opzioni della pipeline mediante PipelineOptions non è supportata nell'SDK Apache Beam per Go. Utilizza gli argomenti a riga di comando Go.

Per un esempio, consulta l'esempio di lancio su Dataflow.

Impostazione delle opzioni di pipeline sulla riga di comando.

Puoi impostare le opzioni della pipeline utilizzando gli argomenti a riga di comando.

Java

Per visualizzare un esempio di questa sintassi, consulta gli esempi della guida rapida di Java.

Python

Per visualizzare un esempio di questa sintassi, consulta gli esempi della guida rapida di Python.

Go

Per visualizzare un esempio di questa sintassi, consulta gli esempi di guida rapida.

Accesso all'oggetto opzioni pipeline

Passi PipelineOptions quando crei l'oggetto Pipeline nel tuo programma Apache Beam. Quando il servizio Dataflow esegue la tua pipeline, invia una copia della PipelineOptions a ogni worker.

Java

Puoi accedere a PipelineOptions all'interno di qualsiasi istanza DoFn di ParDo utilizzando il metodo ProcessContext.getPipelineOptions.

Python

Questa funzionalità non è supportata nell'SDK Apache Beam per Python.

Go

Puoi accedere alle opzioni della pipeline utilizzando beam.PipelineOptions.

Avvio in Dataflow

Puoi eseguire il tuo job sulle risorse Google Cloud gestite utilizzando il servizio runner Dataflow. L'esecuzione della pipeline con Dataflow crea un job Dataflow, che utilizza le risorse Compute Engine e Cloud Storage nel progetto Google Cloud.

Impostazione delle opzioni richieste

Per eseguire la pipeline utilizzando Dataflow, imposta le seguenti opzioni di pipeline:

Java

  • project: l'ID del tuo progetto Google Cloud.
  • runner: il runner della pipeline che esegue la tua pipeline. Per l'esecuzione di Google Cloud, questo deve essere DataflowRunner.
  • gcpTempLocation: un percorso di Cloud Storage per Dataflow per archiviare la maggior parte dei file temporanei. Se vuoi specificare un bucket, devi crearlo in anticipo. Se non imposti gcpTempLocation, puoi impostare l'opzione pipeline tempLocation, quindi gcpTempLocation viene impostato sul valore tempLocation. Se non è specificato nessuno, viene creato un elemento gcpTempLocation predefinito.
  • stagingLocation: un percorso di Cloud Storage per Dataflow per archiviare temporaneamente i file binari. Se utilizzi l'SDK Apache 2.28 o versioni successive, non impostare questa opzione. Per l'SDK Apache Beam 2.28 o versioni precedenti, se non imposti questa opzione, quello che hai specificato per tempLocation viene utilizzato per la posizione temporanea.

    Il valore predefinito di gcpTempLocation viene creato se né il valore predefinito né il valore tempLocation sono specificati. Se tempLocation è specificato e gcpTempLocation non lo è, tempLocation deve essere un percorso di Cloud Storage e gcpTempLocation per impostazione predefinita. Se tempLocation non è specificato e gcpTempLocation è, tempLocation non viene completato.

Python

  • project: l'ID del progetto Google Cloud.
  • region: l'endpoint della regione per il job Dataflow.
  • runner: il runner della pipeline che esegue la tua pipeline. Per l'esecuzione di Google Cloud, questo deve essere DataflowRunner.
  • temp_location: un percorso di Cloud Storage per Dataflow per archiviare temporaneamente i file dei job creati durante l'esecuzione della pipeline.

Go

  • project: l'ID del progetto Google Cloud.
  • region: l'endpoint della regione per il job Dataflow.
  • runner: il runner della pipeline che esegue la tua pipeline. Per l'esecuzione di Google Cloud, questo deve essere dataflow.
  • staging_location: un percorso di Cloud Storage per Dataflow per archiviare temporaneamente i file dei job creati durante l'esecuzione della pipeline.

Impostazione delle opzioni di pipeline in modo programmatico

Il codice di esempio che segue mostra come creare una pipeline impostando in modo programmatico il runner e altre opzioni richieste per eseguire la pipeline mediante Dataflow.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

L'SDK Apache Beam per Go utilizza gli argomenti a riga di comando Go. Utilizza flag.Set() per impostare i valori del flag.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

Dopo aver creato la pipeline, specifica tutte le operazioni di lettura, trasformazione e scrittura della pipeline ed eseguila.

Utilizzo delle opzioni della pipeline dalla riga di comando

L'esempio seguente mostra come utilizzare le opzioni della pipeline specificate nella riga di comando. Questo esempio non imposta le opzioni della pipeline in modo programmatico.

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Per analizzare le opzioni della riga di comando, utilizza il modulo Python argparse.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Utilizza il pacchetto flag per analizzare le opzioni della riga di comando. Devi analizzare le opzioni prima di chiamare beam.Init(). In questo esempio, output è un'opzione della riga di comando.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

Dopo aver creato la pipeline, specifica tutte le operazioni di lettura, trasformazione e scrittura della pipeline ed eseguila.

Controllare le modalità di esecuzione

Quando un programma Apache Beam esegue una pipeline su un servizio come Dataflow, può eseguirla in modo asincrono o bloccarla fino al completamento della pipeline. Puoi modificare questo comportamento utilizzando le seguenti istruzioni.

Java

Quando un programma Apache Beam Java esegue una pipeline su un servizio come Dataflow, in genere viene eseguito in modo asincrono. Per eseguire una tubazione e attendere fino al completamento del job, imposta DataflowRunner come runner della pipeline e chiama esplicitamente pipeline.run().waitUntilFinish().

Quando utilizzi DataflowRunner e chiami waitUntilFinish() sull'oggetto PipelineResult restituito da pipeline.run(), la pipeline viene eseguita su Google Cloud, ma il codice locale attende il completamento del job cloud e restituisce l'oggetto DataflowPipelineJob finale. Durante l'esecuzione del job, il servizio Dataflow stampa gli aggiornamenti dello stato dei job e i messaggi della console durante l'attesa.

Python

Quando un programma Apache Beam Python esegue una pipeline su un servizio come Dataflow, in genere viene eseguito in modo asincrono. Per bloccare fino al completamento della pipeline, utilizza il metodo wait_until_finish() dell'oggetto PipelineResult, restituito dal metodo run() del runner.

Go

Quando un programma Apache Beam Go esegue una pipeline su Dataflow, è sincrono per impostazione predefinita e si blocca fino al completamento della pipeline. Se non vuoi bloccare, sono disponibili due opzioni:

  1. Avvia il job in una routine Go.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. Utilizza il flag della riga di comando --async, incluso nel pacchetto jobopts.

Per visualizzare i dettagli di esecuzione, monitorare l'avanzamento e verificare lo stato di completamento del job, utilizza l'interfaccia di monitoraggio di Dataflow o l'interfaccia a riga di comando di Dataflow.

Utilizzo delle origini di streaming

Java

Se la tua pipeline legge da un'origine dati senza limiti, come Pub/Sub, la pipeline viene eseguita automaticamente in modalità di streaming.

Python

Se la tua pipeline utilizza un'origine dati senza limiti, ad esempio Pub/Sub, devi impostare l'opzione streaming su true.

Go

Se la tua pipeline legge da un'origine dati senza limiti, come Pub/Sub, la pipeline viene eseguita automaticamente in modalità di streaming.

Per impostazione predefinita, i job di flusso utilizzano un tipo di macchina di Compute Engine pari o superiore a n1-standard-2.

Lancio locale

Invece di eseguire la pipeline su risorse cloud gestite, puoi scegliere di eseguire la pipeline in locale. L'esecuzione locale offre alcuni vantaggi per i test, il debug o l'esecuzione della pipeline su set di dati di piccole dimensioni. Ad esempio, l'esecuzione locale rimuove la dipendenza dal servizio Dataflow remoto e dal progetto Google Cloud associato.

Quando utilizzi l'esecuzione locale, devi eseguire la pipeline con set di dati sufficientemente piccoli da rientrare nella memoria locale. Puoi creare un piccolo set di dati in memoria utilizzando una trasformazione Create oppure puoi usare una trasformazione Read per lavorare con file locali o da remoto di piccole dimensioni. L'esecuzione locale offre un modo rapido e semplice per eseguire test e debug con meno dipendenze esterne, ma è limitata dalla memoria disponibile nell'ambiente locale.

Il codice di esempio riportato di seguito mostra come creare una pipeline che viene eseguita nel tuo ambiente locale.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

Dopo aver creato la pipeline, eseguila.

Creazione di opzioni di pipeline personalizzate

Puoi aggiungere le tue opzioni personalizzate oltre agli standard PipelineOptions. La riga di comando di Apache Beam può anche analizzare le opzioni personalizzate utilizzando gli argomenti della riga di comando specificati nello stesso formato.

Java

Per aggiungere le tue opzioni, definisci un'interfaccia con metodi getter e setter per ogni opzione, come nell'esempio seguente:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

Per aggiungere le tue opzioni, utilizza il metodo add_argument() (che si comporta esattamente come il modulo relativo all'argomento di Python standard), come nell'esempio seguente:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

Per aggiungere le tue opzioni, utilizza il pacchetto di segnalazioni Go come mostrato nell'esempio seguente:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

Puoi anche specificare una descrizione da visualizzare quando un utente passa --help come argomento della riga di comando e un valore predefinito.

Java

Puoi impostare la descrizione e il valore predefinito utilizzando le annotazioni nel seguente modo:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Ti consigliamo di registrare l'interfaccia con PipelineOptionsFactory e di passare l'interfaccia durante la creazione dell'oggetto PipelineOptions. Quando registri la tua interfaccia con PipelineOptionsFactory, --help può trovare l'interfaccia delle opzioni personalizzate e aggiungerla al risultato del comando --help. PipelineOptionsFactory convalida la compatibilità delle tue opzioni personalizzate con tutte le altre opzioni registrate.

Il seguente codice di esempio mostra come registrare l'interfaccia delle opzioni personalizzate con PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

Ora la tua pipeline può accettare --myCustomOption=value come argomento della riga di comando.

Python

Puoi impostare la descrizione e il valore predefinito nel seguente modo:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

Puoi impostare la descrizione e il valore predefinito nel seguente modo:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)