In diesem Dokument erfahren Sie, wie Sie eine benutzerdefinierte klassische Vorlage aus Ihrem Dataflow-Pipelinecode erstellen. Klassische Vorlagen verpacken vorhandene Dataflow-Pipelines, um wiederverwendbare Vorlagen zu erstellen, die Sie für jeden Job anpassen können, indem Sie bestimmte Pipelineparameter ändern. Anstatt die Vorlage zu schreiben, verwenden Sie einen Befehl, um die Vorlage aus einer vorhandenen Pipeline zu generieren.
Es folgt ein kurzer Überblick über den Prozess. Dieser Vorgang wird in den folgenden Abschnitten erläutert.
- Verwenden Sie in Ihrem Pipelinecode die Schnittstelle
ValueProvider
für alle Pipelineoptionen, die Sie zur Laufzeit festlegen oder verwenden möchten. Verwenden SieDoFn
-Objekte, die Laufzeitparameter akzeptieren. - Erweitern Sie Ihre Vorlage um zusätzliche Metadaten, damit benutzerdefinierte Parameter bei der Ausführung der klassischen Vorlage validiert werden. Beispiele für solche Metadaten sind der Name Ihrer benutzerdefinierten klassischen Vorlage und optionale Parameter.
- Prüfen Sie, ob die Pipeline-E/A-Connectors
ValueProvider
-Objekte unterstützen, und nehmen Sie bei Bedarf Änderungen vor. - Erstellen Sie die benutzerdefinierte klassische Vorlage und stellen Sie sie bereit.
- Führen Sie die benutzerdefinierte klassische Vorlage aus.
Weitere Informationen zu den verschiedenen Arten von Dataflow-Vorlagen, ihren Vorteilen und zur Auswahl einer klassischen Vorlage finden Sie unter Dataflow-Vorlagen.
Erforderliche Berechtigungen für die Ausführung einer klassischen Vorlage
Die Berechtigungen, die Sie zum Ausführen der klassischen Dataflow-Vorlage benötigen, hängen davon ab, wo Sie die Vorlage ausführen und ob sich die Quelle und Senke für die Pipeline in einem anderen Projekt befinden.
Weitere Informationen zum lokalen Ausführen von Dataflow-Pipelines oder zum Verwenden von Google Cloud finden Sie unter Sicherheit und Berechtigungen in Dataflow.
Eine Liste der Dataflow-Rollen und -Berechtigungen finden Sie unter Dataflow-Zugriffssteuerung.
Beschränkungen
- Die folgende Pipelineoption wird bei klassischen Vorlagen nicht unterstützt. Wenn Sie die Anzahl der Worker-Harness-Threads steuern müssen, verwenden Sie flexible Vorlagen.
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- Der Dataflow-Runner unterstützt
ValueProvider
-Optionen für Pub/Sub-Themen und Abo-Parameter nicht. Wenn Sie Pub/Sub-Optionen in Ihren Laufzeitparametern benötigen, verwenden Sie flexible Vorlagen.
Informationen zu Laufzeitparametern und der ValueProvider
-Schnittstelle
Die Schnittstelle ValueProvider
ermöglicht das Akzeptieren von Laufzeitparametern durch Pipelines. Apache Beam bietet drei Typen von ValueProvider
-Objekten.
Name | Beschreibung |
---|---|
RuntimeValueProvider |
Mit Verwenden Sie |
StaticValueProvider |
Mit Verwenden Sie |
NestedValueProvider |
Mit Verwenden Sie |
Laufzeitparameter im Pipelinecode verwenden
In diesem Abschnitt wird die Verwendung von ValueProvider
, StaticValueProvider
und NestedValueProvider
erläutert.
ValueProvider
in Ihren Pipelineoptionen verwenden
Verwenden Sie ValueProvider
für alle Pipelineoptionen, die Sie zur Laufzeit festlegen oder verwenden möchten.
Das folgende WordCount
-Code-Snippet unterstützt beispielsweise keine Laufzeitparameter. Mit dem Code wird eine Eingabedateioption hinzugefügt, eine Pipeline erstellt und es werden Zeilen aus der Eingabedatei gelesen.
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Wenn Sie eine Unterstützung für Laufzeitparameter hinzufügen möchten, ändern Sie die Eingabedateioption zur Verwendung von ValueProvider
.
Java
Verwenden Sie ValueProvider<String>
anstelle von String
für den Typ der Eingabedateioption.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
Ersetzen Sie add_argument
durch add_value_provider_argument
.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
ValueProvider
in Funktionen verwenden
Zur Nutzung von Laufzeitparameterwerten in Ihren eigenen Funktionen müssen Sie für die Funktionen die Verwendung von ValueProvider
-Parametern festlegen.
Das folgende Beispiel enthält eine ValueProvider
-Ganzzahloption und eine einfache Funktion, die eine Ganzzahl hinzufügt. Die Funktion hängt von der ValueProvider
-Ganzzahl ab. Bei der Ausführung wendet die Pipeline MySumFn
auf jede Ganzzahl in einer PCollection
an, die [1, 2, 3]
enthält. Wenn der Laufzeitwert 10 ist, enthält die resultierende PCollection
den Wert [11, 12, 13]
.
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
StaticValueProvider
verwenden
StaticValueProvider
ermöglicht die Bereitstellung eines statischen Werts für Ihre Pipeline.
In diesem Beispiel wird MySumFn
verwendet. Dabei handelt es sich um ein DoFn
-Objekt, das einen ValueProvider<Integer>
annimmt. Wenn Sie den Wert des Parameters im Voraus kennen, können Sie StaticValueProvider
verwenden, um Ihren statischen Wert als ValueProvider
anzugeben.
Java
Dieser Code ruft den Wert zur Pipelinelaufzeit ab:
.apply(ParDo.of(new MySumFn(options.getInt())))
Stattdessen können Sie StaticValueProvider
mit einem statischen Wert verwenden:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
Dieser Code ruft den Wert zur Pipelinelaufzeit ab:
beam.ParDo(MySumFn(user_options.templated_int))
Stattdessen können Sie StaticValueProvider
mit einem statischen Wert verwenden:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
Sie können StaticValueProvider
auch verwenden, wenn Sie ein E/A-Modul implementieren, das sowohl reguläre Parameter als auch Laufzeitparameter unterstützt.
StaticValueProvider
reduziert die Codeduplizierung durch die Implementierung zweier ähnlicher Methoden.
Java
Der Quellcode in diesem Beispiel stammt aus TextIO.java auf GitHub von Apache Beam.
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
In diesem Beispiel gibt es einen einzelnen Konstruktor, der sowohl ein string
- als auch ein ValueProvider
-Argument akzeptiert. Wenn das Argument ein string
ist, wird es in einen StaticValueProvider
umgewandelt.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
NestedStaticValueProvider
verwenden
Zur Berechnung eines Werts aus einem anderen ValueProvider
-Objekt verwenden Sie NestedValueProvider
.
NestedValueProvider
nimmt einen ValueProvider
und einen SerializableFunction
-Übersetzer als Eingabe. Wenn Sie .get()
für einen NestedValueProvider
aufrufen, erstellt der Übersetzer einen neuen Wert basierend auf dem ValueProvider
-Wert. Bei dieser Übersetzung können Sie einen ValueProvider
-Wert verwenden, um den gewünschten endgültigen Wert zu erstellen:
Im folgenden Beispiel stellt der Nutzer den Dateinamen file.txt
bereit. Durch die Transformation wird der Pfad gs://directory_name/
dem Dateinamen vorangestellt. Der Aufruf von .get()
gibt gs://directory_name/file.txt
zurück.
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
Metadaten in Pipeline-Code verwenden
Sie können Ihre Vorlage um zusätzliche Metadaten erweitern, damit benutzerdefinierte Parameter bei der Ausführung der Vorlage überprüft werden. Wenn Sie Metadaten für Ihre Vorlage erstellen möchten, gehen Sie so vor:
- Erstellen Sie eine JSON-formatierte Datei mit dem Namen
TEMPLATE_NAME_metadata
. Verwenden Sie dazu die Parameter in Metadatenparametern und das Format in Beispiel-Metadatendatei. Ersetzen SieTEMPLATE_NAME
durch den Namen Ihrer Vorlage.Achten Sie darauf, dass die Metadatendatei keine Dateiendung hat. Wenn Ihr Vorlagenname beispielsweise
myTemplate
lautet, muss die MetadatendateimyTemplate_metadata
lauten. - Speichern Sie die Metadatendatei in Cloud Storage im selben Ordner wie die Vorlage.
Metadatenparameter
Parameterschlüssel | Erforderlich | Beschreibung des Werts | |
---|---|---|---|
name |
Ja | Der Name Ihrer Vorlage. | |
description |
Nein | Ein kurzer Textabschnitt, der die Vorlage beschreibt. | |
streaming |
Nein | Wenn true , unterstützt diese Vorlage das Streaming. Der Standardwert ist false . |
|
supportsAtLeastOnce |
Nein | Wenn true , unterstützt diese Vorlage die mindestens einmalige Verarbeitung. Der Standardwert ist false . Setzen Sie diesen Parameter auf true , wenn die Vorlage für den "Mindestens einmal"-Streamingmodus ausgelegt ist.
|
|
supportsExactlyOnce |
Nein | Bei true unterstützt diese Vorlage die genau einmalige Verarbeitung. Der Standardwert ist true . |
|
defaultStreamingMode |
Nein | Den Standard-Streamingmodus für Vorlagen, die sowohl den "Mindestens einmal"-Modus als auch den "Genau einmal"-Modus unterstützen. Verwenden Sie einen der folgenden Werte: "AT_LEAST_ONCE" ,
"EXACTLY_ONCE" . Wenn keine Angabe gemacht wird, wird der Standard-Streamingmodus "genau einmal" verwendet.
|
|
parameters |
Nein | Ein Array von zusätzlichen Parametern, die die Vorlage verwendet. Ein leeres Array wird standardmäßig verwendet. | |
name |
Ja | Der Name des Parameters, der in Ihrer Vorlage verwendet wird. | |
label |
Ja | Ein für Menschen lesbarer String, der in der Google Cloud Console verwendet wird, um den Parameter mit einem Label zu versehen. | |
helpText |
Ja | Ein kurzer Textabschnitt, der den Parameter beschreibt. | |
isOptional |
Nein | false , wenn der Parameter erforderlich ist, und true , wenn der Parameter optional ist. Sofern kein Wert festgelegt ist, wird für isOptional standardmäßig der Wert false verwendet.
Wenn Sie diesen Parameterschlüssel nicht für Ihre Metadaten angeben, werden die Metadaten zu einem erforderlichen Parameter. |
|
regexes |
Nein | Ein Array von regulären POSIX-egrep-Ausdrücken in Stringform, die verwendet werden, um den Wert des Parameters zu validieren. Beispiel: ["^[a-zA-Z][a-zA-Z0-9]+"] ist ein einzelner regulärer Ausdruck, der validiert, dass der Wert mit einem Buchstaben beginnt und dann ein oder mehrere Zeichen enthält. Ein leerer Array wird standardmäßig verwendet. |
Beispiel-Metadatendatei
Java
Der Dataflow-Dienst verwendet die folgenden Metadaten, um die benutzerdefinierten Parameter der WordCount-Vorlage zu validieren:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Der Dataflow-Dienst verwendet die folgenden Metadaten, um die benutzerdefinierten Parameter der WordCount-Vorlage zu validieren:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Sie können Metadatendateien für die von Google bereitgestellten Vorlagen aus dem Vorlagenverzeichnis von Dataflow herunterladen.
Unterstützte Pipeline-E/A-Connectors und ValueProvider
Java
Einige E/A-Connectors enthalten Methoden, die ValueProvider
-Objekte akzeptieren. Informationen zur Unterstützung für einen bestimmten Connector und für eine bestimmte Methode finden Sie in der API-Referenzdokumentation für den jeweiligen E/A-Connector. Unterstützte Methoden haben eine Überlastung mit einem ValueProvider
. Hat eine Methode keine Überlastung, unterstützt sie keine Laufzeitparameter. Für die folgenden E/A-Connectors ist zumindest eine teilweise ValueProvider
-Unterstützung verfügbar:
- Dateibasierte E/A-Vorgänge:
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(erfordert SDK 2.3.0 oder höher)PubSubIO
SpannerIO
Python
Einige E/A-Connectors enthalten Methoden, die ValueProvider
-Objekte akzeptieren. Informationen zur Unterstützung von E/A-Connectors und ihrer Methoden finden Sie in der API-Referenzdokumentation für den jeweiligen Connector. Die folgenden E/A-Connectors akzeptieren Laufzeitparameter:
- Dateibasierte E/A-Vorgänge:
textio
,avroio
,tfrecordio
Klassische Vorlage erstellen und bereitstellen
Wenn Sie Ihre Pipeline geschrieben haben, müssen Sie Ihre Vorlagendatei erstellen und bereitstellen, d. h., ein Staging ausführen. Wenn Sie eine Vorlage erstellen und ein Staging ausführen, enthält der Staging-Pfad zusätzliche Dateien, die zur Ausführung Ihrer Vorlage erforderlich sind. Wenn Sie den Staging-Speicherort löschen, kann die Vorlage nicht ausgeführt werden. Der Dataflow-Job wird nicht unmittelbar nach dem Staging der Vorlage ausgeführt. Zum Ausführen eines benutzerdefinierten vorlagenbasierten Dataflow-Jobs können Sie die Google Cloud Console, die Dataflow REST API oder die gcloud CLI verwenden.
Das folgende Beispiel zeigt das Staging einer Vorlagendatei:
Java
Mit diesem Maven-Befehl wird eine Vorlage am Cloud Storage-Speicherort mit --templateLocation
erstellt und bereitgestellt.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
Prüfen Sie, ob der Pfad templateLocation
korrekt ist. Dabei gilt:
com.example.myclass
: Ihre Java-KlassePROJECT_ID
: Ihre Projekt-IDBUCKET_NAME
: der Name Ihres Cloud Storage-BucketsTEMPLATE_NAME
: Der Name Ihrer VorlageREGION
: Die Region, in der Ihr Dataflow-Job bereitgestellt werden soll
Python
Mit diesem Python-Befehl wird eine Vorlage am Cloud Storage-Speicherort mit --template_location
erstellt und bereitgestellt.
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
Prüfen Sie, ob der Pfad template_location
korrekt ist. Dabei gilt:
examples.mymodule
: Ihr Python-ModulPROJECT_ID
: Ihre Projekt-IDBUCKET_NAME
: der Name Ihres Cloud Storage-BucketsTEMPLATE_NAME
: Der Name Ihrer VorlageREGION
: Die Region, in der Ihr Dataflow-Job bereitgestellt werden soll
Der nächste Schritt nach dem Erstellen und Staging der Vorlage ist dann das Ausführen dieser Vorlage.