Parameter zur Pipelineausführung festlegen

Wenn Ihr Apache Beam-Programm eine Pipeline erstellt hat, müssen Sie sie ausführen lassen. Die Pipeline und das Apache Beam-Programm werden getrennt ausgeführt: Zuerst erstellt das Apache Beam-Programm die Pipeline und dann generiert der von Ihnen geschriebene Code eine Reihe von Schritten, die von einem Pipeline-Runner ausgeführt werden müssen. Beim Pipeline-Runner kann es sich um den verwalteten Dataflow-Dienst in Google Cloud, einen Runner-Dienst von einem Drittanbieter oder einen lokalen Pipeline-Runner handeln, der die Schritte direkt in der lokalen Umgebung ausführt.

Sie können den Pipeline-Runner und andere Ausführungsoptionen mit der Klasse PipelineOptions des Apache Beam SDK angeben. Mit PipelineOptions konfigurieren Sie, wie und wo die Pipeline ausgeführt wird und welche Ressourcen sie verwendet.

Meistens bietet es sich an, Ihre Pipeline über den Dataflow-Runner-Dienst mit verwalteten Google Cloud-Ressourcen auszuführen. Wenn Sie die Pipeline mit dem Dataflow-Dienst ausführen, wird ein Dataflow-Job erstellt, der Compute Engine- und Cloud Storage-Ressourcen in Ihrem Google Cloud-Projekt verwendet.

Sie können Ihre Pipeline aber auch lokal ausführen. Dabei werden die Pipeline-Transformationen auf demselben Computer ausgeführt wie das Dataflow-Programm. Eine lokale Ausführung ist zu Test- und Fehlerbehebungszwecken nützlich, insbesondere wenn Ihre Pipeline kleinere In-Memory-Datensätze verwenden kann.

PipelineOptions festlegen

Beim Erstellen des Pipeline-Objekts in Ihrem Dataflow-Programm übergeben Sie PipelineOptions. Wenn der Dataflow-Dienst Ihre Pipeline ausführt, sendet er eine Kopie von PipelineOptions an jede Worker-Instanz.

Java: SDK 2.x

Hinweis: Mit der Methode ProcessContext.getPipelineOptions können Sie in jeder DoFn-Instanz von ParDo auf PipelineOptions zugreifen.

Python

Diese Funktion wird im Apache Beam SDK für Python noch nicht unterstützt.

Java: SDK 1.x

PipelineOptions über Befehlszeilenargumente festlegen

Zum Konfigurieren der Pipeline können Sie ein Objekt der Klasse PipelineOptions erstellen und die Felder direkt festlegen. Die Apache Beam SDKs enthalten jedoch auch einen Befehlszeilenparser, mit dem Sie Felder in PipelineOptions mithilfe von Befehlszeilenargumenten angeben können.

Java: SDK 2.x

Zum Lesen von Optionen aus der Befehlszeile konstruieren Sie Ihr PipelineOptions-Objekt mit der Methode PipelineOptionsFactory.fromArgs wie im folgenden Beispielcode:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Hinweis: Wenn die Methode .withValidation angefügt wird, sucht Dataflow nach erforderlichen Befehlszeilenargumenten und validiert deren Werte.

Mit PipelineOptionsFactory.fromArgs werden Befehlszeilenargumente in folgendem Format interpretiert:

--<option>=<value>

Wenn Sie PipelineOptions auf diese Weise erstellen, können Sie beliebige Optionen in jeder Subschnittstelle von org.apache.beam.sdk.options.PipelineOptions als Befehlszeilenargumente angeben.

Python

Zum Lesen von Optionen aus der Befehlszeile konstruieren Sie Ihr PipelineOptions-Objekt wie im folgenden Beispielcode:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(flags=argv)

Das Argument (flags=argv) für PipelineOptions interpretiert Befehlszeilenargumente in folgendem Format:

--<option>=<value>

Wenn Sie PipelineOptions auf diese Weise erstellen, können Sie alle Optionen durch abgeleitete Klassen von PipelineOptions angeben.

Java: SDK 1.x

Benutzerdefinierte Optionen erstellen

Zusätzlich zu den standardmäßigen PipelineOptions können Sie eigene benutzerdefinierte Optionen hinzufügen. Außerdem kann der Befehlszeilenparser von Dataflow Ihre benutzerdefinierten Optionen mithilfe von Befehlszeilenargumenten im selben Format einrichten.

Java: SDK 2.x

Definieren Sie zum Hinzufügen Ihrer eigenen Optionen eine Schnittstelle mit Getter- und Setter-Methoden für die jeweilige Option wie im folgenden Beispiel:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

Verwenden Sie zum Hinzufügen Ihrer eigenen Optionen die Methode add_argument(), die sich genau wie das Python-Standardmodul argparseverhält, wie im folgenden Beispiel:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java: SDK 1.x

Sie können auch eine Beschreibung, die angezeigt wird, wenn ein Nutzer --help als Befehlszeilenargument übergibt, und einen Standardwert angeben.

Java: SDK 2.x

Beschreibungen und Standardwerte legen Sie mithilfe von Annotationen so fest:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Registrieren Sie die Schnittstelle möglichst mit PipelineOptionsFactory und übergeben Sie diese, wenn Sie das Objekt PipelineOptions erstellen. Wenn Sie Ihre Schnittstelle bei PipelineOptionsFactory registrieren, kann --help Ihre Schnittstelle für benutzerdefinierte Optionen finden und der Ausgabe des Befehls --help hinzufügen. PipelineOptionsFactory prüft außerdem, ob Ihre benutzerdefinierten Optionen mit allen anderen registrierten Optionen kompatibel sind.

