Modèles d'utilitaires fournis par Google

Google fournit un ensemble de modèles Dataflow Open Source. Des informations générales sur les modèles sont disponibles sur la page Présentation. Pour obtenir la liste de tous les modèles fournis par Google, consultez la page Premiers pas avec les modèles fournis par Google.

Cette page traite des modèles d'utilitaires.

Conversion de format de fichier (Avro, Parquet, CSV)

Le modèle de conversion de format de fichier est un pipeline par lots qui convertit les fichiers stockés sur Cloud Storage d'un format compatible à un autre.

Les conversions entre les formats suivants sont possibles :

  • CSV vers Avro
  • CSV vers Parquet
  • Avro vers Parquet
  • Parquet vers Avro

Conditions requises pour ce pipeline :

  • Le bucket Cloud Storage de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputFileFormat Format de fichier d'entrée. Doit être l'un des suivants : [csv, avro, parquet].
outputFileFormat Format de fichier de sortie. Doit être l'un des suivants : [avro, parquet].
inputFileSpec Modèle de chemin d'accès Cloud Storage pour les fichiers d'entrée. Par exemple : gs://bucket-name/path/*.csv
outputBucket Dossier Cloud Storage dans lequel écrire les fichiers de sortie. Le chemin d'accès doit se terminer par une barre oblique. Par exemple : gs://bucket-name/output/
schema Chemin d'accès Cloud Storage au fichier de schéma Avro. Par exemple, gs://bucket-name/schema/my-schema.avsc
containsHeaders (Facultatif) Les fichiers CSV d'entrée contiennent un enregistrement d'en-tête (vrai/faux). La valeur par défaut est false. Nécessaire uniquement en cas de lecture de fichiers CSV.
csvFormat (Facultatif) Spécification du format CSV à utiliser pour l'analyse des enregistrements. La valeur par défaut est Default. Pour en savoir plus, consultez la page Format CSV Apache Commons.
delimiter (Facultatif) Délimiteur de champ utilisé dans les fichiers CSV d'entrée.
outputFilePrefix (Facultatif) Préfixe du fichier de sortie. La valeur par défaut est output.
numShards (Facultatif) Nombre de segments de fichiers de sortie.

