Dataflow-Job in einem benutzerdefinierten Container ausführen

In diesem Dokument wird beschrieben, wie Sie eine Dataflow-Pipeline mit einem benutzerdefinierten Container ausführen.

Informationen zum Erstellen des Container-Images finden Sie unter Benutzerdefinierte Container-Images für Dataflow erstellen.

Starten Sie beim Ausführen Ihrer Pipeline diese mit dem Apache Beam SDK mit der gleichen Version und Sprachversion wie das SDK auf Ihrem benutzerdefinierten Container-Image. Dadurch werden unerwartete Fehler durch inkompatible Abhängigkeiten oder SDKs vermieden.

Lokal testen

Bevor Sie Ihre Pipeline in Dataflow ausführen, sollten Sie das Container-Image lokal testen, um schnellere Tests und Fehlerbehebungen zu ermöglichen.

Weitere Informationen zur Apache Beam-spezifischen Nutzung finden Sie im Apache Beam-Leitfaden zum Ausführen von Pipelines mit benutzerdefinierten Container-Images.

Einfache Tests mit PortableRunner

Verwenden Sie PortableRunner von Apache Beam, um zu prüfen, ob Remote-Container-Images abgerufen werden können und eine einfache Pipeline ausführen können. Bei Verwendung von PortableRunner erfolgt die Jobübermittlung in der lokalen Umgebung und die DoFn-Ausführung in der Docker-Umgebung.

Wenn Sie GPUs verwenden, hat der Docker-Container möglicherweise keinen Zugriff auf die GPUs. Verwenden Sie zum Testen Ihres Containers mit GPUs den Direct Runner und befolgen Sie die Schritte zum Testen eines Container-Images auf einer eigenständigen VM mit GPUs im Abschnitt Fehlerbehebung mit einer eigenständigen VM auf der Seite „GPUs verwenden“.

Im Folgenden wird eine Beispielpipeline ausgeführt:

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Einfach loslegen (Go)

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Ersetzen Sie Folgendes:

  • REGION: die zu verwendende Jobdienstregion in Form von Adresse und Port. Beispiel: localhost:3000 Führen Sie mit embed einen Prozessjobdienst aus.
  • IMAGE_URI: der URI des benutzerdefinierten Container-Images.
  • INPUT_FILE ist eine Eingabedatei, die als Textdatei gelesen werden kann. Auf diese Datei muss vom SDK aus zugegriffen werden können.
    Das Container-Image kann entweder als vorab geladenes Container-Image oder als Remote-Datei verwendet werden.
  • OUTPUT_FILE ist ein Pfad, in den die Ausgabe geschrieben wird. Dieser Pfad ist entweder ein Remote-Pfad oder ein lokaler Pfad im Container.

Wenn die Pipeline erfolgreich abgeschlossen wurde, prüfen Sie die Logs der Console, um festzustellen, ob die Pipeline erfolgreich abgeschlossen wurde und das von IMAGE_URI angegebene Remote-Image verwendet wird.

Nach dem Ausführen der Pipeline befinden sich die im Container gespeicherten Dateien nicht in Ihrem lokalen Dateisystem und der Container wird angehalten. Mit docker cp können Sie Dateien aus dem angehaltenen Containerdateisystem kopieren.

Alternativ:

  • Ausgaben an ein Remote-Dateisystem wie Cloud Storage senden. Sie müssen den Zugriff möglicherweise zu Testzwecken manuell konfigurieren, einschließlich Anmeldedatendateien oder Standardanmeldedaten für Anwendungen.
  • Fügen Sie für ein schnelles Debugging ein vorübergehendes Logging hinzu.

Direct Runner verwenden

Für detaillierte lokale Tests des Container-Images und Ihrer Pipeline verwenden Sie DirectRunner von Apache Beam.

Sie können Ihre Pipeline getrennt vom Container prüfen, indem Sie sie in einer lokalen Umgebung testen, die mit dem Container-Image übereinstimmt, oder indem Sie die Pipeline in einem laufenden Container starten.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Einfach loslegen (Go)

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

Ersetzen Sie IMAGE_URI durch den URI des benutzerdefinierten Container-Images.

In den Beispielen wird davon ausgegangen, dass sich alle Pipeline-Dateien, einschließlich der Pipeline selbst, im benutzerdefinierten Container befinden, von einem lokalen Dateisystem bereitgestellt wurden oder von Apache Beam und dem Container remote zugänglich sind. Wenn Sie beispielsweise das vorherige Java-Beispiel mit Maven (mvn) ausführen möchten, müssen Maven und seine Abhängigkeiten im Container bereitgestellt werden. Weitere Informationen finden Sie in der Docker-Dokumentation unter Storage und docker run.