Am folgenden Beispielcode sehen Sie, wie Sie Ihre Schnittstelle für benutzerdefinierte Optionen bei PipelineOptionsFactory registrieren:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Nun kann Ihre Pipeline --myCustomOption=value als Befehlszeilenargument akzeptieren.

Python

Beschreibungen und Standardwerte legen Sie so fest:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='gs://my-bucket/input')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://my-bucket/output')

Java: SDK 1.x

PipelineOptions für die Ausführung im Cloud Dataflow-Dienst konfigurieren

Um Ihre Pipeline mit dem verwalteten Dataflow-Dienst auszuführen, müssen Sie in PipelineOptions die folgenden Felder festlegen:

Java: SDK 2.x

  • project: Die ID Ihres Google Cloud-Projekts.
  • runner: Der Pipeline-Runner, mit dem das Programm geparst und die Pipeline erstellt wird. Für eine cloudbasierte Ausführung müssen Sie DataflowRunner verwenden.
  • gcpTempLocation: Ein Cloud Storage-Pfad, den Dataflow für das Staging temporärer Dateien verwendet. Sie müssen diesen Bucket erstellen, bevor Sie die Pipeline ausführen. Falls Sie gcpTempLocation nicht festlegen, können Sie die Pipeline-Option tempLocation angeben. In diesem Fall wird gcpTempLocation auf den Wert von tempLocation gesetzt. Wenn Sie keines von beiden angeben, wird gcpTempLocation mit einem Standardwert erstellt.
  • stagingLocation: Ein Cloud Storage-Bucket, den Dataflow für das Staging Ihrer Binärdateien verwendet. Wenn Sie diese Option nicht festlegen, wird der für tempLocation angegebene Wert auch als Staging-Speicherort verwendet.
  • Ein Wert für gcpTempLocation wird standardmäßig erstellt, wenn dieser fehlt und tempLocation nicht angegeben wurde. Wenn tempLocation angegeben wird, gcpTempLocation jedoch nicht, muss tempLocation ein Cloud Storage-Pfad sein. gcpTempLocation wird dann standardmäßig darauf festgelegt. Ist tempLocation nicht angegeben, dafür aber gcpTempLocation, wird tempLocation nicht mit Daten befüllt.

Hinweis: Wenn Sie das Apache Beam SDK für Java 2.15.0 oder höher verwenden, müssen Sie auch region angeben.

Python

  • project: Die ID Ihres Google Cloud-Projekts.
  • runner: Der Pipeline-Runner, mit dem das Programm geparst und die Pipeline erstellt wird. Für eine cloudbasierte Ausführung müssen Sie DataflowRunner verwenden.
  • staging_location: Ein Cloud Storage-Pfad, den Dataflow für das Staging von Codepaketen verwendet, die von Workern zum Ausführen des Jobs benötigt werden.
  • temp_location: Ein Cloud Storage-Pfad, den Dataflow für das Staging temporärer Jobdateien verwendet, die während der Pipelineausführung erstellt werden.

Hinweis: Wenn Sie das Apache Beam SDK für Python 2.15.0 oder höher verwenden, müssen Sie auch region angeben.

Java: SDK 1.x

Sie können diese Optionen programmatisch oder über die Befehlszeile festlegen. Der folgende Beispielcode zeigt, wie eine Pipeline erstellt wird, indem der Runner und andere erforderliche Optionen zum Ausführen der Pipeline mithilfe des verwalteten Dataflow-Dienstes programmgesteuert festgelegt werden.

Java: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
options = PipelineOptions(
    flags=argv,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')

# Create the Pipeline with the specified options.
# with beam.Pipeline(options=options) as pipeline:
#   pass  # build your pipeline here.

Java: SDK 1.x

Nachdem Sie Ihre Pipeline konstruiert haben, legen Sie alle Lese- und Schreibvorgänge sowie Transformationen fest und führen die Pipeline aus.

Der folgende Beispielcode zeigt, wie Sie die erforderlichen Optionen für die Ausführung im Dataflow-Dienst über die Befehlszeile einrichten:

Java: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
args, beam_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=beam_args) as pipeline:
  lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input)
  lines | 'Write files' >> beam.io.WriteToText(args.output)

Java: SDK 1.x

Nachdem Sie Ihre Pipeline konstruiert haben, legen Sie alle Lese- und Schreibvorgänge sowie Transformationen fest und führen die Pipeline aus.

Java: SDK 2.x

Verwenden Sie beim Übergeben der erforderlichen Optionen in der Befehlszeile die Optionen --project, --runner, --gcpTempLocation und optional --stagingLocation.

Python

Verwenden Sie beim Übergeben der erforderlichen Optionen in der Befehlszeile die Optionen --project, --runner und --staging_location.

Java: SDK 1.x

Asynchrone Ausführung

Java: SDK 2.x

Mit dem DataflowRunner wird Ihre Pipeline in Google Cloud asynchron ausgeführt. Während der Pipelineausführung können Sie über die Monitoring-Oberfläche oder die Befehlszeile von Dataflow den Jobfortschritt überwachen, Details zur Ausführung ansehen und aktuelle Informationen zu den Ergebnissen der Pipeline erhalten.

Python

Mit dem DataflowRunner wird Ihre Pipeline in Google Cloud asynchron ausgeführt. Während der Pipelineausführung können Sie über die Monitoring-Oberfläche oder die Befehlszeile von Dataflow den Jobfortschritt überwachen, Details zur Ausführung ansehen und aktuelle Informationen zu den Ergebnissen der Pipeline erhalten.

