Utiliser des pipelines de données Dataflow

Présentation

Vous pouvez utiliser les pipelines de données Dataflow pour les tâches suivantes :

  • Créer des programmations de jobs récurrents.
  • Comprendre où les ressources sont dépensées lors de plusieurs exécutions de jobs.
  • Définir et gérer les objectifs de fraîcheur des données.
  • Afficher le détail de chaque étape de pipeline pour corriger et optimiser vos pipelines.

Pour obtenir la documentation de l'API, consultez la documentation de référence sur les pipelines de données.

Fonctionnalités

  • Créez un pipeline par lots récurrent pour exécuter une tâche par lots selon un calendrier.
  • Créez un pipeline par lots incrémentiel récurrent afin d'exécuter une tâche par lot sur la dernière version des données d'entrée.
  • Utilisez le tableau de données récapitulatif du pipeline pour afficher l'utilisation totale et la consommation des ressources d'un pipeline.
  • Affichez la fraîcheur des données d'un pipeline de traitement par flux. Cette métrique, qui évolue au fil du temps, peut être liée à une alerte qui vous avertit lorsque la fraîcheur tombe en dessous d'un objectif spécifié.
  • Utilisez les graphiques de métriques de pipeline pour comparer les tâches de pipeline par lot et détecter les anomalies.

Limites

  • Disponibilité régionale : vous pouvez créer des pipelines de données dans les régions Cloud Scheduler disponibles.

  • Quota :

    • Nombre de pipelines par projet par défaut : 500
    • Nombre de pipelines par organisation par défaut : 2 500

      Le quota au niveau de l'organisation est désactivé par défaut. Vous pouvez opter pour des quotas au niveau de l'organisation. Dans ce cas, chaque organisation peut utiliser un nombre maximal de 2 500 pipelines par défaut.

  • Libellés : vous ne pouvez pas utiliser de libellés définis par l'utilisateur pour ajouter un libellé aux pipelines de données Dataflow. Cependant, lorsque vous utilisez le champ additionalUserLabels, ces valeurs sont transmises à votre job Dataflow. Pour plus d'informations sur la façon dont les libellés s'appliquent à des jobs Dataflow individuels, consultez la page Options de pipeline.

Types de pipelines de données

Dataflow propose deux types de pipelines de données : les flux et les lots. Ces deux types de pipelines exécutent des jobs définis dans les modèles Dataflow.

Pipeline de données en flux continu
Un pipeline de flux de données exécute un job par flux Dataflow immédiatement après sa création.
Pipeline de données par lot

Un pipeline de données par lot exécute un job par lot Dataflow selon un calendrier défini par l'utilisateur. Le nom du fichier d'entrée du pipeline par lot peut être paramétré pour permettre le traitement incrémentiel du pipeline par lot.

Pipelines par lots incrémentiels

Vous pouvez utiliser des espaces réservés de date et d'heure pour spécifier un format de fichier d'entrée incrémentiel pour un pipeline par lots.

  • Les espaces réservés pour l'année, le mois, la date, l'heure, la minute et la seconde peuvent être utilisés et doivent respecter le format strftime(). Les espaces réservés sont précédés du symbole pourcentage (%).
  • La mise en forme des paramètres n'a pas été vérifiée lors de la création du pipeline.
    • Exemple : Si vous spécifiez "gs://bucket/Y" comme chemin d'entrée paramétré, il sera évalué comme "gs://bucket/Y", car "Y" qui n'est pas précédé de "%" ne correspond pas au format strftime().

À chaque heure d'exécution programmée du pipeline par lot, la partie de l'espace réservé du chemin d'entrée est évaluée à la date et l'heure actuelles (ou décalées dans le temps). Les valeurs de date sont évaluées à l'aide de la date actuelle dans le fuseau horaire de la tâche planifiée. Si le chemin évalué correspond à celui d'un fichier d'entrée, le fichier est sélectionné pour le traitement par le pipeline de traitement par lot à l'heure planifiée.

  • Exemple:Un pipeline par lot se répète au début de chaque heure PST. Si vous paramétrez le chemin d'entrée sur gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, le 15 avril 2021 à 18:00 PST, le chemin d'entrée est évalué avec gs://bucket-name/2021-04-15/prefix-18_00.csv.

Utiliser les paramètres de décalage temporel

