Klassische Vorlagen erstellen

Dataflow-Vorlagen verwenden Laufzeitparameter zum Akzeptieren von Werten, die nur bei der Pipelineausführung verfügbar sind. Zur Anpassung der Ausführung einer als Vorlage verwendeten Pipeline können Sie diese Parameter an Funktionen übergeben, die in der Pipeline ausgeführt werden (z. B. DoFn).

Für das Erstellen einer Vorlage aus Ihrer Apache Beam-Pipeline müssen Sie den Pipelinecode so ändern, dass dieser Laufzeitparameter unterstützt:

  • Verwenden Sie ValueProvider für alle Pipelineoptionen in klassischen Vorlagen, die Sie zur Laufzeit festlegen oder verwenden möchten.
  • Rufen Sie dort, wo Sie Parameter für Ihre Pipeline festlegen möchten, E/A-Methoden auf, die Laufzeitparameter akzeptieren
  • Verwenden Sie DoFn-Objekte, die Laufzeitparameter akzeptieren.

Erstellen Sie dann Ihre Vorlage und stellen Sie sie bereit.

Laufzeitparameter und die ValueProvider-Schnittstelle

Die Schnittstelle ValueProvider ermöglicht das Akzeptieren von Laufzeitparametern durch Pipelines. Apache Beam bietet drei Typen von ValueProvider-Objekten.

Name Beschreibung
RuntimeValueProvider

RuntimeValueProvider ist der Standardtyp ValueProvider. Durch RuntimeValueProvider akzeptiert Ihre Pipeline einen Wert, der nur während der Ausführung der Pipeline verfügbar ist. Der Wert ist nicht bei der Pipelineerstellung verfügbar. Sie können ihn daher nicht zum Ändern der Workflowgrafik Ihrer Pipeline verwenden.

Mit isAccessible() können Sie prüfen, ob der Wert von ValueProvider verfügbar ist. Wenn Sie get() vor der Pipelineausführung aufrufen, gibt Apache Beam einen Fehler zurück:
Value only available at runtime, but accessed from a non-runtime context.

Verwenden Sie RuntimeValueProvider, wenn Sie den Wert im Voraus nicht kennen.

StaticValueProvider

Mit StaticValueProvider können Sie einen statischen Wert für Ihre Pipeline angeben. Der Wert ist bei der Erstellung der Pipeline verfügbar. Sie können daher mit diesem Wert die Workflowgrafik Ihrer Pipeline ändern.

Verwenden Sie StaticValueProvider, wenn Sie den Wert vorab kennen. Beispiele finden Sie im Abschnitt "StaticValueProvider".

NestedValueProvider

Mit NestedValueProvider können Sie einen Wert aus einem anderen ValueProvider-Objekt berechnen. NestedValueProvider umschließt ValueProvider, und der Typ des zusammengefassten ValueProvider bestimmt, ob der Wert während der Pipelinekonstruktion zugänglich ist.

Verwenden Sie NestedValueProvider, wenn Sie den Wert zur Berechnung eines anderen Werts zur Laufzeit nutzen möchten. Beispiele finden Sie im Abschnitt "NestedValueProvider".

Der Dataflow-Runner unterstützt keine ValueProvider-Optionen für Pub/Sub-Themen und Abo-Parameter. Wenn Sie in Ihren Laufzeitparameter Pub/Sub-Optionen benötigen, wechseln Sie zu Flex-Vorlagen.

Code für die Verwendung von Laufzeitparametern ändern

In diesem Abschnitt wird die Verwendung von ValueProvider, StaticValueProvider und NestedValueProvider erläutert.

ValueProvider in Pipelineoptionen

Verwenden Sie ValueProvider für alle Pipelineoptionen, die Sie zur Laufzeit festlegen oder verwenden möchten.

Das folgende WordCount-Code-Snippet unterstützt beispielsweise keine Laufzeitparameter. Mit dem Code wird eine Eingabedateioption hinzugefügt, eine Pipeline erstellt und es werden Zeilen aus der Eingabedatei gelesen.

Java: SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

Wenn Sie eine Unterstützung für Laufzeitparameter hinzufügen möchten, ändern Sie die Eingabedateioption zur Verwendung von ValueProvider.

Java: SDK 2.x

