Dataflow-Vorlagen verwenden Laufzeitparameter zum Akzeptieren von Werten, die nur bei der Pipelineausführung verfügbar sind. Zur Anpassung der Ausführung einer als Vorlage verwendeten Pipeline können Sie diese Parameter an Funktionen übergeben, die in der Pipeline ausgeführt werden (z. B. DoFn
).
Für das Erstellen einer Vorlage aus Ihrer Apache Beam-Pipeline müssen Sie den Pipelinecode so ändern, dass dieser Laufzeitparameter unterstützt:
- Verwenden Sie
ValueProvider
für alle Pipelineoptionen, die Sie zur Laufzeit festlegen oder verwenden möchten. - Rufen Sie dort, wo Sie Parameter für Ihre Pipeline festlegen möchten, E/A-Methoden auf, die Laufzeitparameter akzeptieren
- Verwenden Sie
DoFn
-Objekte, die Laufzeitparameter akzeptieren.
Erstellen Sie dann Ihre Vorlage und stellen Sie sie bereit.
Laufzeitparameter und die ValueProvider-Schnittstelle
Die Schnittstelle ValueProvider
ermöglicht das Akzeptieren von Laufzeitparametern durch Pipelines. Apache Beam bietet drei Typen von ValueProvider
-Objekten.
Name | Beschreibung |
---|---|
RuntimeValueProvider |
Mit Verwenden Sie |
StaticValueProvider |
Mit Verwenden Sie |
NestedValueProvider |
Mit Verwenden Sie Hinweis: Das Apache Beam SDK für Python unterstützt derzeit keinen |
Code für die Verwendung von Laufzeitparametern ändern
In diesem Abschnitt wird die Verwendung von ValueProvider
, StaticValueProvider
und NestedValueProvider
erläutert.
ValueProvider in Pipelineoptionen
Verwenden Sie ValueProvider
für alle Pipelineoptionen, die Sie zur Laufzeit festlegen oder verwenden möchten.
Das folgende WordCount
-Code-Snippet unterstützt beispielsweise keine Laufzeitparameter. Mit dem Code wird eine Eingabedateioption hinzugefügt, eine Pipeline erstellt und es werden Zeilen aus der Eingabedatei gelesen.
Java: SDK 2.x
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Java: SDK 1.x
Wenn Sie eine Unterstützung für Laufzeitparameter hinzufügen möchten, ändern Sie die Eingabedateioption zur Verwendung von ValueProvider
.
Java: SDK 2.x
Verwenden Sie ValueProvider<String>
anstelle von String
für den Typ der Eingabedateioption.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
Ersetzen Sie add_argument
durch add_value_provider_argument
.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Java: SDK 1.x
ValueProvider in Funktionen
Zur Nutzung von Laufzeitparameterwerten in Ihren eigenen Funktionen müssen Sie für die Funktionen die Verwendung von ValueProvider
-Parametern festlegen.
Das folgende Beispiel enthält eine ValueProvider
-Ganzzahloption und eine einfache Funktion, die eine Ganzzahl hinzufügt. Die Funktion hängt von der ValueProvider
-Ganzzahl ab. Bei der Ausführung wendet die Pipeline MySumFn
auf jede Ganzzahl in einer PCollection
an, die [1, 2, 3]
enthält. Wenn der Laufzeitwert 10 ist, enthält die resultierende PCollection
den Wert [11, 12, 13]
.
Java: SDK 2.x
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
Java: SDK 1.x
StaticValueProvider
StaticValueProvider
ermöglicht die Bereitstellung eines statischen Werts für Ihre Pipeline.
In diesem Beispiel wird MySumFn
verwendet. Dabei handelt es sich um ein DoFn
-Objekt, das einen ValueProvider<Integer>
annimmt. Wenn Sie den Wert des Parameters im Voraus kennen, können Sie StaticValueProvider
verwenden, um Ihren statischen Wert als ValueProvider
anzugeben.
Java: SDK 2.x
Dieser Code ruft den Wert zur Pipelinelaufzeit ab:
.apply(ParDo.of(new MySumFn(options.getInt())))
Stattdessen können Sie StaticValueProvider
mit einem statischen Wert verwenden:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
Dieser Code ruft den Wert zur Pipelinelaufzeit ab:
beam.ParDo(MySumFn(user_options.templated_int))
Stattdessen können Sie StaticValueProvider
mit einem statischen Wert verwenden:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
Java: SDK 1.x
Sie können StaticValueProvider
auch verwenden, wenn Sie ein E/A-Modul implementieren, das sowohl reguläre Parameter als auch Laufzeitparameter unterstützt.
StaticValueProvider
reduziert die Codeduplizierung durch die Implementierung zweier ähnlicher Methoden.
Java: SDK 2.x
Der Quellcode in diesem Beispiel stammt aus TextIO.java auf GitHub von Apache Beam.
// Create a StaticValueProviderfrom a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
In diesem Beispiel gibt es einen einzelnen Konstruktor, der sowohl ein string
- als auch ein ValueProvider
-Argument akzeptiert. Wenn das Argument ein string
ist, wird es in einen StaticValueProvider
umgewandelt.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, basestring): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
Java: SDK 1.x
NestedValueProvider
Hinweis: Das Apache Beam SDK für Python unterstützt derzeit keinen NestedValueProvider
.
Zur Berechnung eines Werts aus einem anderen ValueProvider
-Objekt verwenden Sie NestedValueProvider
.
NestedValueProvider
nimmt einen ValueProvider
und einen SerializableFunction
-Übersetzer als Eingabe. Wenn Sie .get()
für einen NestedValueProvider
aufrufen, erstellt der Übersetzer einen neuen Wert basierend auf dem ValueProvider
-Wert. Bei dieser Übersetzung können Sie einen ValueProvider
-Wert verwenden, um den gewünschten endgültigen Wert zu erstellen:
- Beispiel 1: Der Nutzer stellt den Dateinamen
file.txt
bereit. Durch die Transformation wird der Dateipfadgs://directory_name/
dem Dateinamen vorangestellt. Der Aufruf von.get()
gibtgs://directory_name/file.txt
zurück. - Beispiel 2: Der Nutzer stellt einen Teilstring für eine BigQuery-Abfrage bereit, z. B. ein bestimmtes Datum. Die Transformation verwendet den Teilstring, um die vollständige Abfrage zu erstellen. Der Aufruf von
.get()
gibt die vollständige Abfrage zurück.
Hinweis: NestedValueProvider
akzeptiert nur eine Werteingabe. Sie können NestedValueProvider
nicht verwenden, um zwei verschiedene Werte zu kombinieren.
Der folgende Code verwendet NestedValueProvider
, um das erste Beispiel zu implementieren: Der Nutzer stellt einen Dateinamen bereit und die Transformation stellt den Dateipfad dem Dateinamen voran.
Java: SDK 2.x
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
Python
Das Apache Beam SDK für Python unterstützt NestedValueProvider
derzeit nicht.
Java: SDK 1.x
Metadaten
Sie können Ihre Vorlagen um zusätzliche Metadaten erweitern, damit benutzerdefinierte Parameter bei der Ausführung der Vorlage überprüft werden. Wenn Sie Metadaten für Ihre Vorlage erstellen möchten, müssen Sie so vorgehen:
- Erstellen Sie mit den Parametern aus der Tabelle unten eine JSON-formatierte Datei mit dem Namen
<template-name>_metadata
.Hinweis: Benennen Sie die von Ihnen erstellte Datei nicht
<template-name>_metadata.json
. Die Datei enthält zwar JSON, der Dateiname darf aber nicht mit der Dateierweiterung.json
enden. - Speichern Sie die JSON-Datei in Cloud Storage im selben Ordner wie die Vorlage.
Hinweis: Die Vorlage muss in
<template-name>
und die Metadaten müssen in<template-name>_metadata
gespeichert werden.
Metadatenparameter
Parameterschlüssel | Erforderlich | Beschreibung des Werts | |
---|---|---|---|
name |
Ja | Der Name Ihrer Vorlage. | |
description |
Nein | Ein kurzer Textabschnitt, der die Vorlagen beschreibt. | |
parameters |
Nein. Die Standardeinstellung ist ein leeres Array. | Ein Array von zusätzlichen Parametern, die von der Vorlage verwendet werden. | |
name |
Ja | Der Name des Parameters, der in Ihrer Vorlage verwendet wird. | |
label |
Ja | Ein aussagekräftiges Etikett, mit dem der Parameter in der Benutzeroberfläche gekennzeichnet wird. | |
helpText |
Ja | Ein kurzer Textabschnitt, der den Parameter beschreibt. | |
isOptional |
Nein. Die Standardeinstellung ist "false". | false , wenn der Parameter erforderlich ist, und true , wenn der Parameter optional ist. |
|
regexes |
Nein. Die Standardeinstellung ist ein leeres Array. | Ein Array von regulären POSIX-egrep-Ausdrücken in Stringform, die verwendet werden, um den Wert des Parameters zu prüfen. Beispiel: ["^[a-zA-Z][a-zA-Z0-9]+"] ist ein einzelner regulärer Ausdruck, der bestätigt, dass der Wert mit einem Buchstaben beginnt und dann ein oder mehrere Zeichen enthält. |
Beispiel-Metadatendatei
Der Dataflow-Dienst verwendet die folgenden Metadaten, um die benutzerdefinierten Parameter der WordCount-Vorlage zu validieren:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Sie können diese Metadatendatei aus dem Vorlagenverzeichnis von Dataflow herunterladen.
Pipeline-E/A und Laufzeitparameter
Java: SDK 2.x
Einige E/A-Connectors enthalten Methoden, die ValueProvider
-Objekte akzeptieren. Informationen zur Unterstützung für einen bestimmten Connector und für eine bestimmte Methode finden Sie in der API-Referenzdokumentation für den jeweiligen E/A-Connector. Unterstützte Methoden haben eine Überlastung mit einem ValueProvider
. Hat eine Methode keine Überlastung, unterstützt sie keine Laufzeitparameter. Für die folgenden E/A-Connectors ist zumindest eine teilweise ValueProvider
-Unterstützung verfügbar:
- Dateibasierte E/A-Vorgänge:
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(erfordert SDK 2.3.0 oder höher)PubSubIO
SpannerIO
* Hinweis: Wenn Sie eine Batchpipeline ausführen möchten, die BigQuery liest, müssen Sie .withTemplateCompatibility()
für alle BigQuery-Lesevorgänge verwenden.
Python
Einige E/A-Connectors enthalten Methoden, die ValueProvider
-Objekte akzeptieren. Informationen zur Unterstützung von E/A-Connectors und ihrer Methoden finden Sie in der API-Referenzdokumentation für den jeweiligen Connector. Die folgenden E/A-Connectors akzeptieren Laufzeitparameter:
- Dateibasierte E/A-Vorgänge:
textio
,avroio
,tfrecordio
Java: SDK 1.x
Vorlagen erstellen und zum Staging führen
Wenn Sie Ihre Pipeline geschrieben haben, müssen Sie Ihre Vorlagendatei erstellen und bereitstellen, d. h., ein Staging ausführen. Verwenden Sie den jeweiligen Befehl für Ihre SDK-Version.
Hinweis: Nach dem Erstellen und Staging einer Vorlage enthält der Staging-Pfad zusätzliche Dateien, die zur Ausführung Ihrer Vorlage erforderlich sind. Wenn Sie den Staging-Speicherort löschen, schlägt die Ausführung der Vorlage fehl.
Java: SDK 2.x
Mit diesem Maven-Befehl wird eine Vorlage am Cloud Storage-Speicherort mit --templateLocation
erstellt und bereitgestellt.
Ersetzen Sie
YOUR_PROJECT_ID
durch Ihre Projekt-ID.Ersetzen Sie
YOUR_BUCKET_NAME
durch den Namen Ihres Cloud Storage-Buckets.Ersetzen Sie
YOUR_TEMPLATE_NAME
durch den Namen Ihrer Vorlage.Ersetzen Sie
com.example.myclass
durch Ihre Java-Klasse.Prüfen Sie, ob der Pfad
templateLocation
korrekt ist.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=YOUR_PROJECT_ID \ --stagingLocation=gs://YOUR_BUCKET_NAME/staging \ --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
Python
Mit diesem Python-Befehl wird eine Vorlage am Cloud Storage-Speicherort mit --template_location
erstellt und bereitgestellt. Nehmen Sie folgende Änderungen am Befehl vor:
Ersetzen Sie
YOUR_PROJECT_ID
durch Ihre Projekt-ID.Ersetzen Sie
YOUR_BUCKET_NAME
durch den Namen Ihres Cloud Storage-Buckets.Ersetzen Sie
YOUR_TEMPLATE_NAME
durch den Namen Ihrer Vorlage.Ersetzen Sie
examples.mymodule
durch Ihr Python-Modul.Prüfen Sie, ob der Pfad
template_location
korrekt ist.
python -m examples.mymodule \ --runner DataflowRunner \ --project YOUR_PROJECT_ID \ --staging_location gs://YOUR_BUCKET_NAME/staging \ --temp_location gs://YOUR_BUCKET_NAME/temp \ --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
Java: SDK 1.x
Der nächste Schritt nach dem Erstellen und Staging der Vorlage ist dann das Ausführen dieser Vorlage.