Vous pouvez utiliser les paramètres de décalage temporel de + ou - minute ou heure. Pour prendre en charge la correspondance d'un chemin d'entrée avec une date et heure évaluée qui est modifiée avant ou après la date et l'heure actuelles de la programmation du pipeline, placez ces paramètres entre accolades. Utilisez le format {[+|-][0-9]+[m|h]}. Le pipeline par lots continue de se répéter à l'heure prévue, mais le chemin d'entrée est évalué avec le décalage temporel spécifié.

  • Exemple:Un pipeline par lot se répète au début de chaque heure PST. Si vous paramétrez le chemin d'entrée sur gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, le 15 avril 2021 à 18:00 PST, le chemin d'entrée est évalué avec gs://bucket-name/2021-04-15/prefix-16_00.csv.

Rôles de pipeline de données

Pour que les opérations de pipeline de données Dataflow réussissent, vous devez disposer des rôles IAM nécessaires, comme suit :

  1. Vous devez disposer du rôle approprié pour effectuer des opérations :

  2. Le compte de service utilisé par Cloud Scheduler doit disposer du rôle roles/iam.serviceAccountUser, que le compte de service soit spécifié par l'utilisateur ou qu'il s'agisse du compte de service Compute Engine par défaut. Pour en savoir plus, consultez la section Rôles de pipeline de données.

  3. Vous devez pouvoir agir en tant que compte de service utilisé par Cloud Scheduler et Dataflow grâce au rôle roles/iam.serviceAccountUser sur ce compte. Si vous ne sélectionnez pas de compte de service pour Cloud Scheduler et Dataflow, le compte de service Compute Engine par défaut est utilisé.

Créer un pipeline de données

Vous pouvez créer un pipeline de données Dataflow de deux manières :

  1. Importer une tâche, ou
  2. Créer un pipeline de données

Page de configuration des pipelines de données : lorsque vous accédez pour la première fois à la fonctionnalité des pipelines Dataflow dans la console Google Cloud, une page de configuration s'ouvre. Autorisez les API répertoriées à créer des pipelines de données.

Importer une tâche

Vous pouvez importer un job par lot ou par flux Dataflow basé sur un modèle classique ou flexible et en faire un pipeline de données.

  1. Dans la console Google Cloud, accédez à la page Tâches de Dataflow.

    Accéder aux tâches

  2. Sélectionnez un job terminé, puis sur la page Informations sur le job, sélectionnez + Importer en tant que pipeline.

  3. Sur la page Créer un pipeline à partir d'un modèle, les paramètres sont renseignés avec les options du job importé.

  4. Pour une tâche par lot, dans la section Planifier votre pipeline, indiquez un calendrier de récurrence. La spécification d'une adresse de compte de messagerie pour Cloud Scheduler, utilisée pour planifier des exécutions par lots, est facultative. Si elle n'est pas spécifiée, le compte de service Compute Engine par défaut est utilisé.

Créer un pipeline de données

  1. Dans la console Google Cloud, accédez à la page Pipelines de données de Dataflow.

    Accéder à la page "Pipelines de données"

  2. Sélectionnez + Créer un pipeline de données.

  3. Sur la page Créer un pipeline à partir d'un modèle, indiquez un nom de pipeline, puis renseignez les autres champs de sélection et de paramètres du modèle.

  4. Pour un job par lot, dans la section Planifier votre pipeline, indiquez un calendrier de récurrence. La spécification d'une adresse de compte de messagerie pour Cloud Scheduler, utilisée pour planifier des exécutions par lots, est facultative. Si aucune valeur n'est spécifiée, le compte de service Compute Engine par défaut est utilisé.

Créer un pipeline de données par lots

Pour créer cet exemple de pipeline de données par lot, vous devez avoir accès aux ressources suivantes de votre projet:

Cet exemple de pipeline utilise le modèle de pipeline par lots Texte Cloud Storage vers BigQuery. Ce modèle lit les fichiers au format CSV à partir de Cloud Storage, exécute une transformation, puis insère les valeurs dans une table BigQuery avec trois colonnes.

  1. Créez les fichiers suivants sur votre disque local :

    1. Un fichier bq_three_column_table.json contenant le schéma suivant de la table BigQuery de destination.

      {
        "BigQuery Schema": [
          {
            "name": "col1",
            "type": "STRING"
          },
          {
            "name": "col2",
            "type": "STRING"
          },
          {
            "name": "col3",
            "type": "INT64"
          }
        ]
      }
      
    2. Un fichier JavaScript split_csv_3cols.js, qui met en œuvre une transformation simple sur les données d'entrée avant l'insertion dans BigQuery.

      function transform(line) {
          var values = line.split(',');
          var obj = new Object();
          obj.col1 = values[0];
          obj.col2 = values[1];
          obj.col3 = values[2];
          var jsonString = JSON.stringify(obj);
          return jsonString;
      }
      
    3. Un fichier CSV file01.csv avec plusieurs enregistrements insérés dans la table BigQuery.

      b8e5087a,74,27531
      7a52c051,4a,25846
      672de80f,cd,76981
      111b92bf,2e,104653
      ff658424,f0,149364
      e6c17c75,84,38840
      833f5a69,8f,76892
      d8c833ff,7d,201386
      7d3da7fb,d5,81919
      3836d29b,70,181524
      ca66e6e5,d7,172076
      c8475eb6,03,247282
      558294df,f3,155392
      737b82a8,c7,235523
      82c8f5dc,35,468039
      57ab17f9,5e,480350
      cbcdaf84,bd,354127
      52b55391,eb,423078
      825b8863,62,88160
      26f16d4f,fd,397783
      
  2. Utilisez la commande gcloud storage cp pour copier les fichiers dans des dossiers dans un bucket Cloud Storage de votre projet, comme suit :

    1. Copiez bq_three_column_table.json et split_csv_3cols.js dans gs://BUCKET_ID/text_to_bigquery/

      gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
      gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
      
    2. Copiez file01.csv vers gs://BUCKET_ID/inputs/ :

      gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
      
  3. Dans la console Google Cloud, accédez à la page Buckets de Cloud Storage.

    Accéder à la page "Buckets"

  4. Pour créer un dossier tmp dans votre bucket Cloud Storage, sélectionnez le nom de votre dossier pour ouvrir la page Informations sur le bucket, puis cliquez sur Créer un dossier.

    Bouton "Créer un dossier" sur la page d'informations sur le bucket

  5. Dans la console Google Cloud, accédez à la page Pipelines de données de Dataflow.

    Accéder à la page "Pipelines de données"

  6. Sélectionnez Créer un pipeline de données. Saisissez ou sélectionnez les éléments suivants sur la page Créer un pipeline à partir d'un modèle:

    1. Dans le champ Nom du pipeline, saisissez text_to_bq_batch_data_pipeline.
    2. Pour le Point de terminaison régional, sélectionnez une région Compute Engine. Les régions source et de destination doivent correspondre. Par conséquent, votre bucket Cloud Storage et votre table BigQuery doivent se trouver dans la même région.
    3. Pour Modèle Dataflow, dans Traiter les données de manière groupée (lot), sélectionnez Fichiers texte sur Cloud Storage vers BigQuery.

    4. Pour Planifier votre pipeline, sélectionnez une programmation, par exemple toutes les heures à la minute 25, dans votre fuseau horaire. Vous pouvez modifier la programmation après avoir envoyé le pipeline. La spécification d'une adresse de compte de messagerie pour Cloud Scheduler, utilisée pour planifier des exécutions par lots, est facultative. Si elle n'est pas spécifiée, le compte de service Compute Engine par défaut est utilisé.

    5. Dans Paramètres obligatoires, saisissez les valeurs suivantes :

      1. Pour le Chemin d'accès aux fonctions JavaScript définies par l'utilisateur dans Cloud Storage :
        gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
        
      2. Pour le Chemin JSON :
        BUCKET_ID/text_to_bigquery/bq_three_column_table.json
        
      3. Pour le Nom des fonctions JavaScript définies par l'utilisateur : transform
      4. Pour Table BigQuery de sortie :
        PROJECT_ID:DATASET_ID.three_column_table
        
      5. Pour le Chemin d'entrée Cloud Storage :
        BUCKET_ID/inputs/file01.csv
        
      6. Pour le Répertoire BigQuery temporaire :
        BUCKET_ID/tmp
        
      7. Pour Emplacement temporaire :
        BUCKET_ID/tmp
        
    6. Cliquez sur Créer un pipeline.

  7. Confirmez les informations sur le pipeline et le modèle, puis affichez l'historique actuel et précédent à partir de la page Détails du pipeline.

    Page d'informations sur le pipeline

Vous pouvez également exécuter un pipeline par lot à la demande en utilisant le bouton Exécuter de la console Dataflow Pipelines.

