Charger des données de Cloud Storage vers BigQuery à l'aide de Workflows

Last reviewed 2021-05-12 UTC

Ce tutoriel explique comment exécuter de manière fiable des workflows sans serveur à l'aide de Workflows, Cloud Functions et Firestore pour charger des données brutes, telles que des journaux d'événements, depuis Cloud Storage vers BigQuery. Les plates-formes d'analyse disposent généralement d'un outil d'orchestration permettant de charger régulièrement les données dans BigQuery à l'aide de tâches BigQuery, puis de transformer les données pour fournir des métriques métier à l'aide d'instructions SQL, y compris des instructions de langage procédural BigQuery. Ce tutoriel s'adresse aux développeurs et aux architectes qui souhaitent créer des pipelines de traitement des données sans serveur basés sur des événements. Pour ce tutoriel, nous partons du principe que vous connaissez bien YAML, SQL et Python.

Architecture

Le schéma suivant illustre l'architecture de haut niveau d'un pipeline d'extraction, chargement et transformation sans serveur utilisant Workflows.

Extraction, chargement et transformation du pipeline.

Dans le schéma précédent, considérons une plate-forme de vente au détail qui collecte régulièrement des événements de vente sous forme de fichiers depuis divers magasins, puis écrit les fichiers dans un bucket Cloud Storage. Les événements permettent de fournir des métriques commerciales via l'importation et le traitement des données dans BigQuery. Cette architecture fournit un système d'orchestration fiable et sans serveur pour importer vos fichiers dans BigQuery. Elle se divise en deux modules :

  • Liste de fichiers : gère la liste des fichiers non traités ajoutés à un bucket Cloud Storage dans une collection Firestore. Ce module fonctionne via une fonction Cloud déclenchée par un événement de stockage Object Finalize (Finalisation de l'objet), lequel est généré lorsqu'un nouveau fichier est ajouté au bucket Cloud Storage. Le nom de fichier est ajouté au tableau files de la collection nommée new dans Firestore.
  • Workflow : exécute les workflows planifiés. Cloud Scheduler déclenche un workflow qui exécute une série d'étapes selon une syntaxe basée sur YAML pour orchestrer le chargement, puis transformer les données dans BigQuery en appelant Cloud Functions. Les étapes du workflow appellent Cloud Functions pour exécuter les tâches suivantes :

    • Créer et démarrer une tâche de chargement BigQuery.
    • Interroger l'état de la tâche de chargement.
    • Créer et démarrer la tâche de requête de transformation.
    • Interroger l'état de la tâche de transformation.

L'utilisation de transactions pour gérer la liste des nouveaux fichiers dans Firestore permet de garantir qu'aucun fichier ne manque lorsqu'un workflow les importe dans BigQuery. Les exécutions distinctes du workflow deviennent idempotentes lorsque vous stockez les métadonnées et l'état de la tâche dans Firestore.

Objectifs

  • Créer une base de données Firestore.
  • Configurer un déclencheur Cloud Function pour suivre les fichiers ajoutés au bucket Cloud Storage dans Firestore.
  • Déployer Cloud Functions pour exécuter et surveiller les tâches BigQuery.
  • Déployer et exécuter un workflow pour automatiser le processus.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

  1. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  2. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  3. Activer les API Cloud Build, Cloud Functions, Identity and Access Management, Resource Manager, and Workflows.

    Activer les API

  4. Accédez à la page Bienvenue et notez l'ID du projet, car vous en aurez besoin lors d'une prochaine étape.

    Accéder à la page d'accueil

  5. Dans la console Google Cloud, activez Cloud Shell.

    Activer Cloud Shell

Préparer votre environnement

Pour préparer votre environnement, créez une base de données Firestore, clonez les exemples de code à partir du dépôt GitHub, créez des ressources à l'aide de Terraform, modifiez le fichier YAML Workflows et installez les exigences du générateur de fichiers.

  1. Pour créer une base de données Firestore, procédez comme suit :

    1. Dans Google Cloud Console, accédez à la page Firestore.

      Accéder à Firestore

    2. Cliquez sur Sélectionner le mode natif.

    3. Dans le menu Sélectionner un emplacement, sélectionnez la région dans laquelle vous souhaitez héberger la base de données Firestore. Nous vous recommandons de choisir une région proche de votre emplacement physique.

    4. Cliquez sur Créer une base de données.

  2. Dans Cloud Shell, clonez le dépôt source :

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. Dans Cloud Shell, créez les ressources suivantes à l'aide de Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet Google Cloud
    • REGION : emplacement géographique Google Cloud spécifique dans lequel héberger vos ressources, par exemple us-central1
    • ZONE : emplacement d'une région dans laquelle héberger vos ressources (par exemple, us-central1-b)

    Un message semblable à celui-ci doit s'afficher : Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform peut vous aider à créer, modifier et mettre à niveau une infrastructure à grande échelle de manière sécurisée et prévisible. Les ressources suivantes sont créées dans votre projet :

    • Des comptes de service disposant des droits requis pour garantir un accès sécurisé à vos ressources
    • Un ensemble de données BigQuery nommé serverless_elt_dataset et une table nommée word_count pour charger les fichiers entrants
    • Un bucket Cloud Storage nommé ${project_id}-ordersbucket pour les fichiers d'entrée de préproduction
    • Les cinq fonctions Cloud Functions suivantes :
      • file_add_handler ajoute le nom des fichiers intégrés au bucket Cloud Storage dans la collection Firestore.
      • create_job crée une tâche de chargement BigQuery et associe les fichiers de la collection Firebase avec la tâche.
      • create_query crée une requête BigQuery.
      • poll_bigquery_job obtient l'état d'une tâche BigQuery.
      • run_bigquery_job démarre une tâche BigQuery.
  4. Récupérez les URL des fonctions Cloud create_job, create_query, poll_job et run_bigquery_job que vous avez déployées à l'étape précédente.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    Le résultat ressemble à ce qui suit :

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    Notez ces URL, car elles seront nécessaires au déploiement de votre workflow.

Créer et déployer un workflow

  1. Dans Cloud Shell, ouvrez le fichier source du workflow, workflow.yaml:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    Remplacez les éléments suivants :

    • CREATE_JOB_URL : URL de la fonction permettant de créer une tâche
    • POLL_BIGQUERY_JOB_URL : URL de la fonction permettant d'interroger l'état d'une tâche en cours d'exécution
    • RUN_BIGQUERY_JOB_URL : URL de la fonction permettant de démarrer une tâche de chargement BigQuery
    • CREATE_QUERY_URL : URL de la fonction permettant de démarrer une requête BigQuery
    • BQ_REGION : région BigQuery dans laquelle les données sont stockées, par exemple US.
    • BQ_DATASET_TABLE_NAME: nom de la table de l'ensemble de données BigQuery au format PROJECT_ID.serverless_elt_dataset.word_count
  2. Déployez le fichier workflow :

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    Remplacez les éléments suivants :

    • WORKFLOW_NAME : nom unique du workflow
    • WORKFLOW_REGION : région dans laquelle le workflow est déployé (par exemple, us-central1).
    • WORKFLOW_DESCRIPTION : description du workflow.
  3. Créez un environnement virtuel Python 3 et installez les éléments requis pour le générateur de fichiers :

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Générer les fichiers à importer

Le script Python gen.py génère du contenu aléatoire au format Avro. Le schéma est identique à celui de la table BigQuery word_count. Ces fichiers Avro sont copiés dans le bucket Cloud Storage spécifié.

Dans Cloud Shell, générez les fichiers :

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

Remplacez les éléments suivants :

  • RECORDS_PER_FILE : nombre d'enregistrements dans un seul fichier
  • NUM_FILES : nombre total de fichiers à importer
  • FILE_PREFIX : préfixe des noms des fichiers générés

Afficher les entrées de fichier dans Firestore

Lorsque les fichiers sont copiés dans Cloud Storage, la fonction Cloud handle_new_file est déclenchée. Cette fonction ajoute la liste de fichiers au tableau de liste de fichiers dans le document new de la collection jobs dans Firestore.

Pour afficher la liste des fichiers, accédez à la page Données de Firestore dans la console Google Cloud.

Accéder aux données

Liste des fichiers ajoutés à la collection

Déclencher le workflow

Workflows associe une série de tâches sans serveur provenant de Google Cloud et de services d'API. Les étapes individuelles de ce workflow s'exécutent en tant que fonctions Cloud et l'état est stocké dans Firestore. Tous les appels à Cloud Functions sont authentifiés à l'aide du compte de service du workflow.

Dans Cloud Shell, exécutez le workflow suivant :

gcloud workflows execute WORKFLOW_NAME

Le schéma suivant illustre les étapes utilisées dans le workflow :

Étapes utilisées dans le workflow principal et le sous-workflow.

Le workflow se divise en deux parties : le workflow principal et le sous-workflow. Le workflow principal gère la création de tâches et l'exécution conditionnelle, tandis que le sous-workflow exécute une tâche BigQuery. Le workflow effectue les opérations suivantes :

  • La fonction Cloud create_job crée un objet de tâche, récupère la liste des fichiers ajoutés à Cloud Storage à partir du document Firestore, puis associe les fichiers à la tâche de chargement. S'il n'y a pas de fichiers à charger, la fonction ne crée pas de tâche.
  • La fonction Cloud create_query associe la requête devant être exécutée avec la région BigQuery dans laquelle la requête doit être exécutée. La fonction crée la tâche dans Firestore et renvoie l'ID de la tâche.
  • La fonction Cloud run_bigquery_job obtient l'ID de la tâche qui doit être exécutée, puis appelle l'API BigQuery pour envoyer la tâche.
  • Au lieu d'attendre la fin de la tâche dans la fonction Cloud, vous pouvez interroger régulièrement son état.
    • La fonction Cloud poll_bigquery_job fournit l'état de la tâche. Elle est appelée à plusieurs reprises jusqu'à la fin de la tâche.
    • Pour ajouter un délai entre les appels à la fonction Cloud poll_bigquery_job, une routine sleep est appelée à partir de Workflows.

Afficher l'état de la tâche

Vous pouvez afficher la liste des fichiers et l'état du job.

  1. Dans la console Google Cloud, accédez à la page Données de Firestore.

    Accéder aux données

  2. Un identifiant unique (UUID) est généré pour chaque tâche. Pour afficher les en-têtes job_type et status, cliquez sur l'ID de la tâche. Une tâche peut être de l'un des types suivants, et se trouver dans l'un des états suivants :

    • job_type : type de tâche en cours d'exécution par le workflow avec l'une des valeurs suivantes :

      • 0 : charger les données dans BigQuery.
      • 1 : exécuter une requête dans BigQuery.
    • status : état actuel de la tâche avec l'une des valeurs suivantes :

      • 0 : la tâche a été créée, mais n'a pas démarré.
      • 1 : la tâche est en cours d'exécution.
      • 2 : la tâche a bien été exécutée.
      • 3: Une erreur s'est produite et la tâche ne s'est pas terminée correctement.

    L'objet de tâche contient également des attributs de métadonnées tels que la région de l'ensemble de données BigQuery, le nom de la table BigQuery et, s'il s'agit d'une tâche de requête, la chaîne de requête en cours d'exécution.

Liste des fichiers avec l'état de la tâche mis en évidence

Consulter les données dans BigQuery

Pour vérifier que la tâche ELT a bien été exécutée, vérifiez que les données apparaissent dans la table.

  1. Dans la console Google Cloud, accédez à la page Éditeur de BigQuery.

    Accéder à l'éditeur

  2. Cliquez sur la table serverless_elt_dataset.word_count.

  3. Cliquez sur l'onglet Preview (Aperçu).

    Onglet Aperçu affichant les données de la table

Planifier le workflow

Pour exécuter régulièrement le workflow de manière planifiée, vous pouvez utiliser Cloud Scheduler.

Effectuer un nettoyage

Le moyen le plus simple d'éviter la facturation consiste à supprimer le projet Google Cloud que vous avez créé pour le tutoriel. Vous pouvez également supprimer les différentes ressources.

Supprimer les ressources individuelles

  1. Dans Cloud Shell, supprimez toutes les ressources créées à l'aide de Terraform :

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. Dans la console Google Cloud, accédez à la page Données de Firestore.

    Accéder aux données

  3. À côté de Tâches, cliquez sur Menu, puis sélectionnez Supprimer.

    Chemin de menu permettant de supprimer une collection.

Supprimer le projet

  1. Dans la console Google Cloud, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Étapes suivantes