Démarrage rapide avec Python

Cette page vous explique comment configurer votre environnement de développement Python, obtenir le SDK Apache Beam pour Python, et exécuter et modifier un exemple de pipeline.

Pour apprendre à créer et à exécuter un pipeline Apache Beam, vous pouvez également en développer un de manière interactive à l'aide d'un notebook Apache Beam. Si vous avez déjà configuré un projet Google Cloud, l'exemple de pipeline WordCount dans ce guide de démarrage rapide est disponible sous forme d'exemple de notebook.

Avant de commencer

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder à la page de sélection du projet

  3. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activer les API Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore et Cloud Resource Manager.

    Activer les API

  5. Configurer l'authentification :
    1. Dans Cloud Console, accédez à la page Créer une clé de compte de service.

      Accéder à la page "Créer une clé de compte de service"
    2. Dans la liste Compte de service, sélectionnez Nouveau compte de service.
    3. Dans le champ Nom du compte de service, saisissez un nom.
    4. Dans la liste Rôle, sélectionnez Projet > Propriétaire

    5. Cliquez sur Créer. Un fichier JSON contenant votre clé est téléchargé sur votre ordinateur.
  6. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

  7. Créez un bucket Cloud Storage :
    1. Dans Cloud Console, accédez à la page Navigateur Cloud Storage.

      Accéder à la page du navigateur Cloud Storage

    2. Cliquez sur Créer un bucket.
    3. Dans la boîte de dialogue Créer un bucket, spécifiez les attributs suivants :
      • Name : nom de bucket unique. N'incluez aucune information sensible dans le nom des buckets, car leur espace de noms est global et visible par tous.
      • Classe de stockage par défaut: Standard
      • Emplacement de stockage des données de bucket.
    4. Cliquez sur Create (Créer).

Configurer votre environnement

  1. Utilisez le SDK Apache Beam pour Python avec pip et Python version 2.7, 3.5, 3.6 ou 3.7. Vérifiez que votre installation de Python et pip fonctionnent correctement en exécutant la commande suivante :
    python --version
    python -m pip --version
    Si vous ne disposez pas de Python, recherchez les étapes d'installation pour votre système d'exploitation sur la page Installer Python.
  2. Dataflow n'est plus compatible avec les pipelines utilisant Python 2. Pour plus d'informations, consultez la page Compatibilité avec Python 2 sur Google Cloud.

  3. Configurez et activez un environnement virtuel Python pour ce guide de démarrage rapide.

    Une fois celui-ci terminé, vous pouvez désactiver l'environnement virtuel en exécutant la commande deactivate.

Pour en savoir plus sur l'utilisation de Python sur Google Cloud, consultez la page Configurer un environnement de développement Python.

Remarque : Pour obtenir de meilleurs résultats, lancez vos pipelines Python 3 avec Apache Beam version 2.16.0 ou une version ultérieure. Le SDK Apache Beam version 2.4.0 est la dernière version compatible avec Python 2 et Python 3.5. Pour obtenir un récapitulatif des améliorations relatives à Python 3 apportées à Apache Beam, reportez-vous à l'outil de suivi des problèmes Apache Beam.

Obtenir le SDK Apache Beam

Le SDK Apache Beam est un modèle de programmation Open Source pour les pipelines de données. Vous définissez ces pipelines avec un programme Apache Beam et pouvez choisir un exécuteur, tel que Dataflow, pour les lancer.

Pour installer la dernière version du SDK Apache Beam pour Python, exécutez la commande suivante depuis un environnement virtuel :

pip install 'apache-beam[gcp]'

Exécuter WordCount localement

L'exemple WordCount illustre un pipeline qui effectue les étapes suivantes :
  1. Utiliser un fichier texte comme entrée
  2. Analyser chaque ligne en mots
  3. Calculer la fréquence des mots en fonction des mots tokenisés.

Exécutez le module wordcount à partir du package apache_beam sur votre machine locale à l'aide de la commande suivante :

python -m apache_beam.examples.wordcount \
  --output outputs
Ce fichier texte se trouve dans un bucket Cloud Storage dont le nom de ressource est gs://dataflow-samples/shakespeare/kinglear.txt. Pour afficher les résultats, exécutez la commande suivante :
more outputs*

