Modèles d'utilitaires fournis par Google

Google fournit un ensemble de modèles Dataflow Open Source. Pour obtenir des informations générales sur les modèles, consultez la page Modèles Dataflow. Pour obtenir la liste de tous les modèles fournis par Google, consultez la page Premiers pas avec les modèles fournis par Google.

Ce guide décrit les modèles d'utilitaires.

Conversion de format de fichier (Avro, Parquet, CSV)

Le modèle de conversion de format de fichier est un pipeline par lots qui convertit les fichiers stockés sur Cloud Storage d'un format compatible à un autre.

Les conversions entre les formats suivants sont possibles :

  • CSV vers Avro
  • CSV vers Parquet
  • Avro vers Parquet
  • Parquet vers Avro

Conditions requises pour ce pipeline :

  • Le bucket Cloud Storage de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputFileFormat Format de fichier d'entrée. Doit être l'un des suivants : [csv, avro, parquet].
outputFileFormat Format de fichier de sortie. Doit être l'un des suivants : [avro, parquet].
inputFileSpec Modèle de chemin d'accès Cloud Storage pour les fichiers d'entrée. Par exemple : gs://bucket-name/path/*.csv
outputBucket Dossier Cloud Storage dans lequel écrire les fichiers de sortie. Le chemin d'accès doit se terminer par une barre oblique. Par exemple : gs://bucket-name/output/
schema Chemin d'accès Cloud Storage au fichier de schéma Avro. Par exemple, gs://bucket-name/schema/my-schema.avsc
containsHeaders (Facultatif) Les fichiers CSV d'entrée contiennent un enregistrement d'en-tête (vrai/faux). La valeur par défaut est false. Nécessaire uniquement en cas de lecture de fichiers CSV.
csvFormat (Facultatif) Spécification du format CSV à utiliser pour l'analyse des enregistrements. La valeur par défaut est Default. Pour en savoir plus, consultez la page Format CSV Apache Commons.
delimiter (Facultatif) Délimiteur de champ utilisé dans les fichiers CSV d'entrée.
outputFilePrefix (Facultatif) Préfixe du fichier de sortie. La valeur par défaut est output.
numShards (Facultatif) Nombre de segments de fichiers de sortie.

