Klassische Dataflow-Vorlagen erstellen

In diesem Dokument erfahren Sie, wie Sie eine benutzerdefinierte klassische Vorlage aus Ihrem Dataflow-Pipelinecode erstellen. Klassische Vorlagen verpacken vorhandene Dataflow-Pipelines, um wiederverwendbare Vorlagen zu erstellen, die Sie für jeden Job anpassen können, indem Sie bestimmte Pipelineparameter ändern. Anstatt die Vorlage zu schreiben, verwenden Sie einen Befehl, um die Vorlage aus einer vorhandenen Pipeline zu generieren.

Es folgt ein kurzer Überblick über den Prozess. Dieser Vorgang wird in den folgenden Abschnitten erläutert.

  1. Verwenden Sie in Ihrem Pipelinecode die Schnittstelle ValueProvider für alle Pipelineoptionen, die Sie zur Laufzeit festlegen oder verwenden möchten. Verwenden Sie DoFn-Objekte, die Laufzeitparameter akzeptieren.
  2. Erweitern Sie Ihre Vorlage um zusätzliche Metadaten, damit benutzerdefinierte Parameter bei der Ausführung der klassischen Vorlage validiert werden. Beispiele für solche Metadaten sind der Name Ihrer benutzerdefinierten klassischen Vorlage und optionale Parameter.
  3. Prüfen Sie, ob die Pipeline-E/A-Connectors ValueProvider-Objekte unterstützen, und nehmen Sie bei Bedarf Änderungen vor.
  4. Erstellen Sie die benutzerdefinierte klassische Vorlage und stellen Sie sie bereit.
  5. Führen Sie die benutzerdefinierte klassische Vorlage aus.

Weitere Informationen zu den verschiedenen Arten von Dataflow-Vorlagen, ihren Vorteilen und zur Auswahl einer klassischen Vorlage finden Sie unter Dataflow-Vorlagen.

Erforderliche Berechtigungen für die Ausführung einer klassischen Vorlage

Die Berechtigungen, die Sie zum Ausführen der klassischen Dataflow-Vorlage benötigen, hängen davon ab, wo Sie die Vorlage ausführen und ob sich die Quelle und Senke für die Pipeline in einem anderen Projekt befinden.

Weitere Informationen zum lokalen Ausführen von Dataflow-Pipelines oder zum Verwenden von Google Cloud finden Sie unter Sicherheit und Berechtigungen in Dataflow.

Eine Liste der Dataflow-Rollen und -Berechtigungen finden Sie unter Dataflow-Zugriffssteuerung.

Beschränkungen

  • Die folgende Pipelineoption wird mit klassischen Vorlagen nicht unterstützt. Wenn Sie die Anzahl der Worker-Harness-Threads steuern müssen, verwenden Sie Flex-Vorlagen.

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • Der Dataflow-Runner unterstützt die ValueProvider-Optionen für Pub/Sub-Themen und Abo-Parameter nicht. Wenn Sie Pub/Sub-Optionen in Ihren Laufzeitparametern benötigen, verwenden Sie flexible Vorlagen.

Informationen zu Laufzeitparametern und der ValueProvider-Schnittstelle

Die Schnittstelle ValueProvider ermöglicht das Akzeptieren von Laufzeitparametern durch Pipelines. Apache Beam bietet drei Typen von ValueProvider-Objekten.

Name Beschreibung
RuntimeValueProvider

RuntimeValueProvider ist der Standardtyp ValueProvider. Durch RuntimeValueProvider akzeptiert Ihre Pipeline einen Wert, der nur während der Ausführung der Pipeline verfügbar ist. Der Wert ist nicht bei der Pipelineerstellung verfügbar. Sie können ihn daher nicht zum Ändern der Workflowgrafik Ihrer Pipeline verwenden.

Mit isAccessible() können Sie prüfen, ob der Wert von ValueProvider verfügbar ist. Wenn Sie get() vor der Pipelineausführung aufrufen, gibt Apache Beam einen Fehler zurück:
Value only available at runtime, but accessed from a non-runtime context.

