In diesem Dokument wird beschrieben, wie eine Dataflow-Pipeline mit einem benutzerdefinierten Container ausgeführt wird.
Informationen zum Erstellen des Container-Images finden Sie unter Benutzerdefinierte Container-Images für Dataflow erstellen.
Starten Sie beim Ausführen Ihrer Pipeline diese mit dem Apache Beam SDK mit der gleichen Version und Sprachversion wie das SDK auf Ihrem benutzerdefinierten Container-Image. Dadurch werden unerwartete Fehler durch inkompatible Abhängigkeiten oder SDKs vermieden.
Lokal testen
Bevor Sie Ihre Pipeline in Dataflow ausführen, sollten Sie das Container-Image lokal testen. So können Sie schneller testen und Fehler beheben.
Weitere Informationen zur Apache Beam-spezifischen Nutzung finden Sie im Apache Beam-Leitfaden zum Ausführen von Pipelines mit benutzerdefinierten Container-Images.
Einfache Tests mit PortableRunner
Verwenden Sie Apache Beam PortableRunner
, um zu prüfen, ob Remote-Container-Images abgerufen werden können und eine einfache Pipeline ausgeführt werden kann. Wenn Sie PortableRunner
verwenden, erfolgt die Jobeinreichung in der lokalen Umgebung und die Ausführung von DoFn
in der Docker-Umgebung.
Wenn Sie GPUs verwenden, hat der Docker-Container möglicherweise keinen Zugriff auf die GPUs. Verwenden Sie zum Testen Ihres Containers mit GPUs den Direct Runner und befolgen Sie die Schritte zum Testen eines Container-Images auf einer eigenständigen VM mit GPUs im Abschnitt Fehlerbehebung mit einer eigenständigen VM auf der Seite „GPUs verwenden“.
Im Folgenden wird eine Beispielpipeline ausgeführt:
Java
mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
-Dexec.args="--runner=PortableRunner \
--jobEndpoint=REGION \
--defaultEnvironmentType=DOCKER \
--defaultEnvironmentConfig=IMAGE_URI \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE"
Python
python path/to/my/pipeline.py \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Go
go path/to/my/pipeline.go \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Ersetzen Sie dabei Folgendes:
REGION
: die Region des Jobdiensts in Form von Adresse und Port. Beispiel:localhost:3000
Führen Sie mitembed
einen Prozessjobdienst aus.IMAGE_URI
: der URI des benutzerdefinierten Container-Images.INPUT_FILE
ist eine Eingabedatei, die als Textdatei gelesen werden kann. Auf diese Datei muss vom SDK aus zugegriffen werden können.
Das Container-Image kann entweder als vorab geladenes Container-Image oder als Remote-Datei verwendet werden.OUTPUT_FILE
ist ein Pfad, in den die Ausgabe geschrieben wird. Dieser Pfad ist entweder ein Remote-Pfad oder ein lokaler Pfad im Container.
Prüfen Sie anhand der Konsolenlogs, ob die Pipeline erfolgreich abgeschlossen wurde und das von IMAGE_URI
angegebene Remote-Image verwendet wurde.
Nach der Ausführung der Pipeline befinden sich die im Container gespeicherten Dateien nicht mehr in Ihrem lokalen Dateisystem und der Container wird angehalten. Sie können Dateien mit docker cp
aus dem angehaltenen Containerdateisystem kopieren.
Alternativ:
- Ausgaben an ein Remote-Dateisystem wie Cloud Storage senden. Für Testzwecke ist es möglicherweise erforderlich, den Zugriff manuell zu konfigurieren, einschließlich Anmeldedatendateien oder Standardanmeldedaten für Anwendungen.
- Fügen Sie für eine schnelle Fehlerbehebung ein vorübergehendes Logging hinzu.
Direct Runner verwenden
Für detaillierte lokale Tests des Container-Images und Ihrer Pipeline verwenden Sie DirectRunner von Apache Beam.
Sie können Ihre Pipeline getrennt vom Container prüfen, indem Sie sie in einer lokalen Umgebung testen, die mit dem Container-Image übereinstimmt, oder indem Sie die Pipeline in einem laufenden Container starten.
Java
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...
Python
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# python path/to/my/pipeline.py ...
Go
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# go path/to/my/pipeline.go ...
Ersetzen Sie IMAGE_URI
durch den URI des benutzerdefinierten Container-Images.
In den Beispielen wird davon ausgegangen, dass sich alle Pipeline-Dateien, einschließlich der Pipeline selbst, im benutzerdefinierten Container befinden, von einem lokalen Dateisystem bereitgestellt wurden oder von Apache Beam und dem Container remote zugänglich sind. Wenn Sie beispielsweise das obige Java-Beispiel mit Maven (mvn
) ausführen möchten, müssen Sie Maven und die zugehörigen Abhängigkeiten für den Container bereitstellen. Weitere Informationen finden Sie in der Docker-Dokumentation unter Speicher und docker run
.
Das Ziel der Tests mit dem Direct Runner ist es, Ihre Pipeline in der benutzerdefinierten Container-Umgebung zu testen und nicht, die Ausführung Ihres Containers mit seinem Standard-ENTRYPOINT
zu testen. Ändern Sie den ENTRYPOINT
(z. B. docker run --entrypoint ...
) so, dass er entweder direkt Ihre Pipeline ausführt oder eine manuelle Befehlsausführung im Container zulässt.
Wenn Sie eine bestimmte Konfiguration für die Ausführung in Compute Engine benötigen, können Sie den Container direkt auf einer Compute Engine-VM ausführen. Weitere Informationen finden Sie unter Container in Compute Engine.
Dataflow-Job starten
Geben Sie beim Starten der Apache Beam-Pipeline in Dataflow den Pfad zum Container-Image an. Verwenden Sie das Tag :latest
nicht für Ihre benutzerdefinierten Images. Taggen Sie Ihre Builds mit einem Datum oder einer eindeutigen Kennung. Sollte ein Fehler auftreten, können Sie mit dieser Art von Tag die Pipelineausführung auf eine zuvor bekannte funktionierende Konfiguration zurücksetzen und die Überprüfung von Änderungen ermöglichen.
Java
Verwenden Sie --sdkContainerImage
, um ein SDK-Container-Image für Ihre Java-Laufzeit anzugeben.
Verwenden Sie --experiments=use_runner_v2
, um Runner v2 zu aktivieren.
Python
Wenn Sie die SDK-Version 2.30.0 oder höher verwenden, geben Sie mit der Pipeline-Option --sdk_container_image
ein SDK-Container-Image an.
Verwenden Sie bei älteren Versionen des SDK die Pipeline-Option --worker_harness_container_image
, um den Speicherort des Container-Image anzugeben, das für die Worker-Harness verwendet werden soll.
Benutzerdefinierte Container werden nur für Dataflow Runner v2 unterstützt. Wenn Sie eine Batch-Python-Pipeline starten, geben Sie das Flag --experiments=use_runner_v2
an.
Wenn Sie eine Streaming-Python-Pipeline starten, ist die Angabe des Tests nicht erforderlich, da für Streaming-Python-Pipelines standardmäßig Runner v2 verwendet wird.
Go
Wenn Sie die SDK-Version 2.40.0 oder höher verwenden, geben Sie mit der Pipeline-Option --sdk_container_image
ein SDK-Container-Image an.
Verwenden Sie bei älteren Versionen des SDK die Pipeline-Option --worker_harness_container_image
, um den Speicherort des Container-Image anzugeben, das für die Worker-Harness verwendet werden soll.
Benutzerdefinierte Container werden auf allen Versionen des Go SDK unterstützt, da sie standardmäßig Dataflow Runner v2 verwenden.
Das folgende Beispiel zeigt, wie das Beispiel WordCount
mit einem benutzerdefinierten Container gestartet wird.
Java
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--gcpTempLocation=TEMP_LOCATION \
--diskSizeGb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdkContainerImage=IMAGE_URI"
Python
Verwenden Sie Apache Beam SDK für Python Version 2.30.0 oder höher:
python -m apache_beam.examples.wordcount \
--input=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--temp_location=TEMP_LOCATION \
--runner=DataflowRunner \
--disk_size_gb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdk_container_image=IMAGE_URI
Go
wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--sdk_container_image=IMAGE_URI
Ersetzen Sie dabei Folgendes:
INPUT_FILE
: der Cloud Storage-Eingabepfad, der von Dataflow beim Ausführen des Beispiels gelesen wird.OUTPUT_FILE
: der Cloud Storage-Ausgabepfad, in den die Beispielpipeline geschrieben wird. Diese Datei enthält die Anzahl der Wörter.PROJECT_ID
: die ID Ihres Google Cloud-Projekts.REGION
: Die Region, in der Ihr Dataflow-Job bereitgestellt werden soll.TEMP_LOCATION
: ein Cloud Storage-Pfad, den Dataflow für das Staging temporärer Jobdateien verwendet, die während der Pipelineausführung erstellt werden.DISK_SIZE_GB
: Optional. : (Optional) Wenn Ihr Container groß ist, sollten Sie die standardmäßige Bootlaufwerkgröße erhöhen, um zu vermeiden, dass der Speicherplatz aufgebraucht wird.IMAGE_URI
: der URI des benutzerdefinierten SDK-Container-Image. Geben Sie immer ein versioniertes Container-SHA oder -Tag an. Verwenden Sie weder das Tag:latest
noch ein änderbares Tag.