Exécuter le modèle Conversion de format de fichier

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Convert file formats template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans l'interface système ou le terminal, exécutez le modèle :

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • INPUT_FORMAT : format du fichier d'entrée (doit être l'un des suivants : [csv, avro, parquet])
  • OUTPUT_FORMAT : format des fichiers de sortie (doit être l'un des suivants : [avro, parquet])
  • INPUT_FILES : modèle de chemin d'accès pour les fichiers d'entrée
  • OUTPUT_FOLDER : dossier Cloud Storage pour les fichiers de sortie
  • SCHEMA : chemin d'accès au fichier de schéma Avro

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion",
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • INPUT_FORMAT : format du fichier d'entrée (doit être l'un des suivants : [csv, avro, parquet])
  • OUTPUT_FORMAT : format des fichiers de sortie (doit être l'un des suivants : [avro, parquet])
  • INPUT_FILES : modèle de chemin d'accès pour les fichiers d'entrée
  • OUTPUT_FOLDER : dossier Cloud Storage pour les fichiers de sortie
  • SCHEMA : chemin d'accès au fichier de schéma Avro

Compression groupée de fichiers Cloud Storage

Le modèle de compression groupée des fichiers Cloud Storage est un pipeline par lots qui compresse des fichiers de Cloud Storage vers un emplacement spécifié. Ce modèle peut être utile lorsque vous devez compresser de grands lots de fichiers dans le cadre d'un processus d'archivage périodique. Les modes de compression compatibles sont les suivants : BZIP2, DEFLATE et GZIP. Les fichiers produits à l'emplacement de destination suivent un schéma de dénomination associant le nom de fichier d'origine à l'extension du mode de compression. Les extensions ajoutées seront les suivantes : .bzip2, .deflate et .gz.

Toute erreur survenant pendant le processus de compression sera inscrite dans le fichier d'échec au format CSV : nom de fichier, message d'erreur. Si aucune erreur ne survient pendant l'exécution du pipeline, le fichier d'erreur sera créé, mais il ne contiendra aucun enregistrement.

Conditions requises pour ce pipeline :

  • La compression doit être dans l'un des formats suivants : BZIP2, DEFLATE et GZIP.
  • Le répertoire de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputFilePattern Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/uncompressed/*.txt.
outputDirectory Emplacement de sortie où écrire. Par exemple, gs://bucket-name/compressed/.
outputFailureFile Fichier de sortie du journal des erreurs à utiliser pour les échecs d'écriture lors du processus de compression. Par exemple, gs://bucket-name/compressed/failed.csv. S'il n'y a pas d'échec, le fichier est quand même créé mais sera vide. Le contenu du fichier est au format CSV (nom de fichier, erreur) et contient une ligne pour chaque fichier dont la compression échoue.
compression Algorithme de compression utilisé pour compresser les fichiers correspondants. Doit être l'un des suivants : BZIP2, DEFLATE et GZIP.

Exécuter le modèle de fichiers Cloud Storage de compression par lots

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bulk Compress Files on Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans l'interface système ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • BUCKET_NAME : nom du bucket Cloud Storage
  • COMPRESSION : algorithme de compression de votre choix

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • BUCKET_NAME : nom du bucket Cloud Storage
  • COMPRESSION : algorithme de compression de votre choix

Décompression groupée de fichiers Cloud Storage

Le modèle de décompression groupée de fichiers Cloud Storage est un pipeline par lots qui décompresse les fichiers de Cloud Storage vers un emplacement spécifié. Cette fonctionnalité est utile lorsque vous souhaitez utiliser des données compressées pour réduire les coûts de bande passante réseau lors d'une migration, tout en optimisant la vitesse de traitement analytique en opérant sur des données non compressées après la migration. Le pipeline gère automatiquement plusieurs modes de compression lors d'une seule exécution et détermine le mode de décompression à utiliser en fonction de l'extension de fichier (.bzip2, .deflate, .gz ou .zip).

Remarque : Le modèle de décompression groupée de fichiers Cloud Storage est destiné à des fichiers compressés uniques, et non à des dossiers compressés.

Conditions requises pour ce pipeline :

  • Les fichiers à décompresser doivent être dans l'un des formats suivants : Bzip2, Deflate, Gzip ou Zip.
  • Le répertoire de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputFilePattern Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/compressed/*.gz.
outputDirectory Emplacement de sortie où écrire. Par exemple, gs://bucket-name/decompressed.
outputFailureFile Le fichier de sortie du journal des erreurs à utiliser pour les échecs d'écriture lors du processus de décompression. Par exemple, gs://bucket-name/decompressed/failed.csv. S'il n'y a pas d'échec, le fichier est quand même créé mais sera vide. Le contenu du fichier est au format CSV (nom de fichier, erreur) et consiste en une ligne pour chaque fichier qui échoue à la décompression.

Exécuter le modèle de décompression groupée de fichiers Cloud Storage

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bulk Decompress Files on Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans l'interface système ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • BUCKET_NAME : nom du bucket Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH : chemin d'accès de votre choix qui mène au fichier contenant les informations sur les échecs

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • BUCKET_NAME : nom du bucket Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH : chemin d'accès de votre choix qui mène au fichier contenant les informations sur les échecs

Suppression groupée Datastore [obsolète]

Ce modèle est obsolète et sera supprimé au premier trimestre 2022. Veuillez migrer vers le modèle de suppression groupée Firestore.

Le modèle de suppression groupée de Datastore est un pipeline qui lit les entités de Datastore avec une requête GQL donnée, puis supprime toutes les entités correspondantes du projet cible sélectionné. Le pipeline peut éventuellement transmettre les entités Datastore encodées en JSON à votre fichier UDF JavaScript, que vous pouvez utiliser pour filtrer les entités en renvoyant des valeurs nulles.

Conditions requises pour ce pipeline :

  • Datastore doit être configuré dans le projet avant l'exécution du modèle.
  • Si vous lisez et supprimez des instances de Datastore distinctes, le compte de service du nœud de calcul Dataflow doit être autorisé à lire depuis une instance et à supprimer depuis une autre.

Paramètres de modèle

Paramètres Description
datastoreReadGqlQuery Requête GQL qui spécifie les entités à rechercher pour la suppression. L'utilisation d'une requête ne contenant que des clés peut améliorer les performances. Par exemple : "SELECT __key__ FROM MyKind".
datastoreReadProjectId ID du projet GCP de l'instance Datastore à partir de laquelle vous souhaitez lire les entités (à l'aide de votre requête GQL) utilisées pour la mise en correspondance.
datastoreDeleteProjectId ID du projet GCP de l'instance Cloud Datastore dans laquelle supprimer les entités correspondantes. Cette valeur peut être identique à datastoreReadProjectId si vous souhaitez lire et supprimer des éléments au sein d'une même instance Datastore.
datastoreReadNamespace [Facultatif] Espace de noms des entités demandées. Défini comme "" pour l'espace de noms par défaut.
datastoreHintNumWorkers (Facultatif) Informations sur le nombre attendu de nœuds de calcul dans l'étape de limitation de la montée en puissance progressive de Datastore. La valeur par défaut est 500.
javascriptTextTransformGcsPath (Facultatif) URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facultatif) Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur. Si cette fonction renvoie une valeur indéfinie ou nulle pour une entité Datastore donnée, cette entité ne sera pas supprimée.

Exécuter le modèle de suppression groupée Datastore

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bulk Delete Entities in Datastore template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans l'interface système ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \
    --region REGION_NAME \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • GQL_QUERY : requête que vous utiliserez pour rechercher les entités à supprimer.
  • DATASTORE_READ_AND_DELETE_PROJECT_ID : ID de projet de votre instance Datastore. Cet exemple lit et supprime des éléments au sein d'une même instance Datastore.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • GQL_QUERY : requête que vous utiliserez pour rechercher les entités à supprimer.
  • DATASTORE_READ_AND_DELETE_PROJECT_ID : ID de projet de votre instance Datastore. Cet exemple lit et supprime des éléments au sein d'une même instance Datastore.

Suppression groupée Firestore

Le modèle de suppression groupée de Firestore est un pipeline qui lit les entités de Firestore avec une requête GQL donnée, puis supprime toutes les entités correspondantes du projet cible sélectionné. Le pipeline peut éventuellement transmettre les entités Firestore encodées en JSON à votre fichier UDF JavaScript, que vous pouvez utiliser pour filtrer les entités en renvoyant des valeurs nulles.

Conditions requises pour ce pipeline :

  • Firestore doit être configuré dans le projet avant l'exécution du modèle.
  • Si vous lisez et supprimez des instances de Firestore distinctes, le compte de service des nœuds de calcul Dataflow doit être autorisé à lire depuis une instance et à supprimer depuis une autre.

Paramètres de modèle

Paramètres Description
firestoreReadGqlQuery Requête GQL qui spécifie les entités à rechercher pour la suppression. L'utilisation d'une requête ne contenant que des clés peut améliorer les performances. Par exemple : "SELECT __key__ FROM MyKind".
firestoreReadProjectId ID du projet GCP de l'instance Firestore à partir de laquelle vous souhaitez lire les entités (à l'aide de votre requête GQL) utilisées pour la mise en correspondance.
firestoreDeleteProjectId ID du projet GCP de l'instance Cloud Firestore dans laquelle supprimer les entités correspondantes. Cette valeur peut être identique à firestoreReadProjectId si vous souhaitez lire et supprimer des éléments au sein d'une même instance Firestore.
firestoreReadNamespace [Facultatif] Espace de noms des entités demandées. Défini comme "" pour l'espace de noms par défaut.
firestoreHintNumWorkers (Facultatif) Informations sur le nombre attendu de nœuds de calcul dans l'étape de limitation de la montée en puissance progressive de Firestore. La valeur par défaut est 500.
javascriptTextTransformGcsPath (Facultatif) URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facultatif) Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur. Si cette fonction renvoie une valeur indéfinie ou nulle pour une entité Firestore donnée, cette entité ne sera pas supprimée.

Exécuter le modèle de suppression groupée Firestore

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bulk Delete Entities in Firestore template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans l'interface système ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete \
    --region REGION_NAME \
    --parameters \
firestoreReadGqlQuery="GQL_QUERY",\
firestoreReadProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID,\
firestoreDeleteProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • GQL_QUERY : requête que vous utiliserez pour rechercher les entités à supprimer.
  • FIRESTORE_READ_AND_DELETE_PROJECT_ID : ID de votre projet d'instance Firestore. Cet exemple lit et supprime des éléments au sein d'une même instance Firestore.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "firestoreReadGqlQuery": "GQL_QUERY",
       "firestoreReadProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID",
       "firestoreDeleteProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • GQL_QUERY : requête que vous utiliserez pour rechercher les entités à supprimer.
  • FIRESTORE_READ_AND_DELETE_PROJECT_ID : ID de votre projet d'instance Firestore. Cet exemple lit et supprime des éléments au sein d'une même instance Firestore.

Générateur de flux de données vers Pub/Sub, BigQuery et Cloud Storage

Le modèle de générateur de flux de données sert à générer un nombre illimité ou fixe d'enregistrements synthétiques ou de messages, basés sur un schéma fourni par l'utilisateur et à un rythme spécifié. Les destinations compatibles incluent les sujets Pub/Sub, les tables BigQuery et les buckets Cloud Storage.

Voici quelques cas d'utilisation possibles :

  • Simuler un événement en temps réel et à grande échelle sur un sujet Pub/Sub, afin de mesurer et de déterminer le nombre et la taille des consommateurs requis pour traiter les événements publiés
  • Générer des données synthétiques vers une table BigQuery ou un bucket Cloud Storage afin d'évaluer les benchmarks des performances ou d'utiliser la démonstration de faisabilité

Récepteurs et formats d'encodage compatibles

Le tableau suivant décrit les récepteurs et les formats d'encodage compatibles avec ce modèle :
JSON Avro Parquet
Pub/Sub Oui Oui Non
BigQuery Oui No Non
Cloud Storage Oui Yes Oui

La bibliothèque JSON Data Generator utilisée par le pipeline permet d'utiliser différentes fonctions factices pour chaque champ du schéma. Pour en savoir plus sur les fonctions factices et le format de schéma, consultez la documentation sur JSON-data-generator.

Conditions requises pour ce pipeline :

  • Créer un fichier de schéma de message et stocker ce fichier dans un emplacement Cloud Storage
  • La sortie cible doit exister avant l'exécution. La cible doit être un sujet Pub/Sub, une table BigQuery ou un bucket Cloud Storage suivant le type de récepteur.
  • Si l'encodage de sortie est Avro ou Parquet, créez un fichier de schéma Avro et stockez-le dans un emplacement Cloud Storage.

Paramètres de modèle

Paramètres Description
schemaLocation Emplacement du fichier de schéma. Exemple : gs://mybucket/filename.json.
qps Nombre de messages à publier par seconde. Exemple : 100.
sinkType (Facultatif) Type de récepteur de sortie. Les valeurs possibles sont PUBSUB, BIGQUERY, GCS. La valeur par défaut est PUBSUB.
outputType (Facultatif) Type d'encodage de sortie. Les valeurs possibles sont JSON, AVRO, PARQUET. La valeur par défaut est JSON.
avroSchemaLocation (Facultatif) Emplacement du fichier de schéma AVRO. Obligatoire lorsque outputType est défini sur AVRO ou PARQUET. Exemple : gs://mybucket/filename.avsc.
topic (Facultatif) Nom du sujet Pub/Sub dans lequel le pipeline doit publier des données. Obligatoire si sinkType est Pub/Sub. Exemple : projects/my-project-ID/topics/my-topic-ID.
outputTableSpec (Facultatif) Nom de la table BigQuery de sortie. Obligatoire lorsque sinkType est BigQuery. Exemple : my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Facultatif) Disposition d'écriture BigQuery. Les valeurs possibles sont WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. La valeur par défaut est WRITE_APPEND.
outputDeadletterTable (Facultatif) Nom de la table BigQuery de sortie servant à stocker les enregistrements ayant échoué. Si cette valeur est omise, le pipeline crée une table portant le nom {output_table_name}_error_records lors de l'exécution. Exemple : my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Facultatif) Chemin de l'emplacement de sortie Cloud Storage. Obligatoire lorsque sinkType est défini sur Cloud Storage. Exemple : gs://mybucket/pathprefix/.
outputFilenamePrefix (Facultatif) Préfixe de nom pour les fichiers de sortie écrits dans Cloud Storage. La valeur par défaut est "output-".
windowDuration (Facultatif) Intervalle de fenêtre auquel la sortie est écrite dans Cloud Storage. La valeur par défaut est de "1m", soit 1 minute.
numShards (Facultatif) Nombre maximal de partitions de sortie. Obligatoire lorsque sinkType est défini sur Cloud Storage. La valeur doit alors être définie sur une valeur supérieure ou égale à 1.
messagesLimit (Facultatif) Nombre maximal de messages de sortie. La valeur par défaut est de 0, c'est-à-dire un nombre illimité.
autoscalingAlgorithm (Facultatif) Algorithme utilisé pour l'autoscaling des nœuds de calcul. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver.
maxNumWorkers (Facultatif) Nombre maximal de systèmes de calcul. Exemple : 10.

Exécuter le modèle du générateur de flux de données

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Streaming Data Generator template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans l'interface système ou le terminal, exécutez le modèle :

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • JOB_NAME : nom de la tâche de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • SCHEMA_LOCATION : chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
  • QPS : nombre de messages à publier par seconde
  • PUBSUB_TOPIC : sujet Pub/Sub de sortie. Exemple : projects/my-project-ID/topics/my-topic-ID.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • JOB_NAME : nom de la tâche de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • SCHEMA_LOCATION : chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
  • QPS : nombre de messages à publier par seconde
  • PUBSUB_TOPIC : sujet Pub/Sub de sortie. Exemple : projects/my-project-ID/topics/my-topic-ID.