Créer un pipeline Dataflow à l'aide de Go

Cette page vous explique comment utiliser le SDK Apache Beam pour Go afin de créer un programme qui définit un pipeline. Ensuite, vous exécutez le pipeline en local et sur le service Dataflow. Pour une présentation du pipeline WordCount, regardez la vidéo How to use WordCount in Apache Beam (Comment utiliser WordCount dans Apache Beam).

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Installez Google Cloud CLI.
  3. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  4. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  5. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  6. Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager :

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  7. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  8. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.
  9. Installez Google Cloud CLI.
  10. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  11. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  12. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  13. Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager :

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  14. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  15. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.
  16. Attribuez des rôles à votre compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Remplacez PROJECT_ID par l'ID du projet.
    • Remplacez PROJECT_NUMBER par votre numéro de projet. Pour trouver votre numéro de projet, consultez la section Identifier des projets ou utilisez la commande gcloud projects describe.
    • Remplacez SERVICE_ACCOUNT_ROLE par chaque rôle individuel.
  17. Créez un bucket Cloud Storage et configurez-le comme suit :
    • Définissez la classe de stockage sur S (Standard).
    • Définissez l'emplacement de stockage sur : US (États-Unis).
    • Remplacez BUCKET_NAME par un nom de bucket unique. N'incluez aucune information sensible dans le nom des buckets, car leur espace de noms est global et visible par tous.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. Copiez l'ID du projet Google Cloud et le nom du bucket Cloud Storage. Vous aurez besoin de ces valeurs plus loin dans ce guide de démarrage rapide.

Configurer l'environnement de développement

Le SDK Apache Beam est un modèle de programmation Open Source pour les pipelines de données. Vous définissez un pipeline avec un programme Apache Beam, puis l'exécutez à l'aide d'un exécuteur tel que Dataflow.

Nous vous recommandons d'utiliser la dernière version de Go lorsque vous utilisez le SDK Apache Beam pour Go. Si la dernière version de Go n'est pas installée, utilisez le guide de téléchargement et d'installation de Go pour télécharger et installer Go sur votre système d'exploitation spécifique.

Pour vérifier la version de Go que vous avez installée, exécutez la commande suivante dans votre terminal local :

go version

Exécuter l'exemple de décompte de mots Beam

Le SDK Apache Beam pour Go inclut un exemple de pipeline wordcount. L'exemple wordcount effectue les opérations suivantes :

  1. Lit un fichier texte en entrée. Par défaut, il lit un fichier texte situé dans un bucket Cloud Storage avec le nom de ressource gs://dataflow-samples/shakespeare/kinglear.txt.
  2. Analyser chaque ligne en mots
  3. Calculer la fréquence des mots en fonction des mots tokenisé.

Pour exécuter la dernière version de l'exemple Beam wordcount sur votre ordinateur local, utilisez la commande suivante. L'option input spécifie le fichier à lire, et l'option output spécifie le nom de fichier à utiliser pour délivrer le décompte de fréquence en sortie.

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs

Une fois l'exécution du pipeline terminée, affichez les résultats :

more outputs*

Pour quitter, appuyez sur Q.

Modifier le code du pipeline

Le pipeline wordcount de Beam fait la distinction entre les mots en majuscules et en minuscules. Les étapes suivantes montrent comment créer votre propre module Go, modifier le pipeline wordcount afin qu'il ne soit pas sensible à la casse, et l'exécuter sur Dataflow.

Créer un module Go

Pour modifier le code du pipeline, procédez comme suit.

  1. Créez un répertoire pour votre module Go à l'emplacement de votre choix :

    mkdir wordcount
    cd wordcount
    
  2. Créez un module Go. Pour cet exemple, utilisez example/dataflow comme chemin de module.

    go mod init example/dataflow
    
  3. Téléchargez la dernière copie du code wordcount à partir du dépôt GitHub d'Apache Beam. Placez ce fichier dans le répertoire wordcount que vous avez créé.

  4. Si vous utilisez un système d'exploitation non-Linux, vous devez obtenir le package Go unix. Ce package est nécessaire pour exécuter des pipelines sur le service Dataflow.

    go get -u golang.org/x/sys/unix
    
  5. Assurez-vous que le fichier go.mod correspond au code source du module :

    go mod tidy
    