Verwenden Sie RuntimeValueProvider, wenn Sie den Wert im Voraus nicht kennen. Um die Parameterwerte zur Laufzeit zu ändern, legen Sie keine Werte für die Parameter in der Vorlage fest. Legen Sie die Werte für die Parameter fest, wenn Sie Jobs aus der Vorlage erstellen.

StaticValueProvider

Mit StaticValueProvider können Sie einen statischen Wert für Ihre Pipeline angeben. Der Wert ist bei der Erstellung der Pipeline verfügbar. Sie können daher mit diesem Wert die Workflowgrafik Ihrer Pipeline ändern.

Verwenden Sie StaticValueProvider, wenn Sie den Wert vorab kennen. Beispiele finden Sie im Abschnitt "StaticValueProvider".

NestedValueProvider

Mit NestedValueProvider können Sie einen Wert aus einem anderen ValueProvider-Objekt berechnen. NestedValueProvider umschließt ValueProvider, und der Typ des zusammengefassten ValueProvider bestimmt, ob der Wert während der Pipelinekonstruktion zugänglich ist.

Verwenden Sie NestedValueProvider, wenn Sie den Wert zur Berechnung eines anderen Werts zur Laufzeit nutzen möchten. Beispiele finden Sie im Abschnitt "NestedValueProvider".

Laufzeitparameter im Pipelinecode verwenden

In diesem Abschnitt wird die Verwendung von ValueProvider, StaticValueProvider und NestedValueProvider erläutert.

ValueProvider in Ihren Pipelineoptionen verwenden

Verwenden Sie ValueProvider für alle Pipelineoptionen, die Sie zur Laufzeit festlegen oder verwenden möchten.

Das folgende WordCount-Code-Snippet unterstützt beispielsweise keine Laufzeitparameter. Mit dem Code wird eine Eingabedateioption hinzugefügt, eine Pipeline erstellt und es werden Zeilen aus der Eingabedatei gelesen.

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Wenn Sie eine Unterstützung für Laufzeitparameter hinzufügen möchten, ändern Sie die Eingabedateioption zur Verwendung von ValueProvider.

Java

Verwenden Sie ValueProvider<String> anstelle von String für den Typ der Eingabedateioption.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Ersetzen Sie add_argument durch add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

ValueProvider in Funktionen verwenden

Zur Nutzung von Laufzeitparameterwerten in Ihren eigenen Funktionen müssen Sie für die Funktionen die Verwendung von ValueProvider-Parametern festlegen.

Das folgende Beispiel enthält eine ValueProvider-Ganzzahloption und eine einfache Funktion, die eine Ganzzahl hinzufügt. Die Funktion hängt von der ValueProvider-Ganzzahl ab. Bei der Ausführung wendet die Pipeline MySumFn auf jede Ganzzahl in einer PCollection an, die [1, 2, 3] enthält. Wenn der Laufzeitwert 10 ist, enthält die resultierende PCollection den Wert [11, 12, 13].

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

StaticValueProvider verwenden

StaticValueProvider ermöglicht die Bereitstellung eines statischen Werts für Ihre Pipeline.

In diesem Beispiel wird MySumFn verwendet. Dabei handelt es sich um ein DoFn-Objekt, das einen ValueProvider<Integer> annimmt. Wenn Sie den Wert des Parameters im Voraus kennen, können Sie StaticValueProvider verwenden, um Ihren statischen Wert als ValueProvider anzugeben.

Java

Dieser Code ruft den Wert zur Pipelinelaufzeit ab:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Stattdessen können Sie StaticValueProvider mit einem statischen Wert verwenden:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Dieser Code ruft den Wert zur Pipelinelaufzeit ab:

  beam.ParDo(MySumFn(user_options.templated_int))

Stattdessen können Sie StaticValueProvider mit einem statischen Wert verwenden:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Sie können StaticValueProvider auch verwenden, wenn Sie ein E/A-Modul implementieren, das sowohl reguläre Parameter als auch Laufzeitparameter unterstützt. StaticValueProvider reduziert die Codeduplizierung durch die Implementierung zweier ähnlicher Methoden.

Java

Der Quellcode in diesem Beispiel stammt aus TextIO.java auf GitHub von Apache Beam.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

