Déployer des pipelines Dataflow

Ce document présente le déploiement de pipeline et met en lumière certaines des opérations que vous pouvez effectuer sur un pipeline déployé.

Exécuter le pipeline

Après avoir créé et testé votre pipeline Apache Beam, exécutez-le. Vous pouvez exécuter votre pipeline localement, ce qui vous permet de tester et de déboguer votre pipeline Apache Beam, ou sur Dataflow, un système de traitement de données disponible pour l'exécution de pipelines Apache Beam.

Exécuter localement

Exécutez votre pipeline en local.

Java

L'exemple de code suivant, tiré du guide de démarrage rapide, montre comment exécuter le pipeline WordCount en local. Pour en savoir plus, découvrez comment exécuter votre pipeline Java en local.

Dans le terminal, exécutez la commande suivante :

  mvn compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--output=counts"
  

Python

L'exemple de code suivant, tiré du guide de démarrage rapide, montre comment exécuter le pipeline WordCount en local. Pour en savoir plus, découvrez comment exécuter votre pipeline Python en local.

Dans le terminal, exécutez la commande suivante :

python -m apache_beam.examples.wordcount \ --output outputs

Go

L'exemple de code suivant, tiré du guide de démarrage rapide, montre comment exécuter le pipeline WordCount en local. Pour en savoir plus, découvrez comment exécuter votre pipeline Go en local.

Dans le terminal, exécutez la commande suivante :

    go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
  

Découvrez comment exécuter votre pipeline en local, sur votre machine, à l'aide de l'exécuteur direct.

Exécuter sur Dataflow

Exécutez votre pipeline sur Dataflow.

Java

L'exemple de code suivant, tiré du guide de démarrage rapide, montre comment exécuter le pipeline WordCount sur Dataflow. Pour en savoir plus, découvrez comment exécuter votre pipeline Java sur Dataflow.

Dans votre terminal, exécutez la commande suivante (à partir de votre répertoire word-count-beam) :

  mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--project=PROJECT_ID \
    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
    --output=gs://BUCKET_NAME/output \
    --runner=DataflowRunner \
    --region=REGION"
    

Remplacez les éléments suivants :

  • PROJECT_ID : ID de votre projet Google Cloud
  • BUCKET_NAME : nom de votre bucket Cloud Storage
  • REGION : région Dataflow (par exemple, us-central1)

Python

L'exemple de code suivant, tiré du guide de démarrage rapide, montre comment exécuter le pipeline WordCount sur Dataflow. Pour en savoir plus, découvrez comment exécuter votre pipeline Python sur Dataflow.

Dans le terminal, exécutez la commande suivante :

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

Remplacez les éléments suivants :

  • DATAFLOW_REGION : région dans laquelle vous souhaitez déployer le job Dataflow (par exemple, europe-west1)

    L'option --region remplace la région par défaut définie dans le serveur de métadonnées, votre client local ou les variables d'environnement.

  • STORAGE_BUCKET : nom Cloud Storage que vous avez copié précédemment
  • PROJECT_ID : ID de projet Google Cloud que vous avez copié précédemment

Go

L'exemple de code suivant, tiré du guide de démarrage rapide, montre comment exécuter le pipeline WordCount sur Dataflow. Pour en savoir plus, découvrez comment exécuter votre pipeline Go sur Dataflow.

Dans le terminal, exécutez la commande suivante :

  posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://STORAGE_BUCKET/binaries/
  

Remplacez les éléments suivants :

  • STORAGE_BUCKET : nom du bucket Cloud Storage.
  • PROJECT_ID : ID de projet Google Cloud.
  • DATAFLOW_REGION : région dans laquelle vous souhaitez déployer le job Dataflow. Exemple :europe-west1 Pour obtenir la liste des emplacements disponibles, voir Emplacements Dataflow. Veuillez noter que l'option --region remplace la région par défaut définie dans le serveur de métadonnées, sur votre client local ou dans les variables d'environnement.

Découvrez comment exécuter votre pipeline sur le service Dataflow en utilisant l'exécuteur Dataflow.

Lorsque vous exécutez votre pipeline sur Dataflow, Dataflow transforme le code de votre pipeline Apache Beam en job Dataflow. Dataflow gère entièrement les services Google Cloud, tels que Compute Engine et Cloud Storage, pour exécuter votre job Dataflow, et lance et supprime automatiquement les ressources nécessaires. Pour en savoir plus sur la manière dont Dataflow transforme le code Apache Beam en job Dataflow, consultez la page Cycle de vie du pipeline.

Validation des pipelines

Lorsque vous exécutez votre pipeline sur Dataflow, avant le lancement d job, Dataflow effectue des tests de validation sur le pipeline. Lorsqu'un test de validation détecte des problèmes liés au pipeline, Dataflow fait échouer l'envoi du job prématurément. Dans les journaux des jobs, Dataflow inclut des messages avec le texte suivant. Chaque message inclut également des informations sur les résultats de la validation et des instructions pour résoudre le problème.

The preflight pipeline validation failed for job JOB_ID.

