Dieses Dokument bietet einen Überblick über die Pipelinebereitstellung und zeigt einige der Vorgänge an, die Sie für eine bereitgestellte Pipeline ausführen können.
Pipeline ausführen
Nachdem Sie Ihre Apache Beam-Pipeline create und getestet haben, führen Sie Ihre Pipeline aus. Sie können Ihre Pipeline lokal ausführen, um Ihre Apache Beam-Pipeline zu testen und Fehler zu beheben, oder auf Dataflow, einem Datenverarbeitungssystem, das zum Ausführen von Apache Beam-Pipelines verfügbar ist.
Lokal ausführen
Führen Sie Ihre Pipeline lokal aus.
Java
Im folgenden Beispielcode aus der Kurzanleitung wird gezeigt, wie die WordCount-Pipeline lokal ausgeführt wird. Weitere Informationen finden Sie unter Java-Pipeline lokal ausführen.
Führen Sie im Terminal den folgenden Befehl aus:
mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--output=counts"
Python
Im folgenden Beispielcode aus der Kurzanleitung wird gezeigt, wie die WordCount-Pipeline lokal ausgeführt wird. Weitere Informationen finden Sie unter Python-Pipeline lokal ausführen.
Führen Sie im Terminal den folgenden Befehl aus:
python -m apache_beam.examples.wordcount \ --output outputs
Go
Im folgenden Beispielcode aus der Kurzanleitung wird gezeigt, wie die WordCount-Pipeline lokal ausgeführt wird. Weitere Informationen finden Sie unter Go-Pipeline lokal ausführen.
Führen Sie im Terminal den folgenden Befehl aus:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Erfahren Sie, wie Sie Ihre Pipeline lokal mit dem direkten Runner auf Ihrem Computer ausführen.
In Dataflow ausführen
Führen Sie die Pipeline in Dataflow aus.
Java
Der folgende Beispielcode aus der Kurzanleitung zeigt, wie die WordCount-Pipeline in Dataflow ausgeführt wird. Weitere Informationen finden Sie unter Java-Pipeline in Dataflow ausführen.
Führen Sie im Terminal den folgenden Befehl aus (im Verzeichnis word-count-beam
):
mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION"
Dabei gilt:
PROJECT_ID
: Ihre Google Cloud-Projekt-IDBUCKET_NAME
: der Name Ihres Cloud Storage-BucketsREGION
: eine Dataflow-Region, z. B.us-central1
Python
Der folgende Beispielcode aus der Kurzanleitung zeigt, wie die WordCount-Pipeline in Dataflow ausgeführt wird. Weitere Informationen finden Sie unter Python-Pipeline in Dataflow ausführen.
Führen Sie im Terminal den folgenden Befehl aus:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://STORAGE_BUCKET/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://STORAGE_BUCKET/tmp/
Ersetzen Sie Folgendes:
DATAFLOW_REGION
: Die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.europe-west1
Das Flag
--region
überschreibt die Standardregion, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.STORAGE_BUCKET
: Cloud Storage-Name, den Sie zuvor kopiert haben.PROJECT_ID
: Google Cloud-Projekt-ID, die Sie zuvor kopiert haben.
Go
Der folgende Beispielcode aus der Kurzanleitung zeigt, wie die WordCount-Pipeline in Dataflow ausgeführt wird. Weitere Informationen finden Sie unter Go-Pipeline in Dataflow ausführen.
Führen Sie im Terminal den folgenden Befehl aus:
posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://STORAGE_BUCKET/results/outputs \ --runner dataflow \ --project PROJECT_ID \ --region DATAFLOW_REGION \ --staging_location gs://STORAGE_BUCKET/binaries/
Dabei gilt:
STORAGE_BUCKET
: der Name des Cloud Storage-BucketsPROJECT_ID
: die Google Cloud-Projekt-ID.DATAFLOW_REGION
: die Region, in der Sie den Dataflow-Job bereitstellen möchten Beispiel:europe-west1
Eine Liste der verfügbaren Dataflow-Standorte finden Sie hier. Beachten Sie, dass das Flag--region
die Standardregion überschreibt, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.
Erfahren Sie, wie Sie Ihre Pipeline im Dataflow-Dienst mit dem Dataflow-Runner ausführen.
Wenn Sie die Pipeline in Dataflow ausführen, wandelt Dataflow den Apache Beam-Pipelinecode in einen Dataflow-Job um. Dataflow verwaltet vollständig Google Cloud-Dienste für Sie, z. . Compute Engine und Cloud Storage, um Ihren Dataflow-Job auszuführen, und startet automatisch fährt die erforderlichen Ressourcen herunter. Weitere Informationen dazu, wie Dataflow Ihren Apache Beam-Code in einen Dataflow-Job in Pipeline-Lebenszyklus umwandelt.
Pipelinevalidierung
Wenn Sie die Pipeline in Dataflow ausführen, führt Dataflow vor dem Start des Jobs Validierungstests für die Pipeline aus. Wenn ein Validierungstest Probleme mit der Pipeline findet, bricht Dataflow die Jobübermittlung vorzeitig ab. In den Joblogs enthält Dataflow Nachrichten mit dem folgenden Text. Jede Nachricht enthält auch Details zu den Ergebnissen der Validierung und eine Anleitung zur Behebung des Problems.
The preflight pipeline validation failed for job JOB_ID.
Welche Validierungstests ausgeführt werden, hängt von den Ressourcen und Diensten ab, die Ihr Dataflow-Job verwendet.
- Wenn die Service Usage API für Ihr Projekt aktiviert ist, wird bei den Pipeline-Validierungstests geprüft, ob die Dienste, die zum Ausführen Ihres Dataflow-Jobs erforderlich sind, aktiviert sind.
- Wenn die Cloud Resource Manager API für Ihr Projekt aktiviert ist, wird mit den Pipelinevalidierungstests geprüft, ob Sie die Konfigurationen auf Projektebene haben, die zum Ausführen Ihres Dataflow-Jobs erforderlich sind.
Weitere Informationen zum Aktivieren von Diensten finden Sie unter Dienste aktivieren und deaktivieren.
Informationen zum Beheben von Berechtigungsproblemen, die bei der Pipeline-Validierung erkannt wurden, finden Sie unter Pipeline-Validierung fehlgeschlagen.
Wenn Sie die Pipelinevalidierung überschreiben und den Job mit Validierungsfehlern starten möchten, verwenden Sie die folgende Pipeline-Dienstoption:
Java
--dataflowServiceOptions=enable_preflight_validation=false
Python
--dataflow_service_options=enable_preflight_validation=false
Go
--dataflow_service_options=enable_preflight_validation=false
Pipelineoptionen festlegen
Sie können bestimmte Aspekte hinsichtlich der Ausführung des Jobs durch Dataflow steuern, indem Sie Pipelineoptionen im Apache Beam-Pipelinecode festlegen. Mit Pipeline-Optionen können Sie beispielsweise festlegen, ob Ihre Pipeline auf virtuellen Worker-Maschinen, im Dataflow-Dienst-Backend oder lokal ausgeführt wird.
Pipelineabhängigkeiten verwalten
Viele Apache Beam-Pipelines können mit den standardmäßigen Dataflow-Laufzeitumgebungen ausgeführt werden. Bei einigen Anwendungsfällen in der Datenverarbeitung ist es jedoch sinnvoll, zusätzliche Bibliotheken oder Klassen zu verwenden. In diesen Fällen müssen Sie möglicherweise Ihre Pipelineabhängigkeiten verwalten. Weitere Informationen zum Verwalten von Abhängigkeiten finden Sie unter Pipelineabhängigkeiten in Dataflow verwalten.
Job überwachen
Dataflow bietet Einblick in Ihre Jobs mit Tools wie der Dataflow-Monitoring-Oberfläche und der Dataflow-Befehlszeile.
Auf Worker-VMs zugreifen
Sie können die VM-Instanzen für eine bestimmte Pipeline mithilfe der Google Cloud Console anzeigen lassen. Von dort aus können Sie SSH verwenden, um auf die Instanzen zuzugreifen. Nachdem Ihr Job entweder abgeschlossen oder fehlgeschlagen ist, fährt der Dataflow-Dienst die VM-Instanzen jedoch automatisch herunter und bereinigt sie.
Joboptimierungen
Neben der Verwaltung von Google Cloud-Ressourcen führt Dataflow viele Aspekte der verteilten parallelen Verarbeitung automatisch für Sie aus und optimiert sie.
Parallelisierung und Verteilung
Dataflow partitioniert die Daten automatisch und verteilt den Worker-Code zur parallelen Verarbeitung auf Compute Engine-Instanzen. Weitere Informationen finden Sie unter Parallelisierung und Verteilung.
Optimierungen zusammenführen und kombinieren
Dataflow erstellt mithilfe des Pipelinecodes eine Ausführungsgrafik mit den PCollection
s der Pipeline. Die Grafik wird außerdem für eine möglichst effiziente Leistung und Ressourcennutzung transformiert und optimiert.
Dataflow optimiert darüber hinaus automatisch potenziell kostspielige Vorgänge wie etwa Datenzusammenfassungen. Weitere Informationen finden Sie unter Zusammenführung optimieren und Kombination optimieren.
Automatische Feinabstimmungsfeatures
Der Dataflow-Dienst umfasst mehrere Funktionen zur sofortigen Anpassung der Ressourcenzuweisung und Datenpartitionierung. Diese Funktionen ermöglichen eine möglichst schnelle und effiziente Ausführung des Jobs durch Dataflow. Unter anderem sind folgende Funktionen verfügbar:
Streaming Engine
Standardmäßig führt der Dataflow-Pipeline-Runner die Schritte der Streamingpipeline vollständig auf Worker-VMs aus und beansprucht CPU-Leistung, Arbeitsspeicher und nichtflüchtigen Speicherplatz auf dem Worker. Streaming Engine von Dataflow verschiebt die Pipelineausführung aus den Worker-VMs in das Dataflow-Dienst-Backend. Weitere Informationen finden Sie unter Streaming Engine.
Flexible Ressourcenplanung für Dataflow
Dataflow FlexRS reduziert die Kosten für die Batchverarbeitung. Dazu werden erweiterte Planungsverfahren, der Dataflow Shuffle-Dienst sowie eine Kombination aus VM-Instanzen auf Abruf und normalen VMs verwendet. Durch die parallele Ausführung von VMs auf Abruf und regulären VMs verbessert Dataflow die Nutzerfreundlichkeit, wenn Compute Engine VM-Instanzen auf Abruf während eines Systemereignisses beendet. FlexRS gewährleistet, dass die Pipeline weiter verarbeitet wird und keine geleistete Arbeit verloren geht, wenn Ihre VMs auf Abruf von Compute Engine vorzeitig beendet werden. Weitere Informationen zu FlexRS finden Sie unter Flexible Ressourcenplanung in Dataflow verwenden.
Dataflow Shielded VM
Ab dem 1. Juni 2022 verwendet der Dataflow-Dienst Shielded VM für alle Worker. Weitere Informationen zu Shielded VM-Funktionen finden Sie unter Shielded VM.