Java: SDK 1.x

Synchrone Ausführung

Java: SDK 2.x

Legen Sie DataflowRunner als den Pipeline-Runner fest und rufen Sie pipeline.run().waitUntilFinish() explizit auf.

Wenn Sie DataflowRunner verwenden und waitUntilFinish() für das von pipeline.run() zurückgegebene Objekt PipelineResult aufrufen, wird die Pipeline in der Cloud ausgeführt, aber der lokale Code wartet auf den Abschluss des Cloudjobs und gibt das endgültige DataflowPipelineJob-Objekt zurück. Während der Jobausführung gibt der Dataflow-Dienst Jobstatusaktualisierungen und Konsolennachrichten aus.

In Java 2.x entspricht der explizite Aufruf von waitUntilFinish() durch das Hauptprogramm der Befehlszeilenoption --runner BlockingDataflowPipelineRunner des Java SDK 1.x, mit der interaktiv veranlasst wird, das Hauptprogramm zu blockieren, bis die Pipeline beendet ist.

Python

Wenn das Hauptprogramm bis zum Abschluss der Pipeline blockiert werden soll, verwenden Sie die Methode wait_until_finish() des PipelineResult-Objekts, das von der run()-Methode des Runners zurückgegeben wird.

Java: SDK 1.x

Hinweis: Der Job kann nicht über Ctrl+C von der Befehlszeile aus abgebrochen werden. Der Dataflow-Dienst führt den Job weiter in Google Cloud aus. Sie müssen den Job über die Monitoring-Oberfläche oder die Befehlszeile von Dataflow abbrechen.

Streamingausführung

Java: SDK 2.x

Falls Ihre Pipeline aus einer unbegrenzten Datenquelle wie Pub/Sub liest, wird die Pipeline automatisch im Streamingmodus ausgeführt.

Wenn die Pipeline unbegrenzte Datenquellen und -senken verwendet, müssen Sie für Ihre unbegrenzten PCollections eine Windowing-Strategie auswählen, bevor Sie eine Aggregation wie GroupByKey nutzen.

Python

Wenn die Pipeline unbegrenzte Datenquellen oder -senken wie Pub/Sub verwendet, müssen Sie die Option streaming auf "true" festlegen.

Streamingjobs verwenden standardmäßig den Compute Engine-Maschinentyp n1-standard-2 oder höher. Überschreiben Sie diese Einstellung nicht, da n1-standard-2 der erforderliche Mindestmaschinentyp zum Ausführen von Streamingjobs ist.

Wenn die Pipeline unbegrenzte Datenquellen und -senken verwendet, müssen Sie für Ihre unbegrenzten PCollections eine Windowing-Strategie auswählen, bevor Sie eine Aggregation wie GroupByKey nutzen.

Java: SDK 1.x

Andere Cloud Dataflow-Pipelineoptionen festlegen

Zum Ausführen Ihrer Pipeline in der Cloud können Sie die folgenden Felder im PipelineOptions-Objekt programmgesteuert festlegen:

Java: SDK 2.x

Feld Typ Beschreibung Standardwert
runner Class (NameOfRunner) Der zu verwendende PipelineRunner. In diesem Feld können Sie den PipelineRunner zur Laufzeit bestimmen. DirectRunner (lokaler Modus)
streaming boolean Gibt an, ob der Streamingmodus aktiviert oder deaktiviert wird (true = aktiviert). Wenn Ihre Pipeline aus einer unbegrenzten Quelle liest, lautet der Standardwert true. Andernfalls lautet er false.
project String Die Projekt-ID Ihres Google Cloud-Projekts. Diese ist erforderlich, wenn Sie Ihre Pipeline mit dem verwalteten Dataflow-Dienst ausführen möchten. Enthält standardmäßig das aktuell im Cloud-SDK konfigurierte Projekt, wenn kein Wert festgelegt wird.
jobName String Der Name des ausgeführten Dataflow-Jobs, wie er in der Dataflow-Jobliste und in den Jobdetails angezeigt wird. Wird auch beim Aktualisieren einer vorhandenen Pipeline verwendet. Dataflow generiert automatisch einen eindeutigen Namen.
gcpTempLocation String Cloud Storage-Pfad für temporäre Dateien. Muss eine gültige Cloud Storage-URL sein, die mit gs:// beginnt.
stagingLocation String Cloud Storage-Pfad für das Staging lokaler Dateien. Muss eine gültige Cloud Storage-URL sein, die mit gs:// beginnt. Enthält den Wert, den Sie für tempLocation angegeben haben, wenn nichts anderes festgelegt ist.
autoscalingAlgorithm String Der Autoscaling-Modus für Ihren Dataflow-Job. Mögliche Werte sind THROUGHPUT_BASED, um das Autoscaling zu aktivieren, oder NONE, um es zu deaktivieren. Weitere Informationen dazu, wie Autoscaling im verwalteten Dataflow-Dienst funktioniert, finden Sie unter Autotuning-Features. THROUGHPUT_BASED ist die Standardeinstellung für alle Batch-Dataflow-Jobs und für Streamingjobs, die Streaming Engine verwenden. NONE ist die Standardeinstellung für Streamingjobs, die keine Streaming Engine verwenden.
numWorkers int Die anfängliche Zahl der Google Compute Engine-Instanzen für die Ausführung der Pipeline. Diese Option legt fest, wie viele Worker der Dataflow-Dienst startet, wenn Ihr Job beginnt. Wenn keine Angabe erfolgt, ermittelt der Dataflow-Dienst die erforderliche Anzahl an Workern.
maxNumWorkers int Die maximale Zahl der Compute Engine-Instanzen, die der Pipeline während der Ausführung zur Verfügung gestellt werden. Diese kann höher als die anfängliche Worker-Anzahl (angegeben durch numWorkers) sein, damit Ihr Job automatisch oder anderweitig skaliert werden kann. Wenn kein Wert angegeben ist, bestimmt der Dataflow-Dienst die Zahl der Worker.
numberOfWorkerHarnessThreads int Die Anzahl der Threads pro Worker-Harness. Wenn kein Wert angegeben ist, bestimmt der Dataflow-Dienst eine geeignete Anzahl von Threads pro Worker.
region String Gibt einen regionalen Endpunkt für die Bereitstellung Ihrer Dataflow-Jobs an. Enthält standardmäßig den Wert us-central1, wenn nichts anderes festgelegt wird.
workerRegion String