Verwenden Sie ValueProvider<String> anstelle von String für den Typ der Eingabedateioption.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Ersetzen Sie add_argument durch add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java: SDK 1.x

ValueProvider in Funktionen

Zur Nutzung von Laufzeitparameterwerten in Ihren eigenen Funktionen müssen Sie für die Funktionen die Verwendung von ValueProvider-Parametern festlegen.

Das folgende Beispiel enthält eine ValueProvider-Ganzzahloption und eine einfache Funktion, die eine Ganzzahl hinzufügt. Die Funktion hängt von der ValueProvider-Ganzzahl ab. Bei der Ausführung wendet die Pipeline MySumFn auf jede Ganzzahl in einer PCollection an, die [1, 2, 3] enthält. Wenn der Laufzeitwert 10 ist, enthält die resultierende PCollection den Wert [11, 12, 13].

Java: SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Java: SDK 1.x

StaticValueProvider

StaticValueProvider ermöglicht die Bereitstellung eines statischen Werts für Ihre Pipeline.

In diesem Beispiel wird MySumFn verwendet. Dabei handelt es sich um ein DoFn-Objekt, das einen ValueProvider<Integer> annimmt. Wenn Sie den Wert des Parameters im Voraus kennen, können Sie StaticValueProvider verwenden, um Ihren statischen Wert als ValueProvider anzugeben.

Java: SDK 2.x

Dieser Code ruft den Wert zur Pipelinelaufzeit ab:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Stattdessen können Sie StaticValueProvider mit einem statischen Wert verwenden:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Dieser Code ruft den Wert zur Pipelinelaufzeit ab:

  beam.ParDo(MySumFn(user_options.templated_int))

Stattdessen können Sie StaticValueProvider mit einem statischen Wert verwenden:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Java: SDK 1.x

Sie können StaticValueProvider auch verwenden, wenn Sie ein E/A-Modul implementieren, das sowohl reguläre Parameter als auch Laufzeitparameter unterstützt. StaticValueProvider reduziert die Codeduplizierung durch die Implementierung zweier ähnlicher Methoden.

Java: SDK 2.x

Der Quellcode in diesem Beispiel stammt aus TextIO.java auf GitHub von Apache Beam.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

In diesem Beispiel gibt es einen einzelnen Konstruktor, der sowohl ein string- als auch ein ValueProvider-Argument akzeptiert. Wenn das Argument ein string ist, wird es in einen StaticValueProvider umgewandelt.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Java: SDK 1.x

NestedValueProvider

Zur Berechnung eines Werts aus einem anderen ValueProvider-Objekt verwenden Sie NestedValueProvider.

NestedValueProvider nimmt einen ValueProvider und einen SerializableFunction-Übersetzer als Eingabe. Wenn Sie .get() für einen NestedValueProvider aufrufen, erstellt der Übersetzer einen neuen Wert basierend auf dem ValueProvider-Wert. Bei dieser Übersetzung können Sie einen ValueProvider-Wert verwenden, um den gewünschten endgültigen Wert zu erstellen.

Im folgenden Beispiel stellt der Nutzer den Dateinamen file.txt bereit. Durch die Transformation wird der Pfad gs://directory_name/ dem Dateinamen vorangestellt. Der Aufruf von .get() gibt gs://directory_name/file.txt zurück.

Java: SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Java: SDK 1.x

Metadaten

Sie können Ihre Vorlagen um zusätzliche Metadaten erweitern, damit benutzerdefinierte Parameter bei der Ausführung der Vorlage überprüft werden. Wenn Sie Metadaten für Ihre Vorlage erstellen möchten, müssen Sie so vorgehen:

  1. Erstellen Sie mit den Parametern aus der Tabelle unten eine JSON-formatierte Datei mit dem Namen <template-name>_metadata.
  2. Speichern Sie die JSON-formatierte Datei in Cloud Storage im selben Ordner wie die Vorlage.

Metadatenparameter