In diesem Beispiel gibt es einen einzelnen Konstruktor, der sowohl ein string- als auch ein ValueProvider-Argument akzeptiert. Wenn das Argument ein string ist, wird es in einen StaticValueProvider umgewandelt.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

NestedStaticValueProvider verwenden

Zur Berechnung eines Werts aus einem anderen ValueProvider-Objekt verwenden Sie NestedValueProvider.

NestedValueProvider nimmt einen ValueProvider und einen SerializableFunction-Übersetzer als Eingabe. Wenn Sie .get() für einen NestedValueProvider aufrufen, erstellt der Übersetzer einen neuen Wert basierend auf dem ValueProvider-Wert. Bei dieser Übersetzung können Sie einen ValueProvider-Wert verwenden, um den gewünschten endgültigen Wert zu erstellen:

Im folgenden Beispiel stellt der Nutzer den Dateinamen file.txt bereit. Durch die Transformation wird der Pfad gs://directory_name/ dem Dateinamen vorangestellt. Der Aufruf von .get() gibt gs://directory_name/file.txt zurück.

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Metadaten in Pipeline-Code verwenden

Sie können Ihre Vorlage um zusätzliche Metadaten erweitern, damit benutzerdefinierte Parameter bei der Ausführung der Vorlage überprüft werden. Wenn Sie Metadaten für Ihre Vorlage erstellen möchten, gehen Sie so vor:

  1. Erstellen Sie eine JSON-formatierte Datei mit dem Namen TEMPLATE_NAME_metadata. Verwenden Sie dazu die Parameter in Metadatenparametern und das Format in Beispiel-Metadatendatei. Ersetzen Sie TEMPLATE_NAME durch den Namen Ihrer Vorlage.

    Achten Sie darauf, dass die Metadatendatei keine Dateiendung hat. Wenn Ihr Vorlagenname beispielsweise myTemplate lautet, muss die Metadatendatei myTemplate_metadata lauten.

  2. Speichern Sie die Metadatendatei in Cloud Storage im selben Ordner wie die Vorlage.

Metadatenparameter

Parameterschlüssel Erforderlich Beschreibung des Werts
name Ja Der Name Ihrer Vorlage.
description Nein Ein kurzer Textabschnitt, der die Vorlage beschreibt.
streaming Nein Wenn true, unterstützt diese Vorlage das Streaming. Der Standardwert ist false.
supportsAtLeastOnce Nein Wenn true, unterstützt diese Vorlage die mindestens einmalige Verarbeitung. Der Standardwert ist false. Setzen Sie diesen Parameter auf true, wenn die Vorlage für den "Mindestens einmal"-Streamingmodus ausgelegt ist.
supportsExactlyOnce Nein Bei true unterstützt diese Vorlage die genau einmalige Verarbeitung. Der Standardwert ist true.
defaultStreamingMode Nein Den Standard-Streamingmodus für Vorlagen, die sowohl den "Mindestens einmal"-Modus als auch den "Genau einmal"-Modus unterstützen. Verwenden Sie einen der folgenden Werte: "AT_LEAST_ONCE", "EXACTLY_ONCE". Wenn keine Angabe gemacht wird, wird der Standard-Streamingmodus "genau einmal" verwendet.
parameters Nein Ein Array von zusätzlichen Parametern, die die Vorlage verwendet. Ein leeres Array wird standardmäßig verwendet.
name Ja Der Name des Parameters, der in Ihrer Vorlage verwendet wird.
label Ja Ein für Menschen lesbarer String, der in der Google Cloud Console verwendet wird, um den Parameter mit einem Label zu versehen.
helpText Ja Ein kurzer Textabschnitt, der den Parameter beschreibt.
isOptional Nein false, wenn der Parameter erforderlich ist, und true, wenn der Parameter optional ist. Sofern kein Wert festgelegt ist, wird für isOptional standardmäßig der Wert false verwendet. Wenn Sie diesen Parameterschlüssel nicht für Ihre Metadaten angeben, werden die Metadaten zu einem erforderlichen Parameter.
regexes Nein Ein Array von regulären POSIX-egrep-Ausdrücken in Stringform, die verwendet werden, um den Wert des Parameters zu validieren. Beispiel: ["^[a-zA-Z][a-zA-Z0-9]+"] ist ein einzelner regulärer Ausdruck, der validiert, dass der Wert mit einem Buchstaben beginnt und dann ein oder mehrere Zeichen enthält. Ein leerer Array wird standardmäßig verwendet.