Gibt eine Compute Engine-Region an, in der Worker-Instanzen zum Ausführen der Pipeline gestartet werden. Diese Option wird verwendet, um Worker an einem anderen Standort als der region auszuführen, in der Jobs bereitgestellt, verwaltet und überwacht werden. Die Zone für workerRegion wird automatisch zugewiesen.

Hinweis: Diese Option kann nicht mit workerZone oder zone kombiniert werden.

Enthält standardmäßig den für region festgelegten Wert, wenn nichts anderes festgelegt wird.
workerZone String

Gibt eine Compute Engine-Zone an, in der Worker-Instanzen zum Ausführen der Pipeline gestartet werden. Diese Option wird verwendet, um Worker an einem anderen Standort als der region auszuführen, in der Jobs bereitgestellt, verwaltet und überwacht werden.

Hinweis: Diese Option kann nicht mit workerRegion oder zone kombiniert werden.

Wenn Sie entweder region oder workerRegion angeben, verwendet workerZone standardmäßig eine Zone aus der entsprechenden Region. Dieses Verhalten lässt sich durch Angabe einer anderen Zone überschreiben.
zone String (Verworfen) Für Apache Beam SDK 2.17.0 oder eine frühere Version wird hier die Compute Engine-Zone angegeben, in der Worker-Instanzen zum Ausführen der Pipeline gestartet werden. Wenn Sie region angeben, verwendet zone standardmäßig eine Zone aus der entsprechenden Region. Dieses Verhalten lässt sich durch Angabe einer anderen Zone überschreiben.
dataflowKmsKey String Gibt den vom Kunden verwalteten Verschlüsselungsschlüssel (Customer-Managed Encryption Key, CMEK) zum Verschlüsseln inaktiver Daten an. Sie können den Verschlüsselungsschlüssel über Cloud KMS verwalten. Sie müssen auch gcpTempLocation angeben, um dieses Feature verwenden zu können. Wenn kein Wert angegeben ist, verwendet Dataflow die standardmäßige Google Cloud-Verschlüsselung anstelle eines CMEK.
flexRSGoal String Gibt Flexible Resource Scheduling (FlexRS) für automatisch skalierte Batchjobs an. Wirkt sich auf die Parameter numWorkers, autoscalingAlgorithm, zone, region und workerMachineType aus. Weitere Informationen finden Sie im Abschnitt zu den FlexRS-Pipelineoptionen. Wenn nicht angegeben, wird standardmäßig SPEED_OPTIMIZED verwendet. Dies entspricht dem Weglassen dieses Flags. Zum Aktivieren von FlexRS müssen Sie den Wert COST_OPTIMIZED angeben, damit der Dataflow-Dienst alle verfügbaren ermäßigten Ressourcen auswählen kann.
filesToStage List<String> Eine nicht leere Liste lokaler Dateien, Dateiverzeichnisse oder Archive (z. B. JAR- oder ZIP-Dateien), die jedem Worker zur Verfügung gestellt werden. Wenn Sie diese Option einrichten, werden nur die angegebenen Dateien hochgeladen (der Java-Klassenpfad wird ignoriert). Sie müssen alle Ihre Ressourcen in der richtigen Klassenpfadreihenfolge angeben. Die Ressourcen sind nicht auf Code beschränkt, sondern können auch Konfigurationsdateien und andere Ressourcen enthalten, die allen Workern zur Verfügung gestellt werden sollen. Ihr Code kann mithilfe der standardmäßigen Ressourcensuchmethoden von Java auf die aufgeführten Ressourcen zugreifen. Achtung: Sie sollten keinen Verzeichnispfad angeben, da Dataflow die Dateien vor dem Hochladen komprimiert, was eine längere Startzeit zur Folge hat. Nutzen Sie diese Option außerdem nicht, um Daten an Worker zu übertragen, die von der Pipeline verarbeitet werden sollen, denn dies dauert wesentlich länger als mit nativen Cloud Storage/BigQuery APIs in Kombination mit der entsprechenden Dataflow-Datenquelle. Wenn das Feld filesToStage leer ist, leitet Dataflow die Staging-Dateien aus dem Java-Klassenpfad ab. Die Überlegungen und Hinweise aus der Spalte links (Arten der aufzuführenden Dateien und Zugriff darauf über Ihren Code) gelten hierbei ebenso.
network String Das Compute Engine-Netzwerk für den Start von Compute Engine-Instanzen zum Ausführen Ihrer Pipeline. Siehe Netzwerk angeben. Wenn nichts festgelegt ist, geht Google Cloud davon aus, dass Sie ein Standardnetzwerk mit dem Namen default verwenden möchten.
subnetwork String Das Compute Engine-Subnetzwerk für den Start von Compute Engine-Instanzen zum Ausführen Ihrer Pipeline. Siehe Subnetzwerk angeben. Der Dataflow-Dienst ermittelt den Standardwert.
usePublicIps boolean Gibt an, ob Dataflow-Worker öffentliche IP-Adressen verwenden. Wenn der Wert auf false gesetzt ist, verwenden die Dataflow-Worker für die gesamte Kommunikation private IP-Adressen. In diesem Fall wird die Option network ignoriert, wenn die Option subnetwork angegeben ist. Für das angegebene network oder subnetwork muss der private Google-Zugriff aktiviert sein. Wenn Sie diese Option nicht festlegen, gilt der Standardwert true und die Dataflow-Worker verwenden öffentliche IP-Adressen.
enableStreamingEngine boolean Gibt an, ob Streaming Engine von Dataflow aktiviert oder deaktiviert wird (true = aktiviert). Wenn Streaming Engine aktiviert wird, können die Schritte der Streamingpipeline im Dataflow-Dienst-Back-End ausgeführt und damit CPU-, Arbeitsspeicher- und nichtflüchtige Speicherressourcen eingespart werden. Der Standardwert ist false. Dies bedeutet, dass die Schritte der Streamingpipeline ausschließlich auf Worker-VMs ausgeführt werden.
createFromSnapshot String Gibt die Snapshot-ID an, die beim Erstellen eines Streamingjobs verwendet werden soll. Snapshots speichern den Status einer Streamingpipeline und ermöglichen es Ihnen, eine neue Version Ihres Jobs aus diesem Status zu starten. Weitere Informationen zu Snapshots finden Sie unter Snapshots verwenden. Es wird kein Snapshot zum Erstellen eines Jobs verwendet, wenn nichts anderes festgelegt wird.
hotKeyLoggingEnabled boolean Gibt an, dass der Schlüssel im Cloud Logging-Projekt des Nutzers ausgegeben wird, wenn ein „heißer” Schlüssel in der Pipeline erkannt wird. Wenn nicht festgelegt, wird nur das Vorhandensein eines „heißen” Schlüssels protokolliert.
diskSizeGb int