Parameterschlüssel Erforderlich Beschreibung des Werts
name Ja Der Name Ihrer Vorlage.
description Nein Ein kurzer Textabschnitt, der die Vorlagen beschreibt.
parameters Nein. Die Standardeinstellung ist ein leeres Array. Ein Array von zusätzlichen Parametern, die von der Vorlage verwendet werden.
name Ja Der Name des Parameters, der in Ihrer Vorlage verwendet wird.
label Ja Ein aussagekräftiges Etikett, mit dem der Parameter in der Benutzeroberfläche gekennzeichnet wird.
helpText Ja Ein kurzer Textabschnitt, der den Parameter beschreibt.
isOptional Nein. Die Standardeinstellung ist "false". false, wenn der Parameter erforderlich ist, und true, wenn der Parameter optional ist. Wenn Sie diesen Parameterschlüssel nicht für Ihre Metadaten angeben, werden die Metadaten zu einem erforderlichen Parameter.
regexes Nein. Die Standardeinstellung ist ein leeres Array. Ein Array von regulären POSIX-egrep-Ausdrücken in Stringform, die verwendet werden, um den Wert des Parameters zu prüfen. Beispiel: ["^[a-zA-Z][a-zA-Z0-9]+"] ist ein einzelner regulärer Ausdruck, der bestätigt, dass der Wert mit einem Buchstaben beginnt und dann ein oder mehrere Zeichen enthält.

Beispiel-Metadatendatei

Java

Der Dataflow-Dienst verwendet die folgenden Metadaten, um die benutzerdefinierten Parameter der WordCount-Vorlage zu validieren:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Der Dataflow-Dienst verwendet die folgenden Metadaten, um die benutzerdefinierten Parameter der WordCount-Vorlage zu validieren:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Sie können Metadatendateien für die von Google bereitgestellten Vorlagen aus dem Vorlagenverzeichnis von Dataflow herunterladen.

Pipeline-E/A und Laufzeitparameter

Java: SDK 2.x

Einige E/A-Connectors enthalten Methoden, die ValueProvider-Objekte akzeptieren. Informationen zur Unterstützung für einen bestimmten Connector und für eine bestimmte Methode finden Sie in der API-Referenzdokumentation für den jeweiligen E/A-Connector. Unterstützte Methoden haben eine Überlastung mit einem ValueProvider. Hat eine Methode keine Überlastung, unterstützt sie keine Laufzeitparameter. Für die folgenden E/A-Connectors ist zumindest eine teilweise ValueProvider-Unterstützung verfügbar:

  • Dateibasierte E/A-Vorgänge: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (erfordert SDK 2.3.0 oder höher)
  • PubSubIO
  • SpannerIO

Python

Einige E/A-Connectors enthalten Methoden, die ValueProvider-Objekte akzeptieren. Informationen zur Unterstützung von E/A-Connectors und ihrer Methoden finden Sie in der API-Referenzdokumentation für den jeweiligen Connector. Die folgenden E/A-Connectors akzeptieren Laufzeitparameter:

  • Dateibasierte E/A-Vorgänge: textio, avroio, tfrecordio

Java: SDK 1.x

Vorlagen erstellen und zum Staging führen

Wenn Sie Ihre Pipeline geschrieben haben, müssen Sie Ihre Vorlagendatei erstellen und bereitstellen, d. h., ein Staging ausführen.

Die folgenden Beispiele zeigen, wie Sie eine Vorlagendatei bereitstellen:

Java: SDK 2.x

Mit diesem Maven-Befehl wird eine Vorlage am Cloud Storage-Speicherort mit --templateLocation erstellt und bereitgestellt.

Hinweis: Wenn Sie das Apache Beam SDK für Java 2.15.0 oder höher verwenden, müssen Sie auch --region angeben.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=YOUR_PROJECT_ID \
                  --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
                  --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
                  --region=REGION"
    

Prüfen Sie, ob der Pfad templateLocation korrekt ist. Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • TEMPLATE_NAME: Der Name Ihrer Vorlage
  • com.example.myclass: Ihre Java-Klasse

Python

Mit diesem Python-Befehl wird eine Vorlage am Cloud Storage-Speicherort mit --template_location erstellt und bereitgestellt.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME

Prüfen Sie, ob der Pfad template_location korrekt ist. Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • TEMPLATE_NAME: Der Name Ihrer Vorlage
  • examples.mymodule: Ihr Python-Modul

Java: SDK 1.x

Der nächste Schritt nach dem Erstellen und Staging der Vorlage ist dann das Ausführen dieser Vorlage.