Beispiel-Metadatendatei

Java

Der Dataflow-Dienst verwendet die folgenden Metadaten, um die benutzerdefinierten Parameter der WordCount-Vorlage zu validieren:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Der Dataflow-Dienst verwendet die folgenden Metadaten, um die benutzerdefinierten Parameter der WordCount-Vorlage zu validieren:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Sie können Metadatendateien für die von Google bereitgestellten Vorlagen aus dem Vorlagenverzeichnis von Dataflow herunterladen.

Unterstützte Pipeline-E/A-Connectors und ValueProvider

Java

Einige E/A-Connectors enthalten Methoden, die ValueProvider-Objekte akzeptieren. Informationen zur Unterstützung für einen bestimmten Connector und für eine bestimmte Methode finden Sie in der API-Referenzdokumentation für den jeweiligen E/A-Connector. Unterstützte Methoden haben eine Überlastung mit einem ValueProvider. Hat eine Methode keine Überlastung, unterstützt sie keine Laufzeitparameter. Für die folgenden E/A-Connectors ist zumindest eine teilweise ValueProvider-Unterstützung verfügbar:

  • Dateibasierte E/A-Vorgänge: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (erfordert SDK 2.3.0 oder höher)
  • PubSubIO
  • SpannerIO

Python

Einige E/A-Connectors enthalten Methoden, die ValueProvider-Objekte akzeptieren. Informationen zur Unterstützung von E/A-Connectors und ihrer Methoden finden Sie in der API-Referenzdokumentation für den jeweiligen Connector. Die folgenden E/A-Connectors akzeptieren Laufzeitparameter:

  • Dateibasierte E/A-Vorgänge: textio, avroio, tfrecordio

Klassische Vorlage erstellen und bereitstellen

Wenn Sie Ihre Pipeline geschrieben haben, müssen Sie Ihre Vorlagendatei erstellen und bereitstellen, d. h., ein Staging ausführen. Wenn Sie eine Vorlage erstellen und ein Staging ausführen, enthält der Staging-Pfad zusätzliche Dateien, die zur Ausführung Ihrer Vorlage erforderlich sind. Wenn Sie den Staging-Speicherort löschen, kann die Vorlage nicht ausgeführt werden. Der Dataflow-Job wird nicht unmittelbar nach dem Staging der Vorlage ausgeführt. Zum Ausführen eines benutzerdefinierten vorlagenbasierten Dataflow-Jobs können Sie die Google Cloud Console, die Dataflow REST API oder die gcloud CLI verwenden.

Das folgende Beispiel zeigt das Staging einer Vorlagendatei:

Java

Mit diesem Maven-Befehl wird eine Vorlage am Cloud Storage-Speicherort mit --templateLocation erstellt und bereitgestellt.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

Prüfen Sie, ob der Pfad templateLocation korrekt ist. Dabei gilt:

  • com.example.myclass: Ihre Java-Klasse
  • PROJECT_ID: Ihre Projekt-ID
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • TEMPLATE_NAME: Der Name Ihrer Vorlage
  • REGION: die Region, in der der Dataflow-Job bereitgestellt werden soll

Python

Mit diesem Python-Befehl wird eine Vorlage am Cloud Storage-Speicherort mit --template_location erstellt und bereitgestellt.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Prüfen Sie, ob der Pfad template_location korrekt ist. Dabei gilt:

  • examples.mymodule: Ihr Python-Modul
  • PROJECT_ID: Ihre Projekt-ID
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • TEMPLATE_NAME: Der Name Ihrer Vorlage
  • REGION: die Region, in der der Dataflow-Job bereitgestellt werden soll

Der nächste Schritt nach dem Erstellen und Staging der Vorlage ist dann das Ausführen dieser Vorlage.