Die für jede Remote-Compute Engine-Worker-Instanz zu verwendende Laufwerkgröße in Gigabyte. Falls festgelegt, geben Sie mindestens 30 GB an, um das Worker-Boot-Image und die lokalen Logs zu berücksichtigen.

Bei Batch-Jobs, die Dataflow Shuffle verwenden, legt diese Option die Größe des Bootlaufwerks einer Worker-VM fest. Bei Batch-Jobs, die Dataflow Shuffle nicht verwenden, legt diese Option die Größe der Laufwerke fest, die zum Speichern von Zufallsdaten verwendet werden. Die Größe des Bootlaufwerks ist davon nicht betroffen.

Wenn ein Streamingjob Streaming Engine verwendet, wird mit dieser Option die Größe der Bootlaufwerke festgelegt. Bei Streamingjobs ohne Streaming Engine wird mit dieser Option die Größe jedes zusätzlichen nichtflüchtigen Speichers festgelegt, der vom Dataflow-Dienst erstellt wird. Das Bootlaufwerk ist nicht betroffen. Wenn ein Streamingjob Streaming Engine nicht verwendet, können Sie die Größe des Bootlaufwerks mit dem Experiments-Flag streaming_boot_disk_size_gb festlegen. Geben Sie beispielsweise --experiments=streaming_boot_disk_size_gb=80 an, um Bootlaufwerke mit einer Größe von 80 GB zu erstellen.

Legen Sie als Wert 0 fest, um die in Ihrem Cloud Platform-Projekt definierte Standardgröße zu verwenden.

Wenn ein Batchjob Dataflow Shuffle verwendet, ist die Standardeinstellung 25 GB. Andernfalls beträgt sie 250 GB.

Wenn für einen Streamingjob Streaming Engine verwendet wird, ist die Standardeinstellung 30 GB. Andernfalls beträgt sie 400 GB.

Warnung: Bei einer geringeren Laufwerkgröße wird die verfügbare Shuffle-E/A reduziert. An Shuffle gebundene Jobs, die nicht mit Dataflow Shuffle oder Streaming Engine arbeiten, können sich die Laufzeit und die Jobkosten erhöhen.

serviceAccount String Legt ein vom Nutzer verwaltetes Controller-Dienstkonto fest und verwendet dafür das Format my-service-account-name@<project-id>.iam.gserviceaccount.com. Weitere Informationen finden Sie im Abschnitt Controller-Dienstkonto auf der Seite "Cloud Dataflow-Sicherheit und -Berechtigungen". Wenn nicht festgelegt, verwenden Worker das Compute Engine-Dienstkonto Ihres Projekts als Controller-Dienstkonto.
workerDiskType String Der Typ des zu verwendenden nichtflüchtigen Speichers, angegeben durch die vollständige URL der Ressource für den Laufwerkstyp. Verwenden Sie beispielsweise compute.googleapis.com/projects//zones//diskTypes/pd-ssd, um einen nichtflüchtigen SSD-Speicher anzugeben. Weitere Informationen finden Sie auf der Referenzseite der Compute Engine API für DiskTypes. Der Dataflow-Dienst ermittelt den Standardwert.
workerMachineType String

Der Compute Engine-Maschinentyp, den Dataflow beim Starten von Worker-VMs verwendet. Sie können jede der verfügbaren Compute Engine-Maschinentypfamilien sowie benutzerdefinierte Maschinentypen verwenden.