Ziel des Tests mit dem Direct Runner ist es, die Pipeline in der benutzerdefinierten Containerumgebung zu testen, nicht den Standard-ENTRYPOINT auszuführen. Ändern Sie den ENTRYPOINT (z. B. docker run --entrypoint ...) so, dass er entweder direkt Ihre Pipeline ausführt oder eine manuelle Befehlsausführung im Container zulässt.

Wenn Sie eine bestimmte Konfiguration verwenden, die auf der Ausführung des Containers in Compute Engine basiert, können Sie den Container direkt auf einer Compute Engine-VM ausführen. Weitere Informationen finden Sie unter Container in Compute Engine.

Dataflow-Job starten

Geben Sie beim Starten der Apache Beam-Pipeline in Dataflow den Pfad zum Container-Image an. Verwenden Sie das Tag :latest nicht mit Ihren benutzerdefinierten Images. Taggen Sie Ihre Builds mit einem Datum oder einer eindeutigen ID. Bei Problemen kann die Verwendung dieser Art von Tags die Wiederherstellung der Pipelineausführung auf eine zuvor bekannte Arbeitskonfiguration ermöglichen und eine Prüfung der Änderungen ermöglichen.

Java

Verwenden Sie --sdkContainerImage, um ein SDK-Container-Image für Ihre Java-Laufzeit anzugeben.

Verwenden Sie --experiments=use_runner_v2, um Runner v2 zu aktivieren.

Python

Wenn Sie die SDK-Version 2.30.0 oder höher verwenden, geben Sie mit der Pipeline-Option --sdk_container_image ein SDK-Container-Image an.

Verwenden Sie bei älteren Versionen des SDK die Pipeline-Option --worker_harness_container_image, um den Speicherort des Container-Image anzugeben, das für die Worker-Harness verwendet werden soll.

Benutzerdefinierte Container werden nur für Dataflow Runner v2 unterstützt. Wenn Sie eine Batch-Python-Pipeline starten, geben Sie das Flag --experiments=use_runner_v2 an. Wenn Sie eine Streaming-Python-Pipeline starten, ist die Angabe des Tests nicht erforderlich, da für Streaming-Python-Pipelines standardmäßig Runner v2 verwendet wird.

Einfach loslegen (Go)

Wenn Sie die SDK-Version 2.40.0 oder höher verwenden, geben Sie mit der Pipeline-Option --sdk_container_image ein SDK-Container-Image an.

Verwenden Sie bei älteren Versionen des SDK die Pipeline-Option --worker_harness_container_image, um den Speicherort des Container-Image anzugeben, das für die Worker-Harness verwendet werden soll.

Benutzerdefinierte Container werden auf allen Versionen des Go SDK unterstützt, da sie standardmäßig Dataflow Runner v2 verwenden.

Das folgende Beispiel zeigt, wie das Batch-Beispiel WordCount mit einem benutzerdefinierten Container gestartet wird.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Verwenden Sie Apache Beam SDK für Python Version 2.30.0 oder höher:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Einfach loslegen (Go)

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

Ersetzen Sie Folgendes:

  • INPUT_FILE: der Cloud Storage-Eingabepfad, der von Dataflow beim Ausführen des Beispiels gelesen wird.
  • OUTPUT_FILE: der Cloud Storage-Ausgabepfad, in den die Beispielpipeline geschrieben wird. Diese Datei enthält die Anzahl der Wörter.
  • PROJECT_ID: die ID Ihres Google Cloud-Projekts.
  • REGION ist die Region, in der der Dataflow-Job bereitgestellt wird.
  • TEMP_LOCATION: ein Cloud Storage-Pfad, den Dataflow für das Staging temporärer Jobdateien verwendet, die während der Pipelineausführung erstellt werden.
  • DISK_SIZE_GB: Optional. : (Optional) Wenn Ihr Container groß ist, sollten Sie die standardmäßige Bootlaufwerkgröße erhöhen, um zu vermeiden, dass der Speicherplatz aufgebraucht wird.
  • IMAGE_URI: der URI des benutzerdefinierten SDK-Container-Image. Verwenden Sie immer ein SHA oder ein Tag mit Versionsverwaltung. Verwenden Sie nicht das Tag :latest oder ein änderbares Tag.