Creazione di modelli Dataflow classici

In questo documento imparerai a creare un modello classico personalizzato dal codice della pipeline Dataflow. I modelli classici creano pacchetti di pipeline Dataflow esistenti per creare modelli riutilizzabili che puoi personalizzare per ogni job modificando parametri specifici della pipeline. Anziché scrivere il modello, utilizza un comando per generarlo da una pipeline esistente.

Di seguito è riportata una breve panoramica della procedura. I dettagli di questa procedura sono forniti nelle sezioni successive.

  1. Nel codice della pipeline, utilizza l'interfaccia ValueProvider per tutte le opzioni della pipeline che vuoi impostare o utilizzare durante il runtime. Utilizza oggetti DoFn che accettano parametri di runtime.
  2. Estendi il modello con metadati aggiuntivi in modo che i parametri personalizzati vengano convalidati quando viene eseguito il modello classico. Alcuni esempi di metadati di questo tipo sono il nome del modello classico personalizzato e i parametri facoltativi.
  3. Controlla se i connettori I/O della pipeline supportano gli oggetti ValueProvider e apporta le modifiche necessarie.
  4. Creare e gestire temporaneamente il modello classico personalizzato.
  5. Esegui il modello classico personalizzato.

Per saperne di più sui diversi tipi di modelli Dataflow, sui relativi vantaggi e su quando scegliere un modello classico, consulta Modelli Dataflow.

Autorizzazioni richieste per eseguire un modello classico

Le autorizzazioni necessarie per eseguire il modello classico di Dataflow dipendono da dove viene eseguito il modello e da se l'origine e il sink per la pipeline si trovano in un altro progetto.

Per ulteriori informazioni sull'esecuzione delle pipeline Dataflow in locale o utilizzando Google Cloud, consulta Sicurezza e autorizzazioni di Dataflow.

Per un elenco dei ruoli e delle autorizzazioni di Dataflow, vedi Controllo dell'accesso di Dataflow.

Limitazioni

  • La seguente opzione pipeline non è supportata con i modelli classici. Se devi controllare il numero di thread di cablaggio worker, utilizza Modelli flessibili.

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • L'esecutore di Dataflow non supporta le opzioni ValueProvider per gli argomenti e i parametri di sottoscrizione di Pub/Sub. Se hai bisogno di opzioni Pub/Sub nei parametri di runtime, utilizza i modelli flessibili.

Informazioni sui parametri di runtime e sull'interfaccia ValueProvider

L'interfaccia ValueProvider consente alle pipeline di accettare parametri di runtime. Apache Beam fornisce tre tipi di oggetti ValueProvider.

Nome Descrizione
RuntimeValueProvider

RuntimeValueProvider è il tipo predefinito di ValueProvider. RuntimeValueProvider consente alla pipeline di accettare un valore disponibile solo durante l'esecuzione della pipeline. Il valore non è disponibile durante la creazione della pipeline, quindi non può essere utilizzato per modificare il grafico del flusso di lavoro della pipeline.

Puoi utilizzare isAccessible() per verificare se il valore di ValueProvider è disponibile. Se chiami get() prima dell'esecuzione della pipeline, Apache Beam restituisce un errore:
Value only available at runtime, but accessed from a non-runtime context.

Utilizza RuntimeValueProvider quando non conosci il valore in anticipo. Per modificare i valori dei parametri in fase di runtime, non impostare valori per i parametri nel modello. Imposta i valori dei parametri quando crei job dal modello.

StaticValueProvider

StaticValueProvider consente di fornire un valore statico alla pipeline. Il valore è disponibile durante la creazione della pipeline, quindi puoi utilizzarlo per modificare il grafico del flusso di lavoro della pipeline.

Utilizza StaticValueProvider quando conosci il valore in anticipo. Consulta la sezione StaticValueProvider per alcuni esempi.

NestedValueProvider

NestedValueProvider consente di calcolare un valore da un altro oggetto ValueProvider. NestedValueProvider esegue il wrapping di un ValueProvider e il tipo di wrapping di ValueProvider determina se il valore è accessibile durante la creazione della pipeline.

Utilizza NestedValueProvider quando vuoi usare il valore per calcolare un altro valore in fase di runtime. Consulta la sezione NestedValueProvider per trovare esempi.

Utilizza i parametri di runtime nel codice della pipeline

