Créer un pipeline Dataflow à l'aide de Python

Dans ce guide de démarrage rapide, vous allez apprendre à utiliser le SDK Apache Beam pour Python afin de créer un programme qui définit un pipeline. Vous exécutez ensuite le pipeline à l'aide d'un exécuteur local direct ou basé sur le cloud, tel que Dataflow. Pour une présentation du pipeline WordCount, regardez la vidéo How to use WordCount in Apache Beam (Comment utiliser WordCount dans Apache Beam).


Pour obtenir des conseils détaillés sur cette tâche directement dans la console Google Cloud, cliquez sur Visite guidée :

Visite guidée


Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Installez Google Cloud CLI.
  3. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  4. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  5. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  6. Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore et Cloud Resource Manager :

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  7. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  8. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.
  9. Installez Google Cloud CLI.
  10. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  11. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  12. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  13. Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore et Cloud Resource Manager :

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  14. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  15. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.
  16. Attribuez des rôles à votre compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Remplacez PROJECT_ID par l'ID du projet.
    • Remplacez PROJECT_NUMBER par votre numéro de projet. Pour trouver votre numéro de projet, consultez la section Identifier des projets ou utilisez la commande gcloud projects describe.
    • Remplacez SERVICE_ACCOUNT_ROLE par chaque rôle individuel.
  17. Créez un bucket Cloud Storage et configurez-le comme suit :
    • Définissez la classe de stockage sur S (Standard).
    • Définissez l'emplacement de stockage sur : US (États-Unis).
    • Remplacez BUCKET_NAME par un nom de bucket unique. N'incluez aucune information sensible dans le nom des buckets, car leur espace de noms est global et visible par tous.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. Copiez l'ID du projet Google Cloud et le nom du bucket Cloud Storage. Vous aurez besoin de ces valeurs plus loin dans ce document.

Configurer votre environnement

Dans cette section, utilisez l'invite de commande pour configurer un environnement virtuel Python isolé permettant d'exécuter votre projet de pipeline à l'aide de venv. Ce processus vous permet d'isoler les dépendances d'un projet des dépendances d'autres projets.

Si vous n'avez pas d'invite de commande à votre disposition, vous pouvez utiliser Cloud Shell. Le gestionnaire de packages pour Python 3 est déjà installé dans Cloud Shell, ce qui vous permet d'ignorer la création d'un environnement virtuel.

Pour installer Python, puis créer un environnement virtuel, procédez comme suit :

  1. Vérifiez que Python 3 et pip sont en cours d'exécution sur votre système :
    python --version
    python -m pip --version
    
  2. Si nécessaire, installez Python 3, puis configurez un environnement virtuel Python : suivez les instructions fournies dans les sections Installer Python et Configurer venv de la page Configurer un environnement de développement Python. Si vous utilisez Python 3.10 ou une version ultérieure, vous devez également activer l'exécuteur Dataflow v2. Pour utiliser l'exécuteur v1, utilisez Python 3.9 ou une version antérieure.

Une fois celui-ci terminé, vous pouvez désactiver l'environnement virtuel en exécutant la commande deactivate.

Obtenir le SDK Apache Beam

Le SDK Apache Beam est un modèle de programmation Open Source pour les pipelines de données. Vous définissez un pipeline avec un programme Apache Beam, puis l'exécutez à l'aide d'un exécuteur, tel que Dataflow.

Pour télécharger et installer le SDK Apache Beam, procédez comme suit :

  1. Vérifiez que vous êtes dans l'environnement virtuel Python que vous avez créé dans la section précédente. Assurez-vous que l'invite commence par <env_name>, où env_name est le nom de l'environnement virtuel.
  2. Installez la norme du package Python wheel :
    pip install wheel
    
  3. Installez la dernière version du SDK Apache Beam pour Python :
  4. pip install 'apache-beam[gcp]'

    Sous Microsoft Windows, exécutez la commande suivante :

    pip install apache-beam[gcp]

    En fonction de la connexion, l'installation peut prendre un certain temps.

Exécuter le pipeline en local

Pour voir comment un pipeline s'exécute localement, utilisez un module Python prêt à l'emploi wordcount, qui est inclus dans le package apache_beam.

L'exemple de pipeline wordcount effectue les opérations suivantes :

  1. Utiliser un fichier texte comme entrée

    Ce fichier texte se trouve dans un bucket Cloud Storage dont le nom de ressource est gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Analyser chaque ligne en mots
  3. Calculer la fréquence des mots en fonction des mots tokenisé.

Pour mettre le pipeline wordcount en local, procédez comme suit :

  1. Depuis votre terminal local, exécutez l'exemple wordcount :
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. Affichez le résultat du pipeline :
    more outputs*
  3. Pour quitter, appuyez sur Q.