Pour quitter le programme, appuyez sur la touche q.

L'exécution locale de votre pipeline vous permet de tester et de déboguer votre programme Apache Beam. Vous pouvez visualiser le code source wordcount.py sur le dépôt GitHub d'Apache Beam.

Exécuter WordCount sur le service Dataflow

Vous pouvez exécuter le module wordcount à partir du package apache_beam sur le service Dataflow en spécifiant DataflowRunner dans le champ runner et en sélectionnant une région où le pipeline sera exécuté.

Commencez par définir vos variables PROJECT, BUCKET et REGION :

PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
REGION=DATAFLOW_REGION
Exécutez ce pipeline en exécutant la commande suivante :
python -m apache_beam.examples.wordcount \
  --region $REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/results/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/

Afficher vos résultats à l'aide de GCP

Lorsque vous exécutez un pipeline à l'aide de Dataflow, vos résultats sont stockés dans un bucket Cloud Storage.

Vous pouvez utiliser l'outil gsutil pour afficher les résultats à partir de votre terminal.

Répertoriez les fichiers de sortie en exécutant la commande suivante :

gsutil ls -lh "gs://$BUCKET/results/outputs*"  
Affichez les résultats présents dans ces fichiers en exécutant la commande suivante :
gsutil cat "gs://$BUCKET/results/outputs*"

Pour afficher vos résultats à partir de l'interface utilisateur de surveillance, procédez comme suit :
  1. Ouvrez l'interface utilisateur de surveillance Dataflow.
    Accéder à l'interface utilisateur Web de Dataflow

    Votre tâche wordcount présente au début l'état Running (En cours d'exécution), puis l'état Succeeded (Réussie) :

    Tâche WordCount de Cloud Dataflow avec l'état "Succeeded" (Réussie)
  2. Ouvrez le navigateur Cloud Storage dans Google Cloud Console.
    Accéder au navigateur Cloud Storage

    Dans le répertoire wordcount, vous devriez voir les fichiers de sortie créés par votre tâche :

    Répertoire de résultats avec les fichiers de sortie de la tâche WordCount

Modifier le code du pipeline

Le pipeline WordCount dans les exemples précédents fait la distinction entre les mots en majuscules et en minuscules. La procédure suivante montre comment modifier le pipeline WordCount afin qu'il ne soit pas sensible à la casse.
  1. Téléchargez la dernière copie du code WordCount à partir du dépôt GitHub d'Apache Beam.
  2. Exécutez le pipeline sur votre machine locale :
    python wordcount.py --output outputs
  3. Affichez les résultats en exécutant la commande suivante :
    more outputs*
    Pour quitter le programme, appuyez sur la touche q.
  4. Ouvrez le fichier wordcount.py dans l'éditeur de votre choix.
  5. Examinez les étapes du pipeline dans la fonction run. Après l'élément split, les lignes sont divisées en mots sous forme de chaînes.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))
  6. Modifiez la ligne après split pour que les chaînes apparaissent en minuscules.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'lowercase' >> beam.Map(unicode.lower)     # Add this line to modify the pipeline
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones)) 
    Cette modification mappe la fonction str.lower sur chaque mot. Cette ligne est équivalente à beam.Map(lambda word: str.lower(word)).
  7. Enregistrez le fichier et exécutez la tâche WordCount modifiée sur votre machine locale :
    python wordcount.py --output outputs
  8. Affichez les résultats du pipeline modifié en exécutant la commande suivante :
    more outputs*
    Pour quitter le programme, appuyez sur la touche q.

Nettoyer

Pour éviter que les ressources utilisées dans ce guide de démarrage rapide soient facturées sur votre compte Google Cloud, procédez comme suit :

  1. Dans Cloud Console, accédez à la page Navigateur Cloud Storage.

    Accéder au navigateur Cloud Storage

  2. Cochez la case correspondant au bucket que vous souhaitez supprimer.
  3. Pour supprimer le bucket, cliquez sur Supprimer .

Étape suivante

Apache Beam™ est une marque commerciale d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.