Exécuter un job Dataflow dans un conteneur personnalisé

Ce document explique comment exécuter un pipeline Dataflow à l'aide d'un conteneur personnalisé.

Pour plus d'informations sur la création de l'image de conteneur, consultez la section Créer des images de conteneurs personnalisées pour Dataflow.

Lors de l'exécution de votre pipeline, lancez le pipeline à l'aide du SDK Apache Beam avec la même version et la même version de langage que le SDK de votre image de conteneur personnalisée. Cela évite les erreurs inattendues provenant de SDK ou de dépendances incompatibles.

Tester en local

Avant d'exécuter votre pipeline dans Dataflow, il est judicieux de tester l'image de conteneur localement, ce qui permet des tests et un débogage plus rapides.

Pour en savoir plus sur l'utilisation spécifique à Apache Beam, consultez le guide Apache Beam Exécuter des pipelines avec des images de conteneurs personnalisées.

Tests de base avec PortableRunner

Pour vérifier que les images de conteneurs distantes peuvent être extraites et qu'elles peuvent exécuter un pipeline simple, utilisez le PortableRunner Apache Beam. Lorsque vous utilisez PortableRunner, le job est envoyé dans l'environnement local, et l'exécution de DoFn s'effectue dans l'environnement Docker.

Lorsque vous utilisez des GPU, le conteneur Docker peut ne pas avoir accès aux GPU. Pour tester votre conteneur avec des GPU, utilisez l'exécuteur direct, puis suivez les étapes permettant de tester une image de conteneur sur une VM autonome avec des GPU dans la section Déboguer à l'aide d'une VM autonome de la page "Utiliser des GPU".

Voici un exemple d'exécution de pipeline :

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Go

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Remplacez les éléments suivants :

  • REGION : région du service de job à utiliser, sous la forme d'une adresse et d'un port. Exemple : localhost:3000. Utilisez embed pour exécuter un service de tâche en cours.
  • IMAGE_URI : URI de l'image de conteneur personnalisée.
  • INPUT_FILE : fichier d'entrée pouvant être lu sous forme de fichier texte. L'image de conteneur
    du SDK Harness doit pouvoir accéder à ce fichier. Il peut être soit préchargé sur l'image du conteneur, soit accessible en tant que fichier distant.
  • OUTPUT_FILE : chemin d'accès au fichier dans lequel écrire le résultat. Il peut s'agir d'un chemin distant ou d'un chemin d'accès local sur le conteneur.

Une fois le pipeline terminé, consultez les journaux de la console pour vérifier que le pipeline a bien été exécuté, et que l'image distante spécifiée par IMAGE_URI est bien utilisée.

Après l'exécution du pipeline, les fichiers enregistrés dans le conteneur ne se trouvent pas dans votre système de fichiers local, et le conteneur est arrêté. Vous pouvez copier des fichiers à partir du système de fichiers du conteneur arrêté à l'aide de docker cp.

Vous pouvez également procéder comme suit :

  • Fournir des sorties à un système de fichiers distant tel que Cloud Storage. Vous devrez peut-être configurer manuellement l'accès à des fins de test, y compris pour les fichiers d'identifiants ou les identifiants par défaut de l'application.
  • Pour un débogage rapide, ajoutez une journalisation temporaire.

Utiliser Direct Runner

Pour des tests locaux plus approfondis de l'image de conteneur et de votre pipeline, utilisez Direct Runner d'Apache Beam.

Vous pouvez vérifier votre pipeline séparément du conteneur en effectuant un test dans un environnement local correspondant à l'image du conteneur, ou en lançant le pipeline sur un conteneur en cours d'exécution.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Go

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

Remplacez IMAGE_URI par l'URI de l'image de conteneur personnalisé.

Cet exemple suppose que tous les fichiers de pipeline (y compris le pipeline lui-même) se trouvent sur le conteneur personnalisé, ont été installés à partir d'un système de fichiers local, ou sont distants et accessibles par Apache Beam et le conteneur. Par exemple, pour utiliser Maven (mvn) afin d'exécuter l'exemple Java précédent, Maven et ses dépendances doivent être déplacés sur le conteneur. Pour en savoir plus, consultez les pages Stockage et docker run dans la documentation de Docker.