L'exécution locale du pipeline vous permet de tester et de déboguer votre programme Apache Beam. Vous pouvez visualiser le code source wordcount.py sur le dépôt GitHub d'Apache Beam.

Exécuter le pipeline sur le service Dataflow

Dans cette section, exécutez l'exemple de pipeline wordcount du package apache_beam sur le service Dataflow. Cet exemple spécifie DataflowRunner comme paramètre pour --runner.
  • Exécutez le pipeline :
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Remplacez les éléments suivants :

    • DATAFLOW_REGION : région dans laquelle vous souhaitez déployer le job Dataflow (par exemple, europe-west1)

      L'option --region remplace la région par défaut définie dans le serveur de métadonnées, votre client local ou les variables d'environnement.

    • BUCKET_NAME : nom du bucket Cloud Storage que vous avez copié précédemment
    • PROJECT_ID : ID de projet Google Cloud que vous avez copié précédemment

Afficher les résultats

Lorsque vous exécutez un pipeline à l'aide de Dataflow, vos résultats sont stockés dans un bucket Cloud Storage. Dans cette section, vérifiez que le pipeline est en cours d'exécution à l'aide de la console Google Cloud ou du terminal local.

console Google Cloud

Pour afficher vos résultats dans la console Google Cloud, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page Tâches de Dataflow.

    Accéder aux tâches

    La page Tâches affiche les détails de votre tâche wordcount, y compris l'état En cours d'exécution, puis Réussite.

  2. Accédez à la page Buckets de Cloud Storage.

    Accéder à la page "Buckets"

  3. Dans la liste des buckets de votre projet, cliquez sur le bucket de stockage que vous avez créé précédemment.

    Dans le répertoire wordcount, les fichiers de sortie créés par votre tâche sont affichés.

Terminal local

Affichez les résultats à partir de votre terminal ou à l'aide de Cloud Shell.

  1. Pour répertorier les fichiers de sortie, utilisez la commande gcloud storage ls :
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. Remplacez BUCKET_NAME par le nom du bucket Cloud Storage utilisé dans le programme de pipeline.

  3. Pour afficher les résultats dans les fichiers de sortie, utilisez la commande gcloud storage cat :
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

Modifier le code du pipeline

Le pipeline wordcount dans les exemples précédents fait la distinction entre les mots en majuscules et en minuscules. Les étapes suivantes montrent comment effectuer des modifications afin que le pipeline wordcount ne soit pas sensible à la casse.
  1. Sur votre ordinateur local, téléchargez la dernière copie du code wordcount à partir du dépôt GitHub d'Apache Beam.
  2. Depuis le terminal local, exécutez le pipeline :
    python wordcount.py --output outputs
  3. Afficher les résultats :
    more outputs*
  4. Pour quitter, appuyez sur Q.
  5. Dans l'éditeur de votre choix, ouvrez le fichier wordcount.py.
  6. Dans la fonction run, examinez les étapes du pipeline :
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    Après l'élément split, les lignes sont divisées en mots sous forme de chaînes.

  7. Pour mettre les chaînes en minuscules, modifiez la ligne après split :
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    Cette modification mappe la fonction str.lower sur chaque mot. Cette ligne est équivalente à beam.Map(lambda word: str.lower(word)).
  8. Enregistrez le fichier et exécutez la tâche wordcount modifiée :
    python wordcount.py --output outputs
  9. Affichez les résultats du pipeline modifié :
    more outputs*
  10. Pour quitter, appuyez sur Q.
  11. Exécuter le pipeline modifié sur le service Dataflow :
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Remplacez les éléments suivants :

    • DATAFLOW_REGION : région dans laquelle vous souhaitez déployer le job Dataflow
    • BUCKET_NAME : nom de votre bucket Cloud Storage.
    • PROJECT_ID : ID de votre projet Google Cloud.

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Google Cloud contenant les ressources.

  1. Dans la console Google Cloud, accédez à la page Buckets de Cloud Storage.

    Accéder à la page "Buckets"

  2. Cochez la case correspondant au bucket que vous souhaitez supprimer.
  3. Pour supprimer le bucket, cliquez sur Supprimer , puis suivez les instructions.
  4. Si vous conservez votre projet, révoquez les rôles que vous avez accordés au compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Facultatif : Révoquez les identifiants d'authentification que vous avez créés et supprimez le fichier d'identifiants local.

    gcloud auth application-default revoke
  6. Facultatif : Révoquez les identifiants de la CLI gcloud.

    gcloud auth revoke

Étapes suivantes