Créer un exemple de pipeline de flux de données

Vous pouvez créer un exemple de pipeline de flux de données en suivant les instructions d'exemple de pipeline de traitement par lot, avec les différences suivantes:

  • Pour Planification du pipeline, ne spécifiez pas de programmation pour un pipeline de flux de données. La tâche par flux Cloud Dataflow est immédiatement démarrée.
  • Pour Modèle Dataflow, dans Traiter les données en continu (flux), sélectionnez Fichiers texte sur Cloud Storage vers BigQuery.
  • Pour le Type de machine des nœuds de calcul, le pipeline traite l'ensemble initial de fichiers correspondant au modèle gs://BUCKET_ID/inputs/file01.csv et tous les fichiers supplémentaires correspondant à ce modèle que vous importez dans le dossier inputs/. Si la taille des fichiers CSV dépasse plusieurs Go, pour éviter d'éventuelles erreurs de mémoire saturée, sélectionnez un type de machine doté d'une capacité de mémoire supérieure à celle du type de machine n1-standard-4 par défaut, tel que n1-highmem-8.

Dépannage

Cette section explique comment résoudre les problèmes liés aux pipelines de données Dataflow.

Échec de lancement du job de pipeline de données

Lorsque vous utilisez des pipelines de données pour créer une programmation de job récurrent, votre job Dataflow peut ne pas se lancer, auquel cas une erreur d'état 503 apparaît dans les fichiers journaux Cloud Scheduler.

Ce problème se produit lorsque Dataflow est temporairement dans l'impossibilité d'exécuter le job.

Pour contourner ce problème, configurez Cloud Scheduler afin de relancer le job. Comme le problème est temporaire, une nouvelle tentative d'exécution du job peut aboutir. Pour en savoir plus sur la définition des valeurs de nouvelle tentative dans Cloud Scheduler, consultez la section Créer un job.

Examiner les violations des objectifs de pipeline

Les sections suivantes décrivent comment examiner les pipelines qui n'atteignent pas les objectifs de performances.

Pipelines par lots récurrents

Pour une analyse initiale de l'état de votre pipeline, sur la page Informations sur le pipeline de la console Google Cloud, utilisez les graphiques État d'un job spécifique et Durée des threads par étape. Ces graphiques se trouvent dans le panneau d'état du pipeline.

Exemple d'investigation :

  1. Vous avez un pipeline par lot récurrent qui s'exécute toutes les heures à trois minutes de l'heure. Chaque job s'exécute normalement pendant environ neuf minutes. Vous avez pour objectif d'exécuter tous les jobs en moins de 10 minutes.

  2. Le graphique d'état de la tâche indique que l'exécution d'une tâche a pris plus de 10 minutes.

  3. Dans le tableau de l'historique des mises à jour/exécution, trouvez le job exécuté à l'heure qui vous intéresse. Cliquez dessus pour accéder à la page des détails du job Dataflow. Sur cette page, trouvez l'étape d'exécution la plus longue, puis recherchez dans les journaux les erreurs potentielles pour déterminer la cause du retard.

Pipelines de traitement par flux

Pour une analyse initiale de l'état de votre pipeline, sur la page Détails du pipeline, dans l'onglet Informations sur le pipeline, utilisez le graphique de fraîcheur des données. Ce graphique se trouve dans le panneau d'état du pipeline.

Exemple d'investigation :

  1. Vous disposez d'un pipeline de streaming qui produit normalement un résultat avec une fraîcheur des données de 20 secondes.

  2. Vous avez fixé un objectif de 30 secondes pour garantir la fraîcheur des données. Lorsque vous examinez le graphique de fraîcheur des données, vous remarquez qu'entre 9h et 10h, la fraîcheur des données est passée à près de 40 secondes.

    Graphique de fraîcheur des données indiquant une augmentation du nombre de minutes dans la fraîcheur des données

  3. Basculez vers l'onglet Métriques du pipeline, puis affichez les graphiques d'utilisation du processeur et d'utilisation de la mémoire pour une analyse plus approfondie.

Erreur : L'ID de pipeline existe déjà dans le projet

Si vous essayez de créer un pipeline portant un nom déjà présent dans votre projet, le message d'erreur suivant s'affiche : Pipeline Id already exist within the project. Pour éviter ce problème, choisissez toujours des noms uniques pour vos pipelines.