L'objectif de test sur Direct Runner consiste à tester votre pipeline dans l'environnement de conteneur personnalisé, et non à exécuter votre conteneur avec son ENTRYPOINT par défaut. Modifiez le fichier ENTRYPOINT (par exemple, docker run --entrypoint ...) pour exécuter directement votre pipeline ou autoriser les commandes manuelles sur le conteneur.

Si vous utilisez une configuration spécifique basée sur l'exécution du conteneur sur Compute Engine, vous pouvez exécuter le conteneur directement sur une VM Compute Engine. Pour en savoir plus, consultez la page Conteneurs sur Compute Engine.

Lancer la tâche Dataflow

Lors du lancement du pipeline Apache Beam sur Dataflow, spécifiez le chemin d'accès à l'image de conteneur. N'utilisez pas le tag :latest avec vos images personnalisées. Ajoutez à vos compilations un tag avec une date ou un identifiant unique. Si un problème survient, l'utilisation de ce type de tag peut permettre de rétablir l'exécution d'un pipeline avec une configuration fonctionnelle connue et d'effectuer une inspection des modifications.

Java

Utilisez le paramètre --sdkContainerImage afin de spécifier une image de conteneur SDK pour votre environnement d'exécution Java.

Utilisez --experiments=use_runner_v2 pour activer l'exécuteur v2.

Python

Si vous utilisez la version 2.30.0 ou ultérieure du SDK, utilisez l'option de pipeline --sdk_container_image pour spécifier une image de conteneur de SDK.

Pour les anciennes versions du SDK, utilisez l'option de pipeline --worker_harness_container_image pour spécifier l'emplacement de l'image de conteneur à utiliser pour le faisceau de nœud de calcul.

Les conteneurs personnalisés ne sont compatibles qu'avec l'application Runner v2. Si vous lancez un pipeline Python par lots, définissez l'option --experiments=use_runner_v2. Si vous lancez un pipeline Python en streaming, la spécification du test n'est pas nécessaire, car les pipelines Python en streaming utilisent Runner v2 par défaut.

Go

Si vous utilisez la version 2.40.0 ou ultérieure du SDK, utilisez l'option de pipeline --sdk_container_image pour spécifier une image de conteneur de SDK.

Pour les anciennes versions du SDK, utilisez l'option de pipeline --worker_harness_container_image pour spécifier l'emplacement de l'image de conteneur à utiliser pour le faisceau de nœud de calcul.

Les conteneurs personnalisés sont compatibles avec toutes les versions du SDK Go, car ils utilisent par défaut l'exécuteur Dataflow v2.

L'exemple suivant montre comment lancer l'exemple de traitement par lots WordCount avec un conteneur personnalisé.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Utilisez le SDK Apache Beam pour Python version 2.30.0 ou ultérieure :

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Go

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

Remplacez les éléments suivants :

  • INPUT_FILE : chemin d'accès au fichier d'entrée Cloud Storage lu par Dataflow lors de l'exécution de l'exemple.
  • OUTPUT_FILE : chemin d'accès au fichier de sortie Cloud Storage écrit par l'exemple de pipeline. Ce fichier contient le nombre de mots.
  • PROJECT_ID : ID de votre projet Google Cloud.
  • REGION : région dans laquelle déployer votre job Dataflow.
  • TEMP_LOCATION : chemin d'accès à Cloud Storage dans lequel Dataflow prépare les fichiers de tâches temporaires créés lors de l'exécution du pipeline.
  • DISK_SIZE_GB : facultatif. Si votre conteneur est volumineux, envisagez d'augmenter la taille du disque de démarrage par défaut afin d'éviter de manquer d'espace disque.
  • IMAGE_URI : URI de l'image de conteneur personnalisé du SDK. Utilisez toujours un SHA ou un tag de conteneur avec versions gérées. N'utilisez pas le tag :latest ni un tag modifiable.