Exécuter le pipeline non modifié

Vérifiez que le pipeline wordcount non modifié s'exécute localement.

  1. Depuis le terminal, créez et exécutez le pipeline localement :

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Affichez les résultats :

     more outputs*
    
  3. Pour quitter, appuyez sur Q.

Modifier le code du pipeline

Pour modifier le pipeline afin qu'il ne soit pas sensible à la casse, modifiez le code pour appliquer la fonction strings.ToLower à tous les mots.

  1. Dans l'éditeur de votre choix, ouvrez le fichier wordcount.go.

  2. Examinez le bloc init (les commentaires ont été supprimés pour plus de clarté):

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. Ajoutez une ligne pour enregistrer la fonction strings.ToLower :

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. Examinez la fonction CountWords :

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. Pour convertir les mots en minuscules, ajoutez une opération ParDo qui applique strings.ToLower à chaque mot :

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. Enregistrez le fichier.

Exécuter le pipeline mis à jour en local

Exécutez votre pipeline wordcount mis à jour localement et vérifiez que le résultat a changé.

  1. Créez et exécutez le pipeline wordcount modifié :

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Affichez les résultats du pipeline modifié. Tous les mots doivent être en minuscules.

     more outputs*
    
  3. Pour quitter, appuyez sur Q.

Exécuter le pipeline sur le service Dataflow

Pour exécuter l'exemple wordcount mis à jour sur le service Dataflow, utilisez la commande suivante :

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

Remplacez les éléments suivants :

  • BUCKET_NAME : nom du bucket Cloud Storage.

  • PROJECT_ID : ID de projet Google Cloud.

  • DATAFLOW_REGION : région dans laquelle vous souhaitez déployer la tâche Dataflow. Exemple :europe-west1 Pour obtenir la liste des emplacements disponibles, voir Emplacements Dataflow. L'option --region remplace la région par défaut définie dans le serveur de métadonnées, sur votre client local ou dans les variables d'environnement.

Afficher les résultats

Vous pouvez afficher la liste de vos tâches Dataflow dans la console Google Cloud. Dans la console Google Cloud, accédez à la page Tâches de Dataflow.

Accéder aux tâches

La page Tâches affiche les détails de votre tâche wordcount, y compris l'état En cours d'exécution, puis Réussite.

Lorsque vous exécutez un pipeline à l'aide de Dataflow, vos résultats sont stockés dans un bucket Cloud Storage. Affichez les résultats de sortie en utilisant la console Google Cloud ou votre terminal local.

Console

Pour afficher vos résultats dans la console Google Cloud, accédez à la page Buckets de Cloud Storage.

Accéder à la page "Buckets"

Dans la liste des buckets de votre projet, cliquez sur le bucket de stockage que vous avez créé précédemment. Les fichiers de sortie créés par votre tâche s'affichent dans le répertoire results.

Terminal

Affichez les résultats à partir de votre terminal ou à l'aide de Cloud Shell.

  1. Pour répertorier les fichiers de sortie, utilisez la commande gcloud storage ls :

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
    

    Remplacez BUCKET_NAME par le nom du bucket Cloud Storage de sortie spécifié.

  2. Pour afficher les résultats dans les fichiers de sortie, utilisez la commande gcloud storage cat :

    gcloud storage cat gs://BUCKET_NAME/results/outputs*
    

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Google Cloud contenant les ressources.

  1. Dans la console Google Cloud, accédez à la page Buckets de Cloud Storage.

    Accéder à la page "Buckets"

  2. Cochez la case correspondant au bucket que vous souhaitez supprimer.
  3. Pour supprimer le bucket, cliquez sur Supprimer , puis suivez les instructions.
  4. Si vous conservez votre projet, révoquez les rôles que vous avez accordés au compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Facultatif : Révoquez les identifiants d'authentification que vous avez créés et supprimez le fichier d'identifiants local.

    gcloud auth application-default revoke
  6. Facultatif : Révoquez les identifiants de la CLI gcloud.

    gcloud auth revoke

Étapes suivantes