Exécuter le modèle Conversion de format de fichier

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Convert file formats template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles Flex, vous devez disposer du SDK Cloud version 284.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/File_Format_Conversion
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • INPUT_FORMAT : format du fichier d'entrée (doit être l'un des suivants : [csv, avro, parquet])
  • OUTPUT_FORMAT : format des fichiers de sortie (doit être l'un des suivants : [avro, parquet])
  • INPUT_FILES : modèle de chemin d'accès pour les fichiers d'entrée
  • OUTPUT_FOLDER : dossier Cloud Storage pour les fichiers de sortie
  • SCHEMA : chemin d'accès au fichier de schéma Avro
  • LOCATION : région d'exécution (par exemple, us-central1)

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/File_Format_Conversion

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/File_Format_Conversion",
   }
}

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • INPUT_FORMAT : format du fichier d'entrée (doit être l'un des suivants : [csv, avro, parquet])
  • OUTPUT_FORMAT : format des fichiers de sortie (doit être l'un des suivants : [avro, parquet])
  • INPUT_FILES : modèle de chemin d'accès pour les fichiers d'entrée
  • OUTPUT_FOLDER : dossier Cloud Storage pour les fichiers de sortie
  • SCHEMA : chemin d'accès au fichier de schéma Avro
  • LOCATION : région d'exécution (par exemple, us-central1)

Compression groupée de fichiers Cloud Storage

Le modèle de compression groupée des fichiers Cloud Storage est un pipeline par lots qui compresse des fichiers de Cloud Storage vers un emplacement spécifié. Ce modèle peut être utile lorsque vous devez compresser de grands lots de fichiers dans le cadre d'un processus d'archivage périodique. Les modes de compression compatibles sont les suivants : BZIP2, DEFLATE et GZIP. Les fichiers produits à l'emplacement de destination suivent un schéma de dénomination associant le nom de fichier d'origine à l'extension du mode de compression. Les extensions ajoutées seront les suivantes : .bzip2, .deflate et .gz.

Toute erreur survenant pendant le processus de compression sera inscrite dans le fichier d'échec au format CSV : nom de fichier, message d'erreur. Si aucune erreur ne survient pendant l'exécution du pipeline, le fichier d'erreur sera créé, mais il ne contiendra aucun enregistrement.

Conditions requises pour ce pipeline :

  • La compression doit être dans l'un des formats suivants : BZIP2, DEFLATE et GZIP.
  • Le répertoire de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputFilePattern Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/uncompressed/*.txt.
outputDirectory Emplacement de sortie où écrire. Par exemple, gs://bucket-name/compressed/.
outputFailureFile Fichier de sortie du journal des erreurs à utiliser pour les échecs d'écriture lors du processus de compression. Par exemple, gs://bucket-name/compressed/failed.csv. S'il n'y a pas d'échec, le fichier est quand même créé mais sera vide. Le contenu du fichier est au format CSV (nom de fichier, erreur) et contient une ligne pour chaque fichier dont la compression échoue.
compression Algorithme de compression utilisé pour compresser les fichiers correspondants. Doit être l'un des suivants : BZIP2, DEFLATE et GZIP.

Exécuter le modèle de fichiers Cloud Storage de compression par lots

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bulk Compress Files on Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • BUCKET_NAME : nom du bucket Cloud Storage
  • COMPRESSION : algorithme de compression de votre choix

API

Exécuter depuis l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • BUCKET_NAME : nom du bucket Cloud Storage
  • COMPRESSION : algorithme de compression de votre choix

Décompression groupée de fichiers Cloud Storage

Le modèle de décompression groupée de fichiers Cloud Storage est un pipeline par lots qui décompresse les fichiers de Cloud Storage vers un emplacement spécifié. Cette fonctionnalité est utile lorsque vous souhaitez utiliser des données compressées pour réduire les coûts de bande passante réseau lors d'une migration, tout en optimisant la vitesse de traitement analytique en opérant sur des données non compressées après la migration. Le pipeline gère automatiquement plusieurs modes de compression lors d'une seule exécution et détermine le mode de décompression à utiliser en fonction de l'extension de fichier (.bzip2, .deflate, .gz ou .zip).

Conditions requises pour ce pipeline :

  • Les fichiers à décompresser doivent être dans l'un des formats suivants : Bzip2, Deflate, Gzip ou Zip.
  • Le répertoire de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputFilePattern Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/compressed/*.gz.
outputDirectory Emplacement de sortie où écrire. Par exemple, gs://bucket-name/decompressed.
outputFailureFile Le fichier de sortie du journal des erreurs à utiliser pour les échecs d'écriture lors du processus de décompression. Par exemple, gs://bucket-name/decompressed/failed.csv. S'il n'y a pas d'échec, le fichier est quand même créé mais sera vide. Le contenu du fichier est au format CSV (nom de fichier, erreur) et consiste en une ligne pour chaque fichier qui échoue à la décompression.

Exécuter le modèle de décompression groupée de fichiers Cloud Storage

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bulk Decompress Files on Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • BUCKET_NAME : nom du bucket Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH : chemin d'accès de votre choix qui mène au fichier contenant les informations sur les échecs

API

Exécuter depuis l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • BUCKET_NAME : nom du bucket Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH : chemin d'accès de votre choix qui mène au fichier contenant les informations sur les échecs

Suppression groupée Datastore

Le modèle de suppression groupée de Datastore est un pipeline qui lit les entités de Datastore avec une requête GQL donnée, puis supprime toutes les entités correspondantes du projet cible sélectionné. Le pipeline peut éventuellement transmettre les entités Datastore encodées en JSON à votre fichier UDF JavaScript, que vous pouvez utiliser pour filtrer les entités en renvoyant des valeurs nulles.

Conditions requises pour ce pipeline :

  • Datastore doit être configuré dans le projet avant l'exécution du modèle.
  • Si vous lisez et supprimez des instances de Datastore distinctes, le compte de service du responsable de traitement de Dataflow doit être autorisé à lire depuis une instance et à supprimer depuis une autre.

Paramètres de modèle

Paramètres Description
datastoreReadGqlQuery Requête GQL qui spécifie les entités à rechercher pour la suppression. L'utilisation d'une requête ne contenant que des clés peut améliorer les performances. Par exemple : "SELECT __key__ FROM MyKind".
datastoreReadProjectId ID du projet GCP de l'instance Datastore à partir de laquelle vous souhaitez lire les entités (à l'aide de votre requête GQL) utilisées pour la mise en correspondance.
datastoreDeleteProjectId ID du projet GCP de l'instance Cloud Datastore dans laquelle supprimer les entités correspondantes. Cette valeur peut être identique à datastoreReadProjectId si vous souhaitez lire et supprimer des éléments au sein d'une même instance Datastore.
datastoreReadNamespace [Facultatif] Espace de noms des entités demandées. Défini comme "" pour l'espace de noms par défaut.
javascriptTextTransformGcsPath (Facultatif) Chemin d'accès Cloud Storage contenant tout votre code JavaScript. Par exemple, gs://mybucket/mytransforms/*.js. Si vous ne souhaitez pas utiliser de fonction définie par l'utilisateur, ne renseignez pas ce champ.
javascriptTextTransformFunctionName [Facultatif] Nom de la fonction à appeler. Si cette fonction renvoie une valeur indéfinie ou nulle pour une entité Datastore donnée, cette entité ne sera pas supprimée. Si votre code JavaScript est le suivant : "function myTransform(inJson) { ...dostuff...}", le nom de votre fonction est "myTransform". Si vous ne souhaitez pas utiliser de fonction définie par l'utilisateur, ne renseignez pas ce champ.

Exécuter le modèle de suppression groupée Datastore

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bulk Delete Entities in Datastore template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • GQL_QUERY : requête que vous utiliserez pour rechercher les entités à supprimer.
  • DATASTORE_READ_AND_DELETE_PROJECT_ID : ID de projet de votre instance Datastore. Cet exemple lit et supprime des éléments au sein d'une même instance Datastore.

API

Exécuter depuis l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • GQL_QUERY : requête que vous utiliserez pour rechercher les entités à supprimer.
  • DATASTORE_READ_AND_DELETE_PROJECT_ID : ID de projet de votre instance Datastore. Cet exemple lit et supprime des éléments au sein d'une même instance Datastore.

Générateur de flux de données vers Pub/Sub, BigQuery et Cloud Storage

Le modèle de générateur de flux de données sert à générer un nombre illimité ou fixe d'enregistrements synthétiques ou de messages, basés sur un schéma fourni par l'utilisateur et à un rythme spécifié. Les destinations compatibles incluent les sujets Pub/Sub, les tables BigQuery et les buckets Cloud Storage.

Voici quelques cas d'utilisation possibles :

  • Simuler un événement en temps réel et à grande échelle sur un sujet Pub/Sub, afin de mesurer et de déterminer le nombre et la taille des consommateurs requis pour traiter les événements publiés
  • Générer des données synthétiques vers une table BigQuery ou un bucket Cloud Storage afin d'évaluer les benchmarks des performances ou d'utiliser la démonstration de faisabilité

Récepteurs et formats d'encodage compatibles

Le tableau suivant décrit les récepteurs et les formats d'encodage compatibles avec ce modèle :
JSON Avro Parquet
Pub/Sub Oui Oui Non
BigQuery Oui Non Non
Cloud Storage Oui Oui Oui

La bibliothèque JSON Data Generator utilisée par le pipeline permet d'utiliser différentes fonctions factices pour chaque champ du schéma. Pour en savoir plus sur les fonctions factices et le format de schéma, consultez la documentation sur JSON-data-generator.

Conditions requises pour ce pipeline :

  • Créer un fichier de schéma de message et stocker ce fichier dans un emplacement Cloud Storage
  • La sortie cible doit exister avant l'exécution. La cible doit être un sujet Pub/Sub, une table BigQuery ou un bucket Cloud Storage suivant le type de récepteur.
  • Si l'encodage de sortie est Avro ou Parquet, créez un fichier de schéma Avro et stockez-le dans un emplacement Cloud Storage.

Paramètres de modèle

Paramètres Description
schemaLocation Emplacement du fichier de schéma. Exemple : gs://mybucket/filename.json.
qps Nombre de messages à publier par seconde. Exemple : 100.
sinkType (Facultatif) Type de récepteur de sortie. Les valeurs possibles sont PUBSUB, BIGQUERY, GCS. La valeur par défaut est PUBSUB.
outputType (Facultatif) Type d'encodage de sortie. Les valeurs possibles sont JSON, AVRO, PARQUET. La valeur par défaut est JSON.
avroSchemaLocation (Facultatif) Emplacement du fichier de schéma AVRO. Obligatoire lorsque outputType est défini sur AVRO ou PARQUET. Exemple : gs://mybucket/filename.avsc.
topic (Facultatif) Nom du sujet Pub/Sub dans lequel le pipeline doit publier des données. Obligatoire si sinkType est Pub/Sub. Exemple : projects/<project-id>/topics/<topic-name>.
outputTableSpec (Facultatif) Nom de la table BigQuery de sortie. Obligatoire si sinkType est BigQuery. Exemple : your-project:your-dataset.your-table-name.
writeDisposition (Facultatif) Disposition d'écriture BigQuery. Les valeurs possibles sont WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. La valeur par défaut est WRITE_APPEND.
outputDeadletterTable (Facultatif) Nom de la table BigQuery de sortie servant à stocker les enregistrements ayant échoué. Si cette valeur est omise, le pipeline crée une table portant le nom {output_table_name}_error_records lors de l'exécution. Exemple : your-project:your-dataset.your-table-name.
outputDirectory (Facultatif) Chemin de l'emplacement de sortie Cloud Storage. Obligatoire lorsque sinkType est défini sur Cloud Storage. Exemple : gs://mybucket/pathprefix/.
outputFilenamePrefix (Facultatif) Préfixe de nom pour les fichiers de sortie écrits dans Cloud Storage. La valeur par défaut est "output-".
windowDuration (Facultatif) Intervalle de fenêtre auquel la sortie est écrite dans Cloud Storage. La valeur par défaut est de "1m", soit 1 minute.
numShards [Facultatif] Nombre maximal de partitions de sortie. Obligatoire lorsque sinkType est défini sur Cloud Storage. La valeur doit alors être définie sur une valeur supérieure ou égale à 1.
messagesLimit (Facultatif) Nombre maximal de messages de sortie. La valeur par défaut est de 0, c'est-à-dire un nombre illimité.
autoscalingAlgorithm (Facultatif) Algorithme utilisé pour l'autoscaling des nœuds de calcul. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver.
maxNumWorkers (Facultatif) Nombre maximal de systèmes de calcul. Exemple : 10.

Exécuter le modèle du générateur de flux de données

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Streaming Data Generator template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécution à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 284.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • PROJECT_ID : ID de votre projet.
  • REGION_NAME : nom de la région Dataflow (par exemple, us-central1)
  • SCHEMA_LOCATION : chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
  • QPS : nombre de messages à publier par seconde
  • PUBSUB_TOPIC : sujet Pub/Sub de sortie. Exemple : projects/<project-id>/topics/<topic-name>.

API

Exécution à partir de l'API REST

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

POST  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Streaming_Data_Generator",
   }
}
  

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet.
  • LOCATION : nom de la région Dataflow (par exemple, us-central1)
  • JOB_NAME : nom de la tâche de votre choix
  • SCHEMA_LOCATION : chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
  • QPS : nombre de messages à publier par seconde
  • PUBSUB_TOPIC : sujet Pub/Sub de sortie. Exemple : projects/<project-id>/topics/<topic-name>.