Die besten Ergebnisse erzielen Sie mit n1-Maschinentypen. Maschinentypen mit gemeinsam genutztem Kern, beispielsweise Worker der Serien f1 und g1, werden im Rahmen des Service Level Agreements von Dataflow nicht unterstützt.

Beachten Sie, dass Dataflow nach der Anzahl der vCPUs und GB Arbeitsspeicher in Workern abrechnet. Die Abrechnung erfolgt unabhängig von der Maschinentypfamilie.

Wenn Sie diese Option nicht festlegen, wählt der Dataflow-Dienst einen geeigneten Maschinentyp anhand Ihres Jobs aus.

Eine vollständige Liste der Pipelinekonfigurationsoptionen finden Sie in der API-Referenzdokumentation für Java zum PipelineOptions-Interface (und den Unterinterfaces).

Python

Feld Typ Beschreibung Standardwert
runner str Der zu verwendende PipelineRunner. Dieses Feld kann entweder DirectRunner oder DataflowRunner sein. DirectRunner (lokaler Modus)
streaming bool Gibt an, ob der Streamingmodus aktiviert oder deaktiviert wird (true = aktiviert). false
project str Die Projekt-ID Ihres Google Cloud-Projekts. Diese ist erforderlich, wenn Sie Ihre Pipeline mit dem verwalteten Dataflow-Dienst ausführen möchten. Wenn Sie dieses Feld nicht angeben, wird ein Fehler ausgegeben.
job_name String Der Name des ausgeführten Dataflow-Jobs, wie er in der Dataflow-Jobliste und in den Jobdetails angezeigt wird. Dataflow generiert automatisch einen eindeutigen Namen.
temp_location str Cloud Storage-Pfad für temporäre Dateien. Muss eine gültige Cloud Storage-URL sein, die mit gs:// beginnt. Enthält standardmäßig den Wert für staging_location, wenn nichts anderes festgelegt wird. Sie müssen mindestens eine der Optionen temp_location oder staging_location angeben, um Ihre Pipeline in Google Cloud auszuführen.
staging_location str Cloud Storage-Pfad für das Staging lokaler Dateien. Muss eine gültige Cloud Storage-URL sein, die mit gs:// beginnt. Enthält standardmäßig ein Staging-Verzeichnis aus temp_location, wenn nichts anderes festgelegt wird. Sie müssen mindestens eine der Optionen temp_location oder staging_location angeben, um Ihre Pipeline in Google Cloud auszuführen.
autoscaling_algorithm str Der Autoscaling-Modus für Ihren Dataflow-Job. Mögliche Werte sind THROUGHPUT_BASED, um das Autoscaling zu aktivieren, oder NONE, um es zu deaktivieren. Weitere Informationen dazu, wie Autoscaling im verwalteten Dataflow-Dienst funktioniert, finden Sie unter Autotuning-Features. THROUGHPUT_BASED ist die Standardeinstellung für alle Batch-Dataflow-Jobs und für Streamingjobs, die Streaming Engine verwenden. NONE ist die Standardeinstellung für Streamingjobs, die keine Streaming Engine verwenden.
num_workers int Die Anzahl der Compute Engine-Instanzen, die beim Ausführen der Pipeline verwendet werden sollen. Wenn kein Wert angegeben ist, bestimmt der Dataflow-Dienst die Zahl der Worker.
max_num_workers int Die maximale Zahl der Compute Engine-Instanzen, die der Pipeline während der Ausführung zur Verfügung gestellt werden. Diese kann höher als die anfängliche Worker-Anzahl (angegeben durch num_workers) sein, damit Ihr Job automatisch oder anderweitig skaliert werden kann. Wenn kein Wert angegeben ist, bestimmt der Dataflow-Dienst die Zahl der Worker.
number_of_worker_harness_threads int Die Anzahl der Threads pro Worker-Harness. Wenn kein Wert angegeben ist, bestimmt der Dataflow-Dienst eine geeignete Anzahl von Threads pro Worker. Um diesen Parameter verwenden zu können, müssen Sie auch das Flag --experiments=use_runner_v2 verwenden
region str Gibt einen regionalen Endpunkt für die Bereitstellung Ihrer Dataflow-Jobs an. Enthält standardmäßig den Wert us-central1, wenn nichts anderes festgelegt wird.
worker_region String

Gibt eine Compute Engine-Region an, in der Worker-Instanzen zum Ausführen der Pipeline gestartet werden. Diese Option wird verwendet, um Worker an einem anderen Standort als der region auszuführen, in der Jobs bereitgestellt, verwaltet und überwacht werden. Die Zone für worker_region wird automatisch zugewiesen.

Hinweis: Diese Option kann nicht mit worker_zone oder zone kombiniert werden.

Enthält standardmäßig den für region festgelegten Wert, wenn nichts anderes festgelegt wird.
worker_zone String

Gibt eine Compute Engine-Zone an, in der Worker-Instanzen zum Ausführen der Pipeline gestartet werden. Diese Option wird verwendet, um Worker an einem anderen Standort als der region auszuführen, in der Jobs bereitgestellt, verwaltet und überwacht werden.

Hinweis: Diese Option kann nicht mit worker_region oder zone kombiniert werden.

