Modèles d'utilitaires fournis par Google

Google fournit un ensemble de modèles Dataflow Open Source. Des informations générales sur les modèles sont disponibles sur la page Présentation. Pour obtenir la liste de tous les modèles fournis par Google, consultez la page Premiers pas avec les modèles fournis par Google.

Cette page traite des modèles d'utilitaires.

Compression groupée de fichiers Cloud Storage

Le modèle de compression groupée des fichiers Cloud Storage est un pipeline par lots qui compresse des fichiers de Cloud Storage vers un emplacement spécifié. Ce modèle peut être utile lorsque vous devez compresser de grands lots de fichiers dans le cadre d'un processus d'archivage périodique. Les modes de compression compatibles sont les suivants : BZIP2, DEFLATE et GZIP. Les fichiers produits à l'emplacement de destination suivent un schéma de dénomination associant le nom de fichier d'origine à l'extension du mode de compression. Les extensions ajoutées seront les suivantes : .bzip2, .deflate et .gz.

Toute erreur survenant pendant le processus de compression sera inscrite dans le fichier d'échec au format CSV : nom de fichier, message d'erreur. Si aucune erreur ne survient pendant l'exécution du pipeline, le fichier d'erreur sera créé, mais il ne contiendra aucun enregistrement.

Conditions requises pour ce pipeline :

  • La compression doit être dans l'un des formats suivants : BZIP2, DEFLATE et GZIP.
  • Le répertoire de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputFilePattern Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/uncompressed/*.txt.
outputDirectory Emplacement de sortie où écrire. Par exemple, gs://bucket-name/compressed/.
outputFailureFile Fichier de sortie du journal des erreurs à utiliser pour les échecs d'écriture lors du processus de compression. Par exemple, gs://bucket-name/compressed/failed.csv. S'il n'y a pas d'échec, le fichier est quand même créé mais sera vide. Le contenu du fichier est au format CSV (nom de fichier, erreur) et contient une ligne pour chaque fichier dont la compression échoue.
compression Algorithme de compression utilisé pour compresser les fichiers correspondants. Doit être l'un des suivants : BZIP2, DEFLATE et GZIP.

Exécuter le modèle de fichiers Cloud Storage de compression par lots

Console

Exécuter depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton "Créer une tâche à partir d'un modèle" dans Cloud Console
  5. Sélectionnez the Bulk Compress Cloud Storage Files template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • BUCKET_NAME : nom du bucket Cloud Storage
  • COMPRESSION : algorithme de compression de votre choix

API

Exécuter depuis l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • BUCKET_NAME : nom du bucket Cloud Storage
  • COMPRESSION : algorithme de compression de votre choix

Décompression groupée de fichiers Cloud Storage

Le modèle de décompression groupée de fichiers Cloud Storage est un pipeline par lots qui décompresse les fichiers de Cloud Storage vers un emplacement spécifié. Cette fonctionnalité est utile lorsque vous souhaitez utiliser des données compressées pour réduire les coûts de bande passante réseau lors d'une migration, tout en optimisant la vitesse de traitement analytique en opérant sur des données non compressées après la migration. Le pipeline gère automatiquement plusieurs modes de compression lors d'une seule exécution et détermine le mode de décompression à utiliser en fonction de l'extension de fichier (.bzip2, .deflate, .gz ou .zip).

Conditions requises pour ce pipeline :

  • Les fichiers à décompresser doivent être dans l'un des formats suivants : Bzip2, Deflate, Gzip ou Zip.
  • Le répertoire de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputFilePattern Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/compressed/*.gz.
outputDirectory Emplacement de sortie où écrire. Par exemple, gs://bucket-name/decompressed.
outputFailureFile Le fichier de sortie du journal des erreurs à utiliser pour les échecs d'écriture lors du processus de décompression. Par exemple, gs://bucket-name/decompressed/failed.csv. S'il n'y a pas d'échec, le fichier est quand même créé mais sera vide. Le contenu du fichier est au format CSV (nom de fichier, erreur) et consiste en une ligne pour chaque fichier qui échoue à la décompression.

Exécuter le modèle de décompression groupée de fichiers Cloud Storage

Console

Exécuter depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton "Créer une tâche à partir d'un modèle" dans Cloud Console
  5. Sélectionnez the Bulk Decompress Cloud Storage Files template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • BUCKET_NAME : nom du bucket Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH : chemin d'accès de votre choix qui mène au fichier contenant les informations sur les échecs

API

Exécuter depuis l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • BUCKET_NAME : nom du bucket Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH : chemin d'accès de votre choix qui mène au fichier contenant les informations sur les échecs

Suppression groupée Datastore

Le modèle de suppression groupée de Datastore est un pipeline qui lit les entités de Datastore avec une requête GQL donnée, puis supprime toutes les entités correspondantes du projet cible sélectionné. Le pipeline peut éventuellement transmettre les entités Datastore encodées en JSON à votre fichier UDF JavaScript, que vous pouvez utiliser pour filtrer les entités en renvoyant des valeurs nulles.

Conditions requises pour ce pipeline :

  • Datastore doit être configuré dans le projet avant l'exécution du modèle.
  • Si vous lisez et supprimez des instances de Datastore distinctes, le compte de service du responsable de traitement de Dataflow doit être autorisé à lire depuis une instance et à supprimer depuis une autre.

Paramètres de modèle

Paramètres Description
datastoreReadGqlQuery Requête GQL qui spécifie les entités à rechercher pour la suppression. L'utilisation d'une requête ne contenant que des clés peut améliorer les performances. Par exemple : "SELECT __key__ FROM MyKind".
datastoreReadProjectId ID du projet GCP de l'instance Datastore à partir de laquelle vous souhaitez lire les entités (à l'aide de votre requête GQL) utilisées pour la mise en correspondance.
datastoreDeleteProjectId ID du projet GCP de l'instance Cloud Datastore dans laquelle supprimer les entités correspondantes. Cette valeur peut être identique à datastoreReadProjectId si vous souhaitez lire et supprimer des éléments au sein d'une même instance Datastore.
datastoreReadNamespace [Facultatif] Espace de noms des entités demandées. Défini comme "" pour l'espace de noms par défaut.
javascriptTextTransformGcsPath (Facultatif) Chemin d'accès Cloud Storage contenant tout votre code JavaScript. Par exemple, gs://mybucket/mytransforms/*.js. Si vous ne souhaitez pas utiliser de fonction définie par l'utilisateur, ne renseignez pas ce champ.
javascriptTextTransformFunctionName [Facultatif] Nom de la fonction à appeler. Si cette fonction renvoie une valeur indéfinie ou nulle pour une entité Datastore donnée, cette entité ne sera pas supprimée. Si votre code JavaScript est le suivant : "function myTransform(inJson) { ...dostuff...}", le nom de votre fonction est "myTransform". Si vous ne souhaitez pas utiliser de fonction définie par l'utilisateur, ne renseignez pas ce champ.

Exécuter le modèle de suppression groupée Datastore

Console

Exécuter depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton "Créer une tâche à partir d'un modèle" dans Cloud Console
  5. Sélectionnez the Datastore Bulk Delete template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • GQL_QUERY : requête que vous utiliserez pour rechercher les entités à supprimer.
  • DATASTORE_READ_AND_DELETE_PROJECT_ID : ID de projet de votre instance Datastore. Cet exemple lit et supprime des éléments au sein d'une même instance Datastore.

API

Exécuter depuis l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • GQL_QUERY : requête que vous utiliserez pour rechercher les entités à supprimer.
  • DATASTORE_READ_AND_DELETE_PROJECT_ID : ID de projet de votre instance Datastore. Cet exemple lit et supprime des éléments au sein d'une même instance Datastore.

Générateur de flux de données vers Pub/Sub, BigQuery et Cloud Storage

Le modèle de générateur de flux de données sert à générer un nombre illimité ou fixe d'enregistrements synthétiques ou de messages, basés sur un schéma fourni par l'utilisateur et à un rythme spécifié. Les destinations compatibles incluent les sujets Pub/Sub, les tables BigQuery et les buckets Cloud Storage.

Voici quelques cas d'utilisation possibles :

  • Simuler un événement en temps réel et à grande échelle sur un sujet Pub/Sub, afin de mesurer et de déterminer le nombre et la taille des consommateurs requis pour traiter les événements publiés
  • Générer des données synthétiques vers une table BigQuery ou un bucket Cloud Storage afin d'évaluer les benchmarks des performances ou d'utiliser la démonstration de faisabilité

Récepteurs et formats d'encodage compatibles

Le tableau suivant décrit les récepteurs et les formats d'encodage compatibles avec ce modèle :
JSON Avro Parquet
Pub/Sub Oui Oui Non
BigQuery Oui Non Non
Cloud Storage Oui Oui Oui

La bibliothèque JSON Data Generator utilisée par le pipeline permet d'utiliser différentes fonctions factices pour chaque champ du schéma. Pour en savoir plus sur les fonctions factices et le format de schéma, consultez la documentation sur JSON-data-generator.

Conditions requises pour ce pipeline :

  • Créer un fichier de schéma de message et stocker ce fichier dans un emplacement Cloud Storage
  • La sortie cible doit exister avant l'exécution. La cible doit être un sujet Pub/Sub, une table BigQuery ou un bucket Cloud Storage suivant le type de récepteur.
  • Si l'encodage de sortie est Avro ou Parquet, créez un fichier de schéma Avro et stockez-le dans un emplacement Cloud Storage.

Paramètres de modèle

Paramètres Description
schemaLocation Emplacement du fichier de schéma. Exemple : gs://mybucket/filename.json.
qps Nombre de messages à publier par seconde. Exemple : 100.
sinkType (Facultatif) Type de récepteur de sortie. Les valeurs possibles sont PUBSUB, BIGQUERY, GCS. La valeur par défaut est PUBSUB.
outputType (Facultatif) Type d'encodage de sortie. Les valeurs possibles sont JSON, AVRO, PARQUET. La valeur par défaut est JSON.
avroSchemaLocation (Facultatif) Emplacement du fichier de schéma AVRO. Obligatoire lorsque outputType est défini sur AVRO ou PARQUET. Exemple : gs://mybucket/filename.avsc.
topic (Facultatif) Nom du sujet Pub/Sub dans lequel le pipeline doit publier des données. Obligatoire si sinkType est Pub/Sub. Exemple : projects/<project-id>/topics/<topic-name>.
outputTableSpec (Facultatif) Nom de la table BigQuery de sortie. Obligatoire si sinkType est BigQuery. Exemple : your-project:your-dataset.your-table-name.
writeDisposition (Facultatif) Disposition d'écriture BigQuery. Les valeurs possibles sont WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. La valeur par défaut est WRITE_APPEND.
outputDeadletterTable (Facultatif) Nom de la table BigQuery de sortie servant à stocker les enregistrements ayant échoué. Si cette valeur est omise, le pipeline crée une table portant le nom {output_table_name}_error_records lors de l'exécution. Exemple : your-project:your-dataset.your-table-name.
outputDirectory (Facultatif) Chemin de l'emplacement de sortie Cloud Storage. Obligatoire lorsque sinkType est défini sur Cloud Storage. Exemple : gs://mybucket/pathprefix/.
outputFilenamePrefix (Facultatif) Préfixe de nom pour les fichiers de sortie écrits dans Cloud Storage. La valeur par défaut est "output-".
windowDuration (Facultatif) Intervalle de fenêtre auquel la sortie est écrite dans Cloud Storage. La valeur par défaut est de "1m", soit 1 minute.
numShards [Facultatif] Nombre maximal de partitions de sortie. Obligatoire lorsque sinkType est défini sur Cloud Storage. La valeur doit alors être définie sur une valeur supérieure ou égale à 1.
messagesLimit (Facultatif) Nombre maximal de messages de sortie. La valeur par défaut est de 0, c'est-à-dire un nombre illimité.
autoscalingAlgorithm (Facultatif) Algorithme utilisé pour l'autoscaling des nœuds de calcul. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver.
maxNumWorkers (Facultatif) Nombre maximal de systèmes de calcul. Exemple : 10.

Exécuter le modèle du générateur de flux de données

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Streaming Data Generator template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécution à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 284.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • PROJECT_ID : ID de votre projet.
  • REGION_NAME : nom de la région Dataflow (par exemple, us-central1)
  • SCHEMA_LOCATION : chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
  • QPS : nombre de messages à publier par seconde
  • PUBSUB_TOPIC : sujet Pub/Sub de sortie. Exemple : projects/<project-id>/topics/<topic-name>.

API

Exécution à partir de l'API REST

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

POST  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Streaming_Data_Generator",
   }
}
  

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet.
  • LOCATION : nom de la région Dataflow (par exemple, us-central1)
  • JOB_NAME : nom de la tâche de votre choix
  • SCHEMA_LOCATION : chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
  • QPS : nombre de messages à publier par seconde
  • PUBSUB_TOPIC : sujet Pub/Sub de sortie. Exemple : projects/<project-id>/topics/<topic-name>.