Les tests de validation exécutés dépendent des ressources et des services utilisés par votre job Dataflow.

  • Si l'API Service Usage est activée pour votre projet, les tests de validation du pipeline vérifient si les services nécessaires à l'exécution de votre job Dataflow sont activés.
  • Si l'API Cloud Resource Manager est activée pour votre projet, les tests de validation du pipeline vérifient si vous disposez des configurations au niveau du projet nécessaires pour exécuter votre job Dataflow.

Pour plus d'informations sur l'activation des services, consultez la section Activer et désactiver des services.

Pour savoir comment résoudre les problèmes d'autorisation détectés lors de la validation du pipeline, consultez la section Échec de la validation du pipeline.

Si vous souhaitez ignorer la validation du pipeline et lancer votre job avec des erreurs de validation, utilisez l'option de service de pipeline suivante :

Java

--dataflowServiceOptions=enable_preflight_validation=false

Python

--dataflow_service_options=enable_preflight_validation=false

Go

--dataflow_service_options=enable_preflight_validation=false

Définir les options de pipeline

Vous pouvez contrôler certains aspects de la manière dont Dataflow exécute votre job en configurant des options de pipeline dans votre code de pipeline Apache Beam. Par exemple, vous pouvez utiliser les options de pipeline pour définir si votre pipeline s'exécute sur des machines virtuelles de calcul, sur le backend du service Dataflow, ou en local.

Gérer les dépendances de pipeline

De nombreux pipelines Apache Beam peuvent s'exécuter à l'aide des environnements d'exécution Dataflow par défaut. Toutefois, certains cas d'utilisation du traitement des données exploitent l'utilisation de bibliothèques ou de classes supplémentaires. Dans ce cas, vous devrez peut-être gérer vos dépendances de pipeline. Pour en savoir plus sur la gestion des dépendances, consultez la page Gérer les dépendances de pipeline dans Dataflow.

Surveiller le job

Dataflow offre une visibilité sur vos jobs grâce à des outils tels que l'interface de surveillance de Dataflow et l'interface de ligne de commande de Dataflow.

Accéder aux VM de nœud de calcul

Vous pouvez afficher les instances de VM d'un pipeline donné à l'aide de Google Cloud Console. Vous pouvez ensuite vous connecter en SSH à chaque instance depuis la console. Sachez toutefois que le service Dataflow s'arrête automatiquement et nettoie les instances de VM dès que votre job se termine ou échoue.

Optimisations des jobs

Outre la gestion des ressources Google Cloud, Dataflow exécute et optimise automatiquement de nombreux aspects du traitement parallèle distribué.

Parallélisation et distribution

Dataflow partitionne automatiquement vos données et distribue votre code de nœud de calcul aux instances Compute Engine pour un traitement parallèle. Pour en savoir plus, consultez la section Parallélisation et distribution.

Fusion et combinaison d'optimisations

Dataflow utilise votre code de pipeline pour créer un graphique d'exécution représentant les classes PCollection et les transformations de votre pipeline, et améliore le graphique afin d'optimiser les performances et l'utilisation des ressources. Le service optimise également automatiquement les opérations potentiellement coûteuses, telles que les agrégations de données. Pour en savoir plus, consultez les sections Optimisation de la fusion et Combiner l'optimisation.

Fonctions de réglage automatique.

Le service Dataflow comprend plusieurs fonctionnalités qui permettent un ajustement à la volée de l'allocation des ressources et du partitionnement des données. Ces fonctionnalités aident Dataflow à exécuter votre job aussi rapidement et aussi efficacement que possible. Ces fonctionnalités incluent les suivantes :

Streaming Engine

Par défaut, l'exécuteur de pipeline Dataflow exécute entièrement les étapes de votre pipeline de traitement par flux sur des machines virtuelles de nœud de calcul et consomme les ressources de processeur, la mémoire et le stockage de disque persistant des nœuds de calcul. La fonctionnalité Streaming Engine de Dataflow transfère l'exécution du pipeline depuis les VM de nœud de calcul vers le backend du service Dataflow. Pour en savoir plus, consultez la page Streaming Engine.

Planification flexible des ressources dans Dataflow

La fonctionnalité FlexRS de Dataflow réduit le coût du traitement par lot en s'appuyant sur des techniques de planification avancées, sur le service Dataflow Shuffle et sur une combinaison d'instances de VM préemptives et de VM standards. Grâce à cette exécution combinée de VM préemptives et de VM standards en parallèle, Dataflow améliore l'expérience utilisateur lorsque Compute Engine arrête des instances de VM préemptives lors d'un événement système. FlexRS contribue à garantir que le pipeline continue de progresser et que vous ne perdez pas le travail déjà réalisé lorsque Compute Engine préempte vos VM préemptives. Pour plus d'informations sur FlexRS, consultez la page Utiliser la planification flexible des ressources dans Dataflow.

VM protégée par Dataflow

Depuis le 1er juin 2022, le service Dataflow utilise des VM protégées pour tous les nœuds de calcul. Pour en savoir plus sur les fonctionnalités des VM protégées, consultez la page VM protégées.