Wenn Sie entweder region oder worker_region angeben, verwendet worker_zone standardmäßig eine Zone aus der entsprechenden Region. Dieses Verhalten lässt sich durch Angabe einer anderen Zone überschreiben.
zone str (Verworfen) Für Apache Beam SDK 2.17.0 oder eine frühere Version wird hier die Compute Engine-Zone angegeben, in der Worker-Instanzen zum Ausführen der Pipeline gestartet werden. Wenn Sie region angeben, verwendet zone standardmäßig eine Zone aus der entsprechenden Region. Dieses Verhalten lässt sich durch Angabe einer anderen Zone überschreiben.
dataflow_kms_key str Gibt den vom Kunden verwalteten Verschlüsselungsschlüssel (Customer-Managed Encryption Key, CMEK) zum Verschlüsseln inaktiver Daten an. Sie können den Verschlüsselungsschlüssel über Cloud KMS verwalten. Sie müssen auch temp_location angeben, um dieses Feature verwenden zu können. Wenn kein Wert angegeben ist, verwendet Dataflow die standardmäßige Google Cloud-Verschlüsselung anstelle eines CMEK.
flexrs_goal str Gibt Flexible Resource Scheduling (FlexRS) für automatisch skalierte Batchjobs an. Wirkt sich auf die Parameter num_workers, autoscaling_algorithm, zone, region und machine_type aus. Weitere Informationen finden Sie im Abschnitt zu den FlexRS-Pipelineoptionen. Wenn nicht angegeben, wird standardmäßig SPEED_OPTIMIZED verwendet. Dies entspricht dem Weglassen dieses Flags. Zum Aktivieren von FlexRS müssen Sie den Wert COST_OPTIMIZED angeben, damit der Dataflow-Dienst alle verfügbaren ermäßigten Ressourcen auswählen kann.
network str Das Compute Engine-Netzwerk für den Start von Compute Engine-Instanzen zum Ausführen Ihrer Pipeline. Siehe Netzwerk angeben. Wenn nichts festgelegt ist, geht Google Cloud davon aus, dass Sie ein Standardnetzwerk mit dem Namen default verwenden möchten.
subnetwork str Das Compute Engine-Subnetzwerk für den Start von Compute Engine-Instanzen zum Ausführen Ihrer Pipeline. Siehe Subnetzwerk angeben. Der Dataflow-Dienst ermittelt den Standardwert.
use_public_ips bool Legt fest, dass Dataflow-Worker öffentliche IP-Adressen verwenden müssen. Wenn der Wert auf false gesetzt ist, verwenden die Dataflow-Worker für die gesamte Kommunikation private IP-Adressen. In diesem Fall wird die Option network ignoriert, wenn die Option subnetwork angegeben ist. Für das angegebene network oder subnetwork muss der private Google-Zugriff aktiviert sein. Für diese Option ist das Beam SDK für Python erforderlich. Das veraltete Dataflow SDK für Python unterstützt es nicht. Ist nichts festgelegt, verwenden Dataflow-Worker öffentliche IP-Adressen.
enable_streaming_engine bool Gibt an, ob Streaming Engine von Dataflow aktiviert oder deaktiviert wird (true = aktiviert). Wenn Streaming Engine aktiviert wird, können die Schritte der Streamingpipeline im Dataflow-Dienst-Back-End ausgeführt und damit CPU-, Arbeitsspeicher- und nichtflüchtige Speicherressourcen eingespart werden. Der Standardwert ist false. Dies bedeutet, dass die Schritte der Streamingpipeline ausschließlich auf Worker-VMs ausgeführt werden.
disk_size_gb int

Die für jede Remote-Compute Engine-Worker-Instanz zu verwendende Laufwerkgröße in Gigabyte. Falls festgelegt, geben Sie mindestens 30 GB an, um das Worker-Boot-Image und die lokalen Logs zu berücksichtigen.

Bei Batch-Jobs, die Dataflow Shuffle verwenden, legt diese Option die Größe des Bootlaufwerks einer Worker-VM fest. Bei Batch-Jobs, die Dataflow Shuffle nicht verwenden, legt diese Option die Größe der Laufwerke fest, die zum Speichern von Zufallsdaten verwendet werden. Die Größe des Bootlaufwerks ist davon nicht betroffen.

Wenn ein Streamingjob Streaming Engine verwendet, wird mit dieser Option die Größe der Bootlaufwerke festgelegt. Bei Streamingjobs ohne Streaming Engine wird mit dieser Option die Größe jedes zusätzlichen nichtflüchtigen Speichers festgelegt, der vom Dataflow-Dienst erstellt wird. Das Bootlaufwerk ist nicht betroffen. Wenn ein Streamingjob Streaming Engine nicht verwendet, können Sie die Größe des Bootlaufwerks mit dem Experiments-Flag streaming_boot_disk_size_gb festlegen. Geben Sie beispielsweise --experiments=streaming_boot_disk_size_gb=80 an, um Bootlaufwerke mit einer Größe von 80 GB zu erstellen.

Legen Sie als Wert 0 fest, um die in Ihrem Cloud Platform-Projekt definierte Standardgröße zu verwenden.

Wenn ein Batchjob Dataflow Shuffle verwendet, ist die Standardeinstellung 25 GB. Andernfalls beträgt sie 250 GB.

Wenn für einen Streamingjob Streaming Engine verwendet wird, ist die Standardeinstellung 30 GB. Andernfalls beträgt sie 400 GB.

Warnung: Bei einer geringeren Laufwerkgröße wird die verfügbare Shuffle-E/A reduziert. An Shuffle gebundene Jobs, die nicht mit Dataflow Shuffle oder Streaming Engine arbeiten, können sich die Laufzeit und die Jobkosten erhöhen.