Questa sezione illustra come utilizzare ValueProvider, StaticValueProvider e NestedValueProvider.

Utilizza ValueProvider nelle opzioni della pipeline

Utilizza ValueProvider per tutte le opzioni della pipeline che vuoi impostare o utilizzare in fase di runtime.

Ad esempio, il seguente snippet di codice WordCount non supporta i parametri di runtime. Il codice aggiunge un'opzione del file di input, crea una pipeline e legge le righe dal file di input:

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Per aggiungere il supporto dei parametri di runtime, modifica l'opzione del file di input in modo da utilizzare ValueProvider.

Java

Usa ValueProvider<String> anziché String per il tipo di opzione del file di input.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Sostituisci add_argument con add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Utilizza ValueProvider nelle funzioni

Per utilizzare i valori dei parametri di runtime nelle tue funzioni, aggiorna le funzioni in modo che utilizzino i parametri ValueProvider.

L'esempio seguente contiene un'opzione con numero intero ValueProvider e una funzione semplice che aggiunge un numero intero. La funzione dipende dal numero intero ValueProvider. Durante l'esecuzione, la pipeline applica MySumFn a ogni numero intero in un elemento PCollection che contiene [1, 2, 3]. Se il valore di runtime è 10, il valore PCollection risultante contiene [11, 12, 13].

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Utilizza StaticValueProvider

Per fornire un valore statico alla pipeline, utilizza StaticValueProvider.

In questo esempio viene utilizzato MySumFn, che corrisponde a un DoFn che richiede un ValueProvider<Integer>. Se conosci in anticipo il valore del parametro, puoi utilizzare StaticValueProvider per specificare il valore statico come ValueProvider.

Java

Questo codice recupera il valore al momento del runtime della pipeline:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Puoi invece utilizzare StaticValueProvider con un valore statico:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Questo codice recupera il valore al momento del runtime della pipeline:

  beam.ParDo(MySumFn(user_options.templated_int))

Puoi invece utilizzare StaticValueProvider con un valore statico:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Puoi anche utilizzare StaticValueProvider quando implementi un modulo di I/O che supporta sia parametri regolari che parametri di runtime. StaticValueProvider riduce la duplicazione del codice dall'implementazione di due metodi simili.

Java

Il codice sorgente per questo esempio proviene da TextIO.java su GitHub di Apache Beam.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

In questo esempio, è presente un singolo costruttore che accetta sia un argomento string che un argomento ValueProvider. Se l'argomento è string, viene convertito in StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Utilizza NestedStaticValueProvider

Per calcolare un valore da un altro oggetto ValueProvider, utilizza NestedValueProvider.

NestedValueProvider richiede un traduttore ValueProvider e SerializableFunction come input. Quando chiami .get() su un NestedValueProvider, il traduttore crea un nuovo valore in base a quello ValueProvider. Questa traduzione consente di utilizzare un valore ValueProvider per creare il valore finale desiderato.

Nell'esempio seguente, l'utente fornisce il nome file file.txt. La trasformazione antepone il percorso gs://directory_name/ al nome file. La chiamata a .get() restituisce gs://directory_name/file.txt.

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Utilizza i metadati nel codice della pipeline

Puoi estendere il modello con metadati aggiuntivi in modo che i parametri personalizzati vengano convalidati quando il modello viene eseguito. Se vuoi creare metadati per il tuo modello, segui questi passaggi:

  1. Crea un file in formato JSON denominato TEMPLATE_NAME_metadata utilizzando i parametri in Parametri metadati e il formato in File di metadati di esempio. Sostituisci TEMPLATE_NAME con il nome del modello.

    Assicurati che il file dei metadati non abbia un'estensione di nome file. Ad esempio, se il nome del modello è myTemplate, il file di metadati deve essere myTemplate_metadata.

  2. Archivia il file di metadati in Cloud Storage nella stessa cartella del modello.

Parametri dei metadati

