Modèle de générateur de flux de données vers Pub/Sub, BigQuery et Cloud Storage

Le modèle de générateur de flux de données sert à générer un nombre illimité ou fixe d'enregistrements synthétiques ou de messages, basés sur un schéma fourni par l'utilisateur et à un rythme spécifié. Les destinations compatibles incluent les sujets Pub/Sub, les tables BigQuery et les buckets Cloud Storage.

Voici quelques cas d'utilisation possibles :

  • Simuler un événement en temps réel et à grande échelle sur un sujet Pub/Sub, afin de mesurer et de déterminer le nombre et la taille des consommateurs requis pour traiter les événements publiés
  • Générer des données synthétiques vers une table BigQuery ou un bucket Cloud Storage afin d'évaluer les benchmarks des performances ou d'utiliser la démonstration de faisabilité

Récepteurs et formats d'encodage compatibles

Le tableau suivant décrit les récepteurs et les formats d'encodage compatibles avec ce modèle :
JSON Avro Parquet
Pub/Sub Oui Oui Non
BigQuery Oui Non Non
Cloud Storage Oui Oui Oui

Conditions requises pour ce pipeline

  • Le compte de service de nœud de calcul doit disposer du rôle de nœud de calcul Dataflow (roles/dataflow.worker). Pour en savoir plus, consultez la page Présentation de IAM.
  • Créez un fichier de schéma contenant un modèle JSON pour les données générées. Ce modèle utilise la bibliothèque JSON Data Generator, qui vous permet de fournir diverses fonctions factices pour chaque champ du schéma. Pour en savoir plus, consultez la documentation sur json-data-generator.

    Exemple :

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Importez le fichier de schéma dans un bucket Cloud Storage.
  • La sortie cible doit exister avant l'exécution. La cible doit être un sujet Pub/Sub, une table BigQuery ou un bucket Cloud Storage suivant le type de récepteur.
  • Si l'encodage de sortie est Avro ou Parquet, créez un fichier de schéma Avro et stockez-le dans un emplacement Cloud Storage.
  • Attribuez au compte de service de nœud de calcul un rôle IAM supplémentaire en fonction de la destination souhaitée.
    Destination Rôle IAM supplémentaire requis Appliquer à quelle ressource
    Pub/Sub Éditeur Pub/Sub (roles/pubsub.publisher)
    (Pour en savoir plus, consultez la page Contrôle des accès Pub/Sub avec IAM.)
    Sujet Pub/Sub
    BigQuery Éditeur de données BigQuery (roles/bigquery.dataEditor)
    (Pour en savoir plus, consultez la page Contrôle des accès BigQuery avec IAM)
    Ensemble de données BigQuery
    Cloud Storage Administrateur des objets Cloud Storage (roles/storage.objectAdmin)
    (Pour en savoir plus, consultez la page Contrôle des accès Cloud Storage avec IAM.)
    Bucket Cloud Storage

Paramètres de modèle

Paramètres Description
schemaLocation Emplacement du fichier de schéma. Exemple : gs://mybucket/filename.json.
qps Nombre de messages à publier par seconde. Exemple : 100.
sinkType (Facultatif) Type de récepteur de sortie. Les valeurs possibles sont PUBSUB, BIGQUERY, GCS. La valeur par défaut est PUBSUB.
outputType (Facultatif) Type d'encodage de sortie. Les valeurs possibles sont JSON, AVRO, PARQUET. La valeur par défaut est JSON.
avroSchemaLocation (Facultatif) Emplacement du fichier de schéma AVRO. Obligatoire lorsque outputType est défini sur AVRO ou PARQUET. Exemple : gs://mybucket/filename.avsc.
topic (Facultatif) Nom du sujet Pub/Sub dans lequel le pipeline doit publier des données. Obligatoire si sinkType est Pub/Sub. Exemple : projects/my-project-id/topics/my-topic-id.
outputTableSpec (Facultatif) Nom de la table BigQuery de sortie. Obligatoire lorsque sinkType est BigQuery. Exemple : my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Facultatif) Disposition d'écriture BigQuery. Les valeurs possibles sont WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. La valeur par défaut est WRITE_APPEND.
outputDeadletterTable (Facultatif) Nom de la table BigQuery de sortie servant à stocker les enregistrements ayant échoué. Si cette valeur est omise, le pipeline crée une table portant le nom {output_table_name}_error_records lors de l'exécution. Exemple : my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Facultatif) Chemin de l'emplacement de sortie Cloud Storage. Obligatoire lorsque sinkType est défini sur Cloud Storage. Exemple : gs://mybucket/pathprefix/.
outputFilenamePrefix (Facultatif) Préfixe de nom pour les fichiers de sortie écrits dans Cloud Storage. La valeur par défaut est "output-".
windowDuration (Facultatif) Intervalle de fenêtre auquel la sortie est écrite dans Cloud Storage. La valeur par défaut est de "1m", soit 1 minute.
numShards (Facultatif) Nombre maximal de partitions de sortie. Obligatoire lorsque sinkType est défini sur Cloud Storage. La valeur doit alors être définie sur une valeur supérieure ou égale à 1.
messagesLimit (Facultatif) Nombre maximal de messages de sortie. La valeur par défaut est de 0, c'est-à-dire un nombre illimité.
autoscalingAlgorithm (Facultatif) Algorithme utilisé pour l'autoscaling des nœuds de calcul. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver.
maxNumWorkers (Facultatif) Nombre maximal de systèmes de calcul. Exemple : 10.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Streaming Data Generator template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • SCHEMA_LOCATION : chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
  • QPS : nombre de messages à publier par seconde
  • PUBSUB_TOPIC : sujet Pub/Sub de sortie. Exemple : projects/my-project-id/topics/my-topic-id.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • SCHEMA_LOCATION : chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
  • QPS : nombre de messages à publier par seconde
  • PUBSUB_TOPIC : sujet Pub/Sub de sortie. Exemple : projects/my-project-id/topics/my-topic-id.

Étapes suivantes