Dataflow-Pipelines bereitstellen

Dieses Dokument bietet einen Überblick über die Pipelinebereitstellung und zeigt einige der Vorgänge an, die Sie für eine bereitgestellte Pipeline ausführen können.

Pipeline ausführen

Nachdem Sie die Apache Beam-Pipeline erstellt und testet haben, führen Sie die Pipeline aus. Sie können Ihre Pipeline lokal ausführen, um Ihre Apache Beam-Pipeline zu testen und Fehler zu beheben, oder auf Dataflow, einem Datenverarbeitungssystem, das zum Ausführen von Apache Beam-Pipelines verfügbar ist.

Lokal ausführen

Führen Sie Ihre Pipeline lokal aus.

Java

Im folgenden Beispielcode aus der Kurzanleitung wird gezeigt, wie die WordCount-Pipeline lokal ausgeführt wird. Weitere Informationen finden Sie unter Java-Pipeline lokal ausführen.

Führen Sie im Terminal den folgenden Befehl aus:

  mvn compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--output=counts"
  

Python

Im folgenden Beispielcode aus der Kurzanleitung wird gezeigt, wie die WordCount-Pipeline lokal ausgeführt wird. Weitere Informationen finden Sie unter Python-Pipeline lokal ausführen.

Führen Sie im Terminal den folgenden Befehl aus:

python -m apache_beam.examples.wordcount \ --output outputs

Einfach loslegen (Go)

Im folgenden Beispielcode aus der Kurzanleitung wird gezeigt, wie die WordCount-Pipeline lokal ausgeführt wird. Weitere Informationen finden Sie unter Go-Pipeline lokal ausführen.

Führen Sie im Terminal den folgenden Befehl aus:

    go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
  

Erfahren Sie, wie Sie Ihre Pipeline lokal mit dem direkten Runner auf Ihrem Computer ausführen.

In Dataflow ausführen

Führen Sie die Pipeline in Dataflow aus.

Java

Der folgende Beispielcode aus der Kurzanleitung zeigt, wie die WordCount-Pipeline in Dataflow ausgeführt wird. Weitere Informationen finden Sie unter Java-Pipeline in Dataflow ausführen.

Führen Sie im Terminal den folgenden Befehl aus (im Verzeichnis word-count-beam):

  mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--project=PROJECT_ID \
    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
    --output=gs://BUCKET_NAME/output \
    --runner=DataflowRunner \
    --region=REGION"
    

Dabei gilt:

  • PROJECT_ID: Ihre Google Cloud-Projekt-ID
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • REGION: eine Dataflow-Region, z. B. us-central1

Python

Der folgende Beispielcode aus der Kurzanleitung zeigt, wie die WordCount-Pipeline in Dataflow ausgeführt wird. Weitere Informationen finden Sie unter Python-Pipeline in Dataflow ausführen.

Führen Sie im Terminal den folgenden Befehl aus:

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

Ersetzen Sie Folgendes:

  • DATAFLOW_REGION: die Region, in der Sie den Dataflow-Job bereitstellen möchten, z. B. europe-west1

    Das Flag --region überschreibt die Standardregion, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.

  • STORAGE_BUCKET: Cloud Storage-Name, den Sie zuvor kopiert haben.
  • PROJECT_ID: Google Cloud-Projekt-ID, die Sie zuvor kopiert haben.

Einfach loslegen (Go)

Der folgende Beispielcode aus der Kurzanleitung zeigt, wie die WordCount-Pipeline in Dataflow ausgeführt wird. Weitere Informationen finden Sie unter Go-Pipeline in Dataflow ausführen.

Führen Sie im Terminal den folgenden Befehl aus:

  posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://STORAGE_BUCKET/binaries/
  

Dabei gilt:

  • STORAGE_BUCKET: der Name des Cloud Storage-Buckets
  • PROJECT_ID: die Google Cloud-Projekt-ID.
  • DATAFLOW_REGION: die Region, in der Sie den Dataflow-Job bereitstellen möchten. Beispiel: europe-west1 Eine Liste der verfügbaren Dataflow-Standorte finden Sie hier. Beachten Sie, dass das Flag --region die Standardregion überschreibt, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.

Erfahren Sie, wie Sie Ihre Pipeline im Dataflow-Dienst mit dem Dataflow-Runner ausführen.

Wenn Sie die Pipeline in Dataflow ausführen, wandelt Dataflow den Apache Beam-Pipelinecode in einen Dataflow-Job um. Dataflow verwaltet vollständig Google Cloud-Dienste für Sie, z.  . Compute Engine und Cloud Storage, um Ihren Dataflow-Job auszuführen, und startet automatisch fährt die erforderlichen Ressourcen herunter. Weitere Informationen dazu, wie Dataflow Ihren Apache Beam-Code in einen Dataflow-Job in Pipeline-Lebenszyklus umwandelt.

Pipelinevalidierung

Wenn Sie Ihre Pipeline in Dataflow ausführen, führt Dataflow vor dem Start des Jobs Validierungstests für die Pipeline aus. Wenn ein Validierungstest Probleme mit der Pipeline feststellt, schlägt Dataflow das Senden des Jobs frühzeitig fehl. In den Joblogs enthält Dataflow Nachrichten mit dem folgenden Text. Jede Nachricht enthält auch Details zu den Validierungsergebnissen und eine Anleitung zur Behebung des Problems.

The preflight pipeline validation failed for job JOB_ID.

