Cette page vous explique comment configurer votre environnement de développement Python, obtenir le SDK Apache Beam pour Python, et exécuter et modifier un exemple de pipeline.
Pour apprendre à créer et à exécuter un pipeline Apache Beam, vous pouvez également en développer un de manière interactive à l'aide d'un notebook Apache Beam. Si vous avez déjà configuré un projet Google Cloud, l'exemple de pipeline WordCount dans ce guide de démarrage rapide est disponible sous forme d'exemple de notebook.
Avant de commencer
- Connectez-vous à votre compte Google.
Si vous n'en possédez pas déjà un, vous devez en créer un.
-
Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.
-
Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.
- Activer les API Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore et Cloud Resource Manager.
- Configurer l'authentification :
-
Dans Cloud Console, accédez à la page Créer une clé de compte de service.
Accéder à la page "Créer une clé de compte de service" - Dans la liste Compte de service, sélectionnez Nouveau compte de service.
- Dans le champ Nom du compte de service, saisissez un nom.
Dans la liste Rôle, sélectionnez Projet > Propriétaire
- Cliquez sur Créer. Un fichier JSON contenant votre clé est téléchargé sur votre ordinateur.
-
-
Définissez la variable d'environnement
GOOGLE_APPLICATION_CREDENTIALS
pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir. - Créez un bucket Cloud Storage :
- Dans Cloud Console, accédez à la page Navigateur Cloud Storage.
- Cliquez sur Créer un bucket.
- Dans la boîte de dialogue Créer un bucket, spécifiez les attributs suivants :
- Name : nom de bucket unique. N'incluez aucune information sensible dans le nom des buckets, car leur espace de noms est global et visible par tous.
- Classe de stockage par défaut: Standard
- Emplacement de stockage des données de bucket.
- Cliquez sur Create (Créer).
Configurer votre environnement
-
Utilisez le SDK Apache Beam pour Python avec
pip
et Python version 2.7, 3.5, 3.6 ou 3.7. Vérifiez que votre installation de Python etpip
fonctionnent correctement en exécutant la commande suivante :python --version python -m pip --version
Si vous ne disposez pas de Python, recherchez les étapes d'installation pour votre système d'exploitation sur la page Installer Python. - Configurez et activez un environnement virtuel Python pour ce guide de démarrage rapide.
Une fois celui-ci terminé, vous pouvez désactiver l'environnement virtuel en exécutant la commande
deactivate
.
Dataflow n'est plus compatible avec les pipelines utilisant Python 2. Pour plus d'informations, consultez la page Compatibilité avec Python 2 sur Google Cloud.
Remarque : Pour obtenir de meilleurs résultats, lancez vos pipelines Python 3 avec Apache Beam version 2.16.0 ou une version ultérieure. Le SDK Apache Beam version 2.4.0 est la dernière version compatible avec Python 2 et Python 3.5. Pour obtenir un récapitulatif des améliorations relatives à Python 3 apportées à Apache Beam, reportez-vous à l'outil de suivi des problèmes Apache Beam.
Obtenir le SDK Apache Beam
Le SDK Apache Beam est un modèle de programmation Open Source pour les pipelines de données. Vous définissez ces pipelines avec un programme Apache Beam et pouvez choisir un exécuteur, tel que Dataflow, pour les lancer.Pour installer la dernière version du SDK Apache Beam pour Python, exécutez la commande suivante depuis un environnement virtuel :
pip install 'apache-beam[gcp]'
Exécuter WordCount localement
L'exemple WordCount illustre un pipeline qui effectue les étapes suivantes :- Utiliser un fichier texte comme entrée
- Analyser chaque ligne en mots
- Calculer la fréquence des mots en fonction des mots tokenisés.
Exécutez le module wordcount
à partir du package apache_beam
sur votre machine locale à l'aide de la commande suivante :
python -m apache_beam.examples.wordcount \ --output outputsCe fichier texte se trouve dans un bucket Cloud Storage dont le nom de ressource est
gs://dataflow-samples/shakespeare/kinglear.txt
.
Pour afficher les résultats, exécutez la commande suivante :
more outputs*
Pour quitter le programme, appuyez sur la touche q.
L'exécution locale de votre pipeline vous permet de tester et de déboguer votre programme Apache Beam. Vous pouvez visualiser le code sourcewordcount.py
sur le dépôt GitHub d'Apache Beam.
Exécuter WordCount sur le service Dataflow
Vous pouvez exécuter le modulewordcount
à partir du package apache_beam
sur le service Dataflow en spécifiant DataflowRunner
dans le champ runner
et en sélectionnant une région où le pipeline sera exécuté.Commencez par définir vos variables PROJECT, BUCKET et REGION :
PROJECT=PROJECT_ID BUCKET=GCS_BUCKET REGION=DATAFLOW_REGIONExécutez ce pipeline en exécutant la commande suivante :
python -m apache_beam.examples.wordcount \ --region $REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://$BUCKET/results/outputs \ --runner DataflowRunner \ --project $PROJECT \ --temp_location gs://$BUCKET/tmp/
Afficher vos résultats à l'aide de GCP
Lorsque vous exécutez un pipeline à l'aide de Dataflow, vos résultats sont stockés dans un bucket Cloud Storage.
Vous pouvez utiliser l'outil gsutil
pour afficher les résultats à partir de votre terminal.
Répertoriez les fichiers de sortie en exécutant la commande suivante :
gsutil ls -lh "gs://$BUCKET/results/outputs*"Affichez les résultats présents dans ces fichiers en exécutant la commande suivante :
gsutil cat "gs://$BUCKET/results/outputs*"Pour afficher vos résultats à partir de l'interface utilisateur de surveillance, procédez comme suit :
- Ouvrez l'interface utilisateur de surveillance Dataflow.
Accéder à l'interface utilisateur Web de DataflowVotre tâche wordcount présente au début l'état Running (En cours d'exécution), puis l'état Succeeded (Réussie) :
- Ouvrez le navigateur Cloud Storage dans Google Cloud Console.
Accéder au navigateur Cloud StorageDans le répertoire
wordcount
, vous devriez voir les fichiers de sortie créés par votre tâche :
Modifier le code du pipeline
Le pipeline WordCount dans les exemples précédents fait la distinction entre les mots en majuscules et en minuscules. La procédure suivante montre comment modifier le pipeline WordCount afin qu'il ne soit pas sensible à la casse.- Téléchargez la dernière copie du code
WordCount
à partir du dépôt GitHub d'Apache Beam. - Exécutez le pipeline sur votre machine locale :
python wordcount.py --output outputs
- Affichez les résultats en exécutant la commande suivante :
more outputs*
Pour quitter le programme, appuyez sur la touche q. - Ouvrez le fichier
wordcount.py
dans l'éditeur de votre choix. - Examinez les étapes du pipeline dans la fonction
run
. Après l'élémentsplit
, les lignes sont divisées en mots sous forme de chaînes.counts = (lines | 'split' >> (beam.ParDo(WordExtractingDoFn()) .with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() | 'count' >> beam.Map(count_ones))
- Modifiez la ligne après
split
pour que les chaînes apparaissent en minuscules.counts = (lines | 'split' >> (beam.ParDo(WordExtractingDoFn()) .with_output_types(unicode)) | 'lowercase' >> beam.Map(unicode.lower) # Add this line to modify the pipeline | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() | 'count' >> beam.Map(count_ones))
Cette modification mappe la fonctionstr.lower
sur chaque mot. Cette ligne est équivalente àbeam.Map(lambda word: str.lower(word))
. - Enregistrez le fichier et exécutez la tâche WordCount modifiée sur votre machine locale :
python wordcount.py --output outputs
- Affichez les résultats du pipeline modifié en exécutant la commande suivante :
more outputs*
Pour quitter le programme, appuyez sur la touche q.
Nettoyer
Pour éviter que les ressources utilisées dans ce guide de démarrage rapide soient facturées sur votre compte Google Cloud, procédez comme suit :
- Dans Cloud Console, accédez à la page Navigateur Cloud Storage.
- Cochez la case correspondant au bucket que vous souhaitez supprimer.
- Pour supprimer le bucket, cliquez sur Supprimer delete.
Étape suivante
- Obtenez plus d'informations sur le modèle de programmation Apache Beam.
- Apprenez à concevoir et à créer votre propre pipeline.
- Utilisez les exemples WordCount et Mobile Gaming.