Chiave parametro Obbligatorio Descrizione del valore
name Il nome del modello.
description No Un breve paragrafo di testo che descrive il modello.
streaming No Se true, questo modello supporta lo streaming. Il valore predefinito è false.
supportsAtLeastOnce No Se true, questo modello supporta l'elaborazione almeno una volta. Il valore predefinito è false. Imposta questo parametro su true se il modello è progettato per funzionare con la modalità di flusso at-least-once.
supportsExactlyOnce No Se true, questo modello supporta l'elaborazione "exactly-once". Il valore predefinito è true.
defaultStreamingMode No La modalità di streaming predefinita, per i modelli che supportano sia la modalità at-least-once che la modalità "exactly-once". Utilizza uno dei seguenti valori: "AT_LEAST_ONCE", "EXACTLY_ONCE". Se non viene specificato, la modalità di streaming predefinita è "exactly-once".
parameters No Un array di parametri aggiuntivi utilizzati dal modello. Per impostazione predefinita viene utilizzato un array vuoto.
name Il nome del parametro utilizzato nel modello.
label Una stringa leggibile utilizzata nella console Google Cloud per etichettare il parametro.
helpText Un breve paragrafo di testo che descrive il parametro.
isOptional No false se il parametro è obbligatorio e true se il parametro è facoltativo. A meno che non venga impostato un valore, il valore predefinito di isOptional è false. Se non includi questa chiave di parametro per i metadati, i metadati diventano un parametro obbligatorio.
regexes No Un array di espressioni regolari POSIX-egrep in forma di stringa utilizzato per convalidare il valore del parametro. Ad esempio, ["^[a-zA-Z][a-zA-Z0-9]+"] è una singola espressione regolare che verifica che il valore inizi con una lettera e poi abbia uno o più caratteri. Per impostazione predefinita viene utilizzato un array vuoto.

Esempio di file di metadati

Java

Il servizio Dataflow utilizza i seguenti metadati per convalidare i parametri personalizzati del modello WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Il servizio Dataflow utilizza i seguenti metadati per convalidare i parametri personalizzati del modello WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Puoi scaricare i file di metadati per i modelli forniti da Google dalla directory dei modelli di Dataflow.

Connettori I/O della pipeline supportati e ValueProvider

Java

Alcuni connettori I/O contengono metodi che accettano oggetti ValueProvider. Per determinare il supporto per un connettore e un metodo specifici, consulta la documentazione di riferimento dell'API per il connettore I/O. I metodi supportati hanno un sovraccarico con un valore ValueProvider. Se un metodo non presenta un sovraccarico, non supporta i parametri di runtime. I seguenti connettori I/O supportano almeno ValueProvider parzialmente:

  • IO basati su file: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (richiede l'SDK 2.3.0 o versioni successive)
  • PubSubIO
  • SpannerIO

Python

Alcuni connettori I/O contengono metodi che accettano oggetti ValueProvider. Per determinare il supporto per i connettori I/O e i relativi metodi, consulta la documentazione di riferimento delle API per il connettore. I seguenti connettori I/O accettano i parametri di runtime:

  • IO basati su file: textio, avroio, tfrecordio

Creare e gestire un modello classico

Dopo aver scritto la pipeline, devi creare e posizionare in un'area intermedia il file del modello. Quando crei e crei un'area temporanea, la posizione di gestione temporanea contiene file aggiuntivi necessari per eseguire il modello. Se elimini la posizione temporanea, il modello non verrà eseguito. Il job Dataflow non viene eseguito immediatamente dopo l'archiviazione graduale del modello. Per eseguire un job di Dataflow basato su modelli personalizzati, puoi utilizzare la console Google Cloud, l'API REST Dataflow o gcloud CLI.

Nell'esempio seguente viene illustrato come posizionare temporaneamente un file modello:

Java

Questo comando Maven crea e implementa un modello nella località di Cloud Storage specificata con --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

Verifica che il percorso templateLocation sia corretto. Sostituisci quanto segue:

  • com.example.myclass: la tua classe Java
  • PROJECT_ID: il tuo ID progetto
  • BUCKET_NAME: il nome del bucket Cloud Storage
  • TEMPLATE_NAME: il nome del modello
  • REGION: la regione in cui eseguire il deployment del job Dataflow

Python

Questo comando Python crea e memorizza un modello nella località di Cloud Storage specificata con --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Verifica che il percorso template_location sia corretto. Sostituisci quanto segue:

  • examples.mymodule: il tuo modulo Python
  • PROJECT_ID: il tuo ID progetto
  • BUCKET_NAME: il nome del bucket Cloud Storage
  • TEMPLATE_NAME: il nome del modello
  • REGION: la regione in cui eseguire il deployment del job Dataflow

Dopo aver creato e graduale il modello, il passaggio successivo è eseguirlo.