Welche Validierungstests ausgeführt werden, hängt von den Ressourcen und Diensten ab, die von Ihrem Dataflow-Job verwendet werden.

  • Wenn die Service Usage API für Ihr Projekt aktiviert ist, prüfen die Pipeline-Validierungstests, ob die für die Ausführung Ihres Dataflow-Jobs erforderlichen Dienste aktiviert sind.
  • Wenn die Cloud Resource Manager API für Ihr Projekt aktiviert ist, prüfen die Pipelinevalidierungstests, ob Sie die Konfigurationen auf Projektebene haben, die zum Ausführen des Dataflow-Jobs erforderlich sind.

Weitere Informationen zum Aktivieren von Diensten finden Sie unter Dienste aktivieren und deaktivieren.

Informationen zum Beheben von Berechtigungsproblemen, die während der Pipelinevalidierung erkannt wurden, finden Sie unter Pipelinevalidierung fehlgeschlagen.

Wenn Sie die Pipelinevalidierung überschreiben und den Job mit Validierungsfehlern starten möchten, verwenden Sie die folgende Dienstoption für die Pipeline:

Java

--dataflowServiceOptions=enable_preflight_validation=false

Python

--dataflow_service_options=enable_preflight_validation=false

Einfach loslegen (Go)

--dataflow_service_options=enable_preflight_validation=false

Pipelineoptionen festlegen

Sie können bestimmte Aspekte hinsichtlich der Ausführung des Jobs durch Dataflow steuern, indem Sie Pipelineoptionen im Apache Beam-Pipelinecode festlegen. Mit Pipeline-Optionen können Sie beispielsweise festlegen, ob Ihre Pipeline auf virtuellen Worker-Maschinen, im Dataflow-Dienst-Backend oder lokal ausgeführt wird.

Pipelineabhängigkeiten verwalten

Viele Apache Beam-Pipelines können unter Verwendung der Dataflow-Standardlaufzeitumgebungen ausgeführt werden. Einige Anwendungsfälle für die Datenverarbeitung profitieren jedoch von zusätzlichen Bibliotheken oder Klassen. In diesen Fällen müssen Sie möglicherweise Ihre Pipelineabhängigkeiten verwalten. Weitere Informationen zum Verwalten von Abhängigkeiten finden Sie unter Pipelineabhängigkeiten in Dataflow verwalten.

Job überwachen

Dataflow bietet Einblick in Ihre Jobs mit Tools wie der Dataflow-Monitoring-Oberfläche und der Dataflow-Befehlszeile.

Auf Worker-VMs zugreifen

Sie können die VM-Instanzen für eine bestimmte Pipeline mithilfe der Google Cloud Console anzeigen lassen. Von dort aus können Sie SSH verwenden, um auf die Instanzen zuzugreifen. Nachdem Ihr Job entweder abgeschlossen oder fehlgeschlagen ist, fährt der Dataflow-Dienst die VM-Instanzen jedoch automatisch herunter und bereinigt sie.

Joboptimierungen

Neben der Verwaltung von Google Cloud-Ressourcen führt Dataflow viele Aspekte der verteilten parallelen Verarbeitung automatisch für Sie aus und optimiert sie.

Parallelisierung und Verteilung

Dataflow partitioniert die Daten automatisch und verteilt den Worker-Code zur parallelen Verarbeitung auf Compute Engine-Instanzen. Weitere Informationen finden Sie unter Parallelisierung und Verteilung.

Optimierungen zusammenführen und kombinieren

Dataflow erstellt mithilfe des Pipelinecodes eine Ausführungsgrafik mit den PCollections der Pipeline. Die Grafik wird außerdem für eine möglichst effiziente Leistung und Ressourcennutzung transformiert und optimiert. Dataflow optimiert darüber hinaus automatisch potenziell kostspielige Vorgänge wie etwa Datenzusammenfassungen. Weitere Informationen finden Sie unter Zusammenführung optimieren und Kombination optimieren.

Automatische Feinabstimmungsfeatures

Der Dataflow-Dienst umfasst mehrere Funktionen zur sofortigen Anpassung der Ressourcenzuweisung und Datenpartitionierung. Diese Funktionen ermöglichen eine möglichst schnelle und effiziente Ausführung des Jobs durch Dataflow. Unter anderem sind folgende Funktionen verfügbar:

Streaming Engine

Standardmäßig führt der Dataflow-Pipeline-Runner die Schritte der Streamingpipeline vollständig auf Worker-VMs aus und beansprucht CPU-Leistung, Arbeitsspeicher und nichtflüchtigen Speicherplatz auf dem Worker. Streaming Engine von Dataflow verschiebt die Pipelineausführung aus den Worker-VMs in das Dataflow-Dienst-Backend. Weitere Informationen finden Sie unter Streaming Engine.

Flexible Ressourcenplanung für Dataflow

Dataflow FlexRS reduziert die Kosten für die Batchverarbeitung. Dazu werden erweiterte Planungsverfahren, der Dataflow Shuffle-Dienst sowie eine Kombination aus VM-Instanzen auf Abruf und normalen VMs verwendet. Durch die parallele Ausführung von VMs auf Abruf und regulären VMs verbessert Dataflow die Nutzerfreundlichkeit, wenn Compute Engine VM-Instanzen auf Abruf während eines Systemereignisses beendet. FlexRS gewährleistet, dass die Pipeline weiter verarbeitet wird und keine geleistete Arbeit verloren geht, wenn Ihre VMs auf Abruf von Compute Engine vorzeitig beendet werden. Weitere Informationen zu FlexRS finden Sie unter Flexible Ressourcenplanung in Dataflow verwenden.

Dataflow Shielded VM

Ab dem 1. Juni 2022 verwendet der Dataflow-Dienst Shielded VM für alle Worker. Weitere Informationen zu Shielded VM-Funktionen finden Sie unter Shielded VM.