service_account_email str Legt ein vom Nutzer verwaltetes Controller-Dienstkonto fest und verwendet dafür das Format my-service-account-name@<project-id>.iam.gserviceaccount.com. Weitere Informationen finden Sie im Abschnitt Controller-Dienstkonto auf der Seite "Cloud Dataflow-Sicherheit und -Berechtigungen". Wenn nicht festgelegt, verwenden Worker das Compute Engine-Dienstkonto Ihres Projekts als Controller-Dienstkonto.
worker_disk_type str Der Typ des zu verwendenden nichtflüchtigen Speichers, angegeben durch die vollständige URL der Ressource für den Laufwerkstyp. Verwenden Sie beispielsweise compute.googleapis.com/projects//zones//diskTypes/pd-ssd, um einen nichtflüchtigen SSD-Speicher anzugeben. Weitere Informationen finden Sie auf der Referenzseite der Compute Engine API für DiskTypes. Der Dataflow-Dienst ermittelt den Standardwert.
machine_type str

Der Compute Engine-Maschinentyp, den Dataflow beim Starten von Worker-VMs verwendet. Sie können jede der verfügbaren Compute Engine-Maschinentypfamilien sowie benutzerdefinierte Maschinentypen verwenden.

Die besten Ergebnisse erzielen Sie mit n1-Maschinentypen. Maschinentypen mit gemeinsam genutztem Kern, beispielsweise Worker der Serien f1 und g1, werden im Rahmen des Service Level Agreements von Dataflow nicht unterstützt.

Beachten Sie, dass Dataflow nach der Anzahl der vCPUs und GB Arbeitsspeicher in Workern abrechnet. Die Abrechnung erfolgt unabhängig von der Maschinentypfamilie.

Wenn Sie diese Option nicht festlegen, wählt der Dataflow-Dienst einen geeigneten Maschinentyp anhand Ihres Jobs aus.

Java: SDK 1.x

PipelineOptions für die lokale Ausführung konfigurieren

Anstatt auf verwalteten Cloudressourcen können Sie Ihre Pipeline auch lokal ausführen. Letzteres bietet gewisse Vorteile für das Testen, Debuggen oder Ausführen Ihrer Pipeline mit kleinen Datensätzen. Bei der lokalen Ausführung sind Sie beispielsweise nicht vom Dataflow-Remote-Dienst und dem zugehörigen Google Cloud-Projekt abhängig.

Bei lokaler Ausführung sollten Sie unbedingt darauf achten, dass die Datasets Ihrer Pipeline so klein sind, dass der lokale Arbeitsspeicher nicht erschöpft wird. Ein kleines speicherinternes Dataset erstellen Sie mit einer Create-Transformation. Alternativ können Sie mit einer Read-Transformation mit kleinen lokalen oder Remote-Dateien arbeiten. Die lokale Ausführung bietet eine schnelle und einfache Möglichkeit, Tests und Fehlerbehebungen mit weniger externen Abhängigkeiten durchzuführen, wird aber durch den Speicher in Ihrer lokalen Umgebung begrenzt.

Der folgende Beispielcode zeigt, wie Sie eine Pipeline erstellen, die in Ihrer lokalen Umgebung ausgeführt wird.

Java: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Hinweis: Beim lokalen Modus müssen Sie den Runner nicht festlegen, da der DirectRunner bereits standardmäßig eingestellt ist. Allerdings müssen Sie den DirectRunner entweder explizit als Abhängigkeit einbeziehen oder zum Klassenpfad hinzufügen.

Python

# Create and set your Pipeline Options.
options = PipelineOptions(flags=argv)
my_options = options.view_as(MyOptions)

with Pipeline(options=options) as pipeline:
  pass  # build your pipeline here.

Hinweis: Beim lokalen Modus müssen Sie den Runner nicht festlegen, da der DirectRunner bereits standardmäßig eingestellt ist.

Java: SDK 1.x

Nachdem Sie Ihre Pipeline erstellt haben, führen Sie sie aus.

Andere lokale Pipeline-Optionen festlegen

Beim lokalen Ausführen Ihrer Pipeline reichen die Standardwerte für die Attribute in den PipelineOptions üblicherweise aus.

Java: SDK 2.x

Die Standardwerte für PipelineOptions in Java finden Sie in der Java API-Referenz. Ausführliche Informationen können Sie der Auflistung für die Klasse PipelineOptions entnehmen.

Falls Ihre Pipeline Google Cloud-Dienste wie BigQuery oder Cloud Storage für E/A verwendet, müssen Sie eventuell bestimmte Optionen für das Google Cloud-Projekt und die Anmeldedaten festlegen. In diesem Fall sollten Sie mit GcpOptions.setProject Ihre Google Cloud-Projekt-ID angeben. Möglicherweise müssen Sie auch Anmeldedaten explizit festlegen. Ausführliche Informationen entnehmen Sie der Klasse GcpOptions.

Python

Die Standardwerte für PipelineOptions in Python finden Sie in der Python API-Referenz. Ausführliche Informationen können Sie der Auflistung für das Modul PipelineOptions entnehmen.

Falls Ihre Pipeline Google Cloud-Dienste wie BigQuery oder Cloud Storage für E/A verwendet, müssen Sie eventuell bestimmte Optionen für das Google Cloud-Projekt und die Anmeldedaten festlegen. In diesem Fall sollten Sie mit options.view_as(GoogleCloudOptions).project Ihre Google Cloud-Projekt-ID angeben. Möglicherweise müssen Sie auch Anmeldedaten explizit festlegen. Ausführliche Informationen entnehmen Sie der Klasse GoogleCloudOptions.

Java: SDK 1.x