Modèles de streaming fournis par Google

Google fournit un ensemble de modèles Dataflow open source. Des informations générales sur les modèles sont disponibles sur la page Présentation. Pour obtenir la liste de tous les modèles fournis par Google, consultez la page Premiers pas avec les modèles fournis par Google.

Cette page répertorie les modèles de streaming suivants :

Abonnement Pub/Sub vers BigQuery

Le modèle Abonnement Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un abonnement Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.

Conditions requises pour ce pipeline :

  • Les messages Pub/Sub doivent être au format JSON, comme décrit ici. Par exemple, vous pouvez insérer des messages au format {"k1":"v1", "k2":"v2"} dans une table BigQuery comportant deux colonnes nommées k1 et k2, en utilisant le type de données "chaîne".
  • La table de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputSubscription Abonnement en entrée Pub/Sub à lire, au format projects/<project>/subscriptions/<subscription>.
outputTableSpec Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table>

Exécuter le modèle Abonnement Pub/Sub vers BigQuery

CONSOLE

Exécuter depuis la Google Cloud Console
  1. Accédez à la page Dataflow de Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton
  5. Sélectionnez the Pub/Sub Subscription to BigQuery template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

GCLOUD

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_SUBSCRIPTION_NAME par le nom de votre abonnement Pub/Sub.
  • Remplacez YOUR_DATASET par votre ensemble de données BigQuery et YOUR_TABLE_NAME par le nom de votre table BigQuery.
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
        --parameters \
    inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
    outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME
    

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_SUBSCRIPTION_NAME par le nom de votre abonnement Pub/Sub.
  • Remplacez YOUR_DATASET par votre ensemble de données BigQuery et YOUR_TABLE_NAME par le nom de votre table BigQuery.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
           "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
       },
       "environment": { "zone": "us-central1-f" }
    }
    

Sujet Pub/Sub vers BigQuery

Le modèle sujet Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un sujet Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.

Conditions requises pour ce pipeline :

  • Les messages Pub/Sub doivent être au format JSON, comme décrit ici. Par exemple, vous pouvez insérer des messages au format {"k1":"v1", "k2":"v2"} dans une table BigQuery comportant deux colonnes nommées k1 et k2, en utilisant le type de données "chaîne".
  • La table de sortie doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputTopic Sujet d'entrée Pub/Sub à lire, au format projects/<project>/topics/<topic>.
outputTableSpec Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table>

Exécuter le modèle Sujet Pub/Sub vers BigQuery

CONSOLE

Exécuter depuis la Google Cloud Console
  1. Accédez à la page Dataflow de Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton
  5. Sélectionnez the Pub/Sub Topic to BigQuery template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

GCLOUD

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_DATASET par votre ensemble de données BigQuery et YOUR_TABLE_NAME par le nom de votre table BigQuery.
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
        --parameters \
    inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
    outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME
    

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_DATASET par votre ensemble de données BigQuery et YOUR_TABLE_NAME par le nom de votre table BigQuery.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
           "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
       },
       "environment": { "zone": "us-central1-f" }
    }
    

Pub/Sub vers Pub/Sub

Le modèle Pub/Sub vers Pub/Sub est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et les écrit dans un autre sujet Pub/Sub. Le pipeline accepte également une clé facultative d'attribut de message et une valeur qui peut être utilisée pour filtrer les messages devant être écrits dans le sujet Pub/Sub. Vous pouvez utiliser ce modèle pour copier des messages d'un abonnement Pub/Sub à un autre sujet Pub/Sub avec un filtre de message facultatif.

Conditions requises pour ce pipeline :

  • L'abonnement Pub/Sub source doit exister avant l'exécution.
  • Le thème Pub/Sub de destination doit exister avant l'exécution.

Paramètres de modèle

Paramètres Description
inputSubscription Abonnement Pub/Sub à partir duquel lire l'entrée. Par exemple, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Thème Cloud Pub/Sub dans lequel écrire la sortie. Par exemple, projects/<project-id>/topics/<topic-name>.
filterKey [Facultatif] Filtrez les événements en fonction d'une clé d'attribut. Aucun filtre n'est appliqué si filterKey n'est pas spécifié.
filterValue [Facultatif] Valeur d'attribut de filtre à utiliser dans le cas où un filterKey est fourni. Une valeur filterValue nulle est utilisée par défaut.

Exécution du modèle Pub/Sub vers Pub/Sub

CONSOLE

Exécuter depuis la Google Cloud Console
  1. Accédez à la page Dataflow de Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton
  5. Sélectionnez the Pub/Sub to Pub/Sub template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

GCLOUD

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_SUBSCRIPTION_NAME par le nom de votre abonnement Pub/Sub.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez FILTER_KEY par le nom de la clé d'attribut utilisée pour filtrer les messages.
  • Remplacez FILTER_VALUE par la valeur de l'attribut.
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
        --parameters \
    inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
    outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
    filterKey=FILTER_KEY,\
    filterValue=FILTER_VALUE
    

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_SUBSCRIPTION_NAME par le nom de votre abonnement Pub/Sub.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez FILTER_KEY par le nom de la clé d'attribut utilisée pour filtrer les messages.
  • Remplacez FILTER_VALUE par la valeur de l'attribut.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
           "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
           "filterKey": "FILTER_KEY",
           "filterValue": "FILTER_VALUE"
       },
       "environment": { "zone": "us-central1-f" }
    }
    

Pub/Sub vers Splunk

Le modèle Pub/Sub vers Splunk est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et écrit leur charge utile dans Splunk via la solution HEC (HTTP Event Collector) de Splunk. Avant d'écrire vers Splunk, vous pouvez également appliquer une fonction JavaScript personnalisée par l'utilisateur vers la charge utile du message. Tous les messages dont le traitement échoue sont transférés vers un sujet Pub/Sub inactif en vue d'opérations de dépannage supplémentaires et d'un nouveau traitement.

Conditions requises pour ce pipeline :

  • L'abonnement Pub/Sub source doit exister avant l'exécution du pipeline.
  • Le sujet Pub/Sub inactif doit exister avant l'exécution du pipeline.
  • Le point de terminaison Splunk HEC doit être accessible à partir du réseau de nœuds de calcul Dataflow.
  • Le jeton de la solution HEC de Splunk doit être généré et disponible.

Paramètres de modèle

Paramètres Description
inputSubscription Abonnement Pub/Sub à partir duquel lire l'entrée. Par exemple, projects/<project-id>/subscriptions/<subscription-name>.
token Jeton d'authentification HEC Splunk.
url URL HEC Splunk. Il doit être routable depuis le VPC dans lequel le pipeline est exécuté. Par exemple, https://splunk-hec-host:8088.
outputDeadletterTopic Sujet Pub/Sub pour transférer les messages non distribuables. Par exemple, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath [Facultatif] Chemin d'accès Cloud Storage contenant tout votre code JavaScript. Par exemple, gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName [Facultatif] Nom de la fonction JavaScript à appeler. Par exemple, si votre fonction JavaScript est function myTransform(inJson) { ...dostuff...}, le nom de la fonction est myTransform.
batchCount [Facultatif] Taille de lot pour l'envoi de plusieurs événements vers Splunk. Valeur par défaut 1 (pas de traitement par lots).
parallelism [Facultatif] Nombre maximal de demandes en parallèle. Valeur par défaut 1 (aucun parallélisme).
disableCertificateValidation [Facultatif] Désactiver la validation du certificat SSL. Valeur par défaut "false" (validation activée).

Exécution du modèle Pub/Sub vers Splunk

CONSOLE

Exécuter depuis la Google Cloud Console
  1. Accédez à la page Dataflow de Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton
  5. Sélectionnez the Pub/Sub to Splunk template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

GCLOUD

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez INPUT_SUBSCRIPTION_NAME par le nom de votre abonnement Pub/Sub.
  • Remplacez TOKEN avec le jeton HTTP Event Collector de Splunk.
  • Remplacez URL par le chemin de l'URL du HTTP Event Collector de Splunk. Par exemple : https://splunk-hec-host:8088.
  • Remplacez DEADLETTER_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_JAVASCRIPT_FUNCTION par le nom de votre fonction JavaScript.
  • Remplacez PATH_TO_JAVASCRIPT_UDF_FILE par le chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript.
  • Remplacez BATCH_COUNT par la taille de lot à utiliser pour l'envoi de plusieurs événements à Splunk.
  • Remplacez PARALLELISM par le nombre de requêtes parallèles à utiliser pour l'envoi d'événements à Splunk.
  • Remplacez DISABLE_VALIDATION par true si vous souhaitez désactiver la validation du certificat SSL.
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
        --parameters \
    inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
    token=TOKEN,\
    url=URL,\
    outputDeadletterTopic=projects/YOUR_PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
    javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
    javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
    batchCount=BATCH_COUNT,\
    parallelism=PARALLELISM,\
    disableCertificateValidation=DISABLE_VALIDATION
    

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez INPUT_SUBSCRIPTION_NAME par le nom de votre abonnement Pub/Sub.
  • Remplacez TOKEN avec le jeton HTTP Event Collector de Splunk.
  • Remplacez URL par le chemin de l'URL du HTTP Event Collector de Splunk. Par exemple : https://splunk-hec-host:8088.
  • Remplacez DEADLETTER_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_JAVASCRIPT_FUNCTION par le nom de votre fonction JavaScript.
  • Remplacez PATH_TO_JAVASCRIPT_UDF_FILE par le chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript.
  • Remplacez BATCH_COUNT par la taille de lot à utiliser pour l'envoi de plusieurs événements à Splunk.
  • Remplacez PARALLELISM par le nombre de requêtes parallèles à utiliser pour l'envoi d'événements à Splunk.
  • Remplacez DISABLE_VALIDATION par true si vous souhaitez désactiver la validation du certificat SSL.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
           "token": "TOKEN",
           "url": "URL",
           "outputDeadletterTopic": "projects/YOUR_PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
           "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
           "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
           "batchCount": "BATCH_COUNT",
           "parallelism": "PARALLELISM",
           "disableCertificateValidation": "DISABLE_VALIDATION"
       },
       "environment": { "zone": "us-central1-f" }
    }
    

Pub/Sub vers Cloud Storage Avro

Le modèle Pub/Sub vers Cloud Storage Avro est un pipeline en streaming qui lit les données d'un sujet Pub/Sub et écrit les fichiers Avro dans le bucket Cloud Storage spécifié. Ce pipeline est compatible avec la durée de fenêtre facultative fournie par l'utilisateur à utiliser pour effectuer des écritures fenêtrées.

Conditions requises pour ce pipeline :

  • Le sujet Pub/Sub d'entrée doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputTopic Sujet Cloud Pub/Sub permettant de s'abonner à la consultation de messages. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>.
outputDirectory Répertoire de sortie dans lequel les fichiers de sortie Avro seront archivés. Veuillez ajouter / à la fin. Par exemple : gs://example-bucket/example-directory/.
avroTempDirectory Répertoire des fichiers Avro temporaires. Veuillez ajouter / à la fin. Par exemple : gs://example-bucket/example-directory/.
outputFilenamePrefix [Facultatif] Préfixe du nom de fichier de sortie pour les fichiers Avro.
outputFilenameSuffix [Facultatif] Suffixe du nom de fichier de sortie pour les fichiers Avro.
outputShardTemplate [Facultatif] Modèle de partition du fichier de sortie. Spécifié en tant que séquences répétées des lettres "S" ou "N" (exemple : SSS-NNN), qui sont remplacées par le numéro de partition ou par le nombre de partitions, respectivement. Le format de modèle par défaut est "W-P-SS-of-NN" lorsque ce paramètre n'est pas spécifié.
numShards [Facultatif] Nombre maximal de partitions de sortie générées lors de l'écriture. Le nombre maximal de partitions par défaut est de 1.
windowDuration [Facultatif] Durée de la fenêtre dans laquelle les données seront écrites. La valeur par défaut est "5m". Les formats autorisés sont les suivants : Ns (pour les secondes, exemple : 5s), Nm (pour les minutes, exemple : 12m), Nh (pour les heures, exemple : 2h).

Exécuter le modèle Pub/Sub vers Cloud Storage Avro

CONSOLE

Exécuter depuis la Google Cloud Console
  1. Accédez à la page Dataflow de Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton
  5. Sélectionnez the Pub/Sub to Cloud Storage Avro template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

GCLOUD

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_BUCKET_NAME par le nom du bucket Cloud Storage.
  • Remplacez FILENAME_PREFIX par le préfixe de nom de fichier de sortie souhaité.
  • Remplacez FILENAME_SUFFIX par le suffixe de nom de fichier de sortie souhaité.
  • Remplacez SHARD_TEMPLATE par le modèle de partition de sortie souhaité.
  • Remplacez NUM_SHARDS par le nombre de partitions de sortie.
  • Remplacez WINDOW_DURATION par la durée de fenêtre de sortie.
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Avro \
        --parameters \
    inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
    outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
    avroTempDirectory=gs://YOUR_BUCKET_NAME/temp/,\
    outputFilenamePrefix=FILENAME_PREFIX,\
    outputFilenameSuffix=FILENAME_SUFFIX,\
    outputShardTemplate=SHARD_TEMPLATE,\
    numShards=NUM_SHARDS,\
    windowDuration=WINDOW_DURATION
    

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_BUCKET_NAME par le nom du bucket Cloud Storage.
  • Remplacez FILENAME_PREFIX par le préfixe de nom de fichier de sortie souhaité.
  • Remplacez FILENAME_SUFFIX par le suffixe de nom de fichier de sortie souhaité.
  • Remplacez SHARD_TEMPLATE par le modèle de partition de sortie souhaité.
  • Remplacez NUM_SHARDS par le nombre de partitions de sortie.
  • Remplacez WINDOW_DURATION par la durée de fenêtre de sortie.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Avro
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
           "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
           "avroTempDirectory": "gs://YOUR_BUCKET_NAME/temp/",
           "outputFilenamePrefix": "FILENAME_PREFIX",
           "outputFilenameSuffix": "FILENAME_SUFFIX",
           "outputShardTemplate": "SHARD_TEMPLATE",
           "numShards": "NUM_SHARDS",
           "windowDuration": "WINDOW_DURATION"
       },
       "environment": { "zone": "us-central1-f" }
    }
    

Pub/Sub vers texte Cloud Storage

Le modèle Pub/Sub vers texte Cloud Storage est un pipeline de streaming qui lit les enregistrements de Pub/Sub et les enregistre sous forme d'une série de fichiers Cloud Storage au format texte. Le modèle peut être utilisé comme moyen rapide d'enregistrer des données dans Pub/Sub pour une utilisation ultérieure. Par défaut, le modèle génère un nouveau fichier toutes les 5 minutes.

Conditions requises pour ce pipeline :

  • Le sujet Pub/Sub doit exister avant l'exécution.
  • Les messages publiés sur le thème doivent être au format texte.
  • Les messages publiés sur le thème ne doivent contenir aucune nouvelle ligne. Notez que chaque message Pub/Sub est enregistré sur une ligne unique dans le fichier de sortie.

Paramètres de modèle

Paramètres Description
inputTopic Sujet Pub/Sub à partir duquel lire l'entrée. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>.
outputDirectory Chemin d'accès et préfixe du nom de fichier pour l'écriture des fichiers de sortie. Par exemple, gs://bucket-name/path/. Cette valeur doit se terminer par une barre oblique.
outputFilenamePrefix Préfixe à placer sur chaque fichier ciblé sur une fenêtre. Par exemple : output-
outputFilenameSuffix Suffixe à placer sur chaque fichier ciblé sur une fenêtre, généralement une extension de fichier telle que .txt ou .csv.
outputShardTemplate Le modèle de segment définit la partie dynamique de chaque fichier ciblé sur une fenêtre. Par défaut, le pipeline utilise un seul segment pour la sortie vers le système de fichiers dans chaque fenêtre. Cela signifie que toutes les données vont atterrir dans un seul fichier par fenêtre. Le outputShardTemplate devient par défaut W-P-SS-of-NNW correspond à la plage de dates de la fenêtre, P correspond aux informations du volet, S correspond au numéro de segment et N au nombre de segments. Dans le cas d'un fichier unique, la partie SS-of-NN de outputShardTemplate sera 00-of-01.

Exécuter le modèle Pub/Sub vers texte Cloud Storage

CONSOLE

Exécuter depuis la Google Cloud Console
  1. Accédez à la page Dataflow de Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton
  5. Sélectionnez the Pub/Sub to Cloud Storage Text template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

GCLOUD

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_BUCKET_NAME par le nom du bucket Cloud Storage.
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text \
        --parameters \
    inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
    outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
    outputFilenamePrefix=output-,\
    outputFilenameSuffix=.txt
    

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_BUCKET_NAME par le nom du bucket Cloud Storage.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
           "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
           "outputFilenamePrefix": "output-",
           "outputFilenameSuffix": ".txt",
       },
       "environment": { "zone": "us-central1-f" }
    }
    

Texte Cloud Storage vers BigQuery (Flux)

Le pipeline de texte Cloud Storage vers BigQuery est un pipeline de streaming qui vous permet de lire des fichiers texte stockés dans Cloud Storage, de les transformer à l'aide de la fonction JavaScript définie par l'utilisateur que vous fournissez, et de générer le résultat dans BigQuery.

Conditions requises pour ce pipeline :

  • Créez un fichier de schéma BigQuery au format JSON qui décrit votre table de sortie.
        {
            'fields': [{
                'name': 'location',
                'type': 'STRING'
            }, {
                'name': 'name',
                'type': 'STRING'
            }, {
                'name': 'age',
                'type': 'STRING',
            }, {
                'name': 'color',
                'type': 'STRING'
            }, {
                'name': 'coffee',
                'type': 'STRING',
                'mode': 'REQUIRED'
            }, {
                'name': 'cost',
                'type': 'NUMERIC',
                'mode': 'REQUIRED'
            }]
        }
        
  • Créez un fichier JavaScript (.js) à l'aide de la fonction définie par l'utilisateur (UDF) qui fournit la logique permettant transformer les lignes de texte. Notez que votre fonction doit renvoyer une chaîne JSON.

    Par exemple, cette fonction divise chaque ligne d'un fichier CSV et renvoie une chaîne JSON après avoir transformé les valeurs.

        function transform(line) {
        var values = line.split(',');
    
        var obj = new Object();
        obj.location = values[0];
        obj.name = values[1];
        obj.age = values[2];
        obj.color = values[3];
        obj.coffee = values[4];
        var jsonString = JSON.stringify(obj);
    
        return jsonString;
        }
        

Paramètres de modèle

Paramètres Description
javascriptTextTransformGcsPath Emplacement Cloud Storage de votre fichier UDF JavaScript. Exemple : gs://my_bucket/my_function.js.
JSONPath Emplacement Cloud Storage de votre fichier de schéma BigQuery, décrit comme fichier JSON. Exemple : gs://path/to/my/schema.json.
javascriptTextTransformFunctionName Nom de la fonction JavaScript que vous souhaitez appeler en tant que fonction définie par l'utilisateur. Exemple : transform.
outputTable Nom complet de la table BigQuery. Par exemple : my-project:dataset.table
inputFilePattern Emplacement Cloud Storage du texte que vous souhaitez traiter. Exemple : gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Répertoire temporaire pour le processus de chargement de BigQuery. Par exemple : gs://my-bucket/my-files/temp_dir
outputDeadletterTable Table des messages n'ayant pas pu atteindre la table de sortie (c'est-à-dire table de lettres mortes). Exemple : my-project:dataset.my-deadletter-table. Si elle n'existe pas, elle sera créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place.

Exécuter le modèle Texte Cloud Storage vers BigQuery (Flux)

CONSOLE

Exécuter depuis la Google Cloud Console
  1. Accédez à la page Dataflow de Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton
  5. Sélectionnez the Cloud Storage Text to BigQuery template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

GCLOUD

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_JAVASCRIPT_FUNCTION par le nom de votre UDF.
  • Remplacez PATH_TO_BIGQUERY_SCHEMA_JSON par le chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma.
  • Remplacez PATH_TO_JAVASCRIPT_UDF_FILE par le chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript.
  • Remplacez PATH_TO_YOUR_TEXT_DATA par le chemin d'accès Cloud Storage à votre ensemble de données textuelles.
  • Remplacez BIGQUERY_TABLE par le nom de votre table BigQuery.
  • Remplacez BIGQUERY_DEADLETTER_TABLE par le nom de votre table de lettres mortes BigQuery.
  • Remplacez PATH_TO_TEMP_DIR_ON_GCS par le chemin d'accès Cloud Storage au répertoire temporaire.
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
        --parameters \
    javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
    JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
    javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
    inputFilePattern=PATH_TO_YOUR_TEXT_DATA,\
    outputTable=BIGQUERY_TABLE,\
    outputDeadletterTable=BIGQUERY_DEADLETTER_TABLE,\
    bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
    

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_JAVASCRIPT_FUNCTION par le nom de votre UDF.
  • Remplacez PATH_TO_BIGQUERY_SCHEMA_JSON par le chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma.
  • Remplacez PATH_TO_JAVASCRIPT_UDF_FILE par le chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript.
  • Remplacez PATH_TO_YOUR_TEXT_DATA par le chemin d'accès Cloud Storage à votre ensemble de données textuelles.
  • Remplacez BIGQUERY_TABLE par le nom de votre table BigQuery.
  • Remplacez BIGQUERY_DEADLETTER_TABLE par le nom de votre table de lettres mortes BigQuery.
  • Remplacez PATH_TO_TEMP_DIR_ON_GCS par le chemin d'accès Cloud Storage au répertoire temporaire.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
           "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
           "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
           "inputFilePattern":"PATH_TO_YOUR_TEXT_DATA",
           "outputTable":"BIGQUERY_TABLE",
           "outputDeadletterTable":"BIGQUERY_DEADLETTER_TABLE",
           "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
       },
       "environment": { "zone": "us-central1-f" }
    }
    

Texte Cloud Storage vers Pub/Sub (Flux)

Le modèle crée un pipeline de streaming qui interroge en permanence les nouveaux fichiers texte chargés dans Cloud Storage, lit chaque fichier ligne par ligne, et publie des chaînes vers Pub/Sub. Il publie les enregistrements dans un fichier délimité par une nouvelle ligne contenant des enregistrements JSON ou un fichier CSV dans une rubrique Pub/Sub pour un traitement en temps réel. Vous pouvez utiliser ce modèle pour relire les données dans Pub/Sub.

Actuellement, l'intervalle d'interrogation est fixé à 10 secondes. Ce modèle ne définit aucun horodatage sur les enregistrements individuels. Par conséquent, l'heure de l'événement sera égale à l'heure de publication pendant l'exécution. Si votre pipeline dépend d'une heure d'événement précise pour le traitement, ne l'utilisez pas.

Conditions requises pour ce pipeline :

  • Les fichiers d'entrée doivent être au format CSV ou JSON délimité par une nouvelle ligne. Les enregistrements couvrant plusieurs lignes dans les fichiers source peuvent entraîner des problèmes en aval, car chaque ligne dans les fichiers sera publiée sous forme de message à Pub/Sub.
  • Le sujet Pub/Sub doit exister avant l'exécution.
  • Le pipeline fonctionne indéfiniment et doit être terminé manuellement.

Paramètres de modèle

Paramètres Description
inputFilePattern Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/files/*.json.
outputTopic Sujet d'entrée Pub/Sub dans lequel écrire. Le nom doit être au format projects/<project-id>/topics/<topic-name>.

Exécuter le modèle Texte Cloud Storage vers Pub/Sub (Flux)

CONSOLE

Exécuter depuis la Google Cloud Console
  1. Accédez à la page Dataflow de Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton
  5. Sélectionnez the Cloud Storage Text to Pub/Sub (Stream) template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

GCLOUD

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_BUCKET_NAME par le nom du bucket Cloud Storage.
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub \
        --parameters \
    inputFilePattern=gs://YOUR_BUCKET_NAME/files/*.json,\
    outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME
    

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID de votre projet.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_BUCKET_NAME par le nom du bucket Cloud Storage.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputFilePattern": "gs://YOUR_BUCKET_NAME/files/*.json",
           "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
       },
       "environment": { "zone": "us-central1-f" }
    }
    

Masquage/Tokenisation de données à l'aide de Cloud DLP à partir de Cloud Storage vers BigQuery (Flux)

Le modèle Masquage/Tokenisation de données à l'aide de Cloud DLP à partir de Cloud Storage vers BigQuery est un pipeline de streaming qui lit les fichiers csv d'un bucket Cloud Storage, appelle l'API Cloud Data Prevention (Cloud DLP) pour supprimer l'identification des données, et écrit les données dont l'identification a été supprimée dans la table BigQuery spécifiée. Ce modèle accepte l'utilisation d'un modèle d'inspection Cloud DLP et d'un modèle de suppression d'identification Cloud DLP. Cela permet aux utilisateurs d'inspecter du texte pour identifier les données potentiellement sensibles, et de supprimer l'identification des données sensibles détectées. Ils ont également la possibilité de supprimer l'identification de données structurées lorsque des colonnes sont spécifiées et qu'aucune inspection n'est nécessaire.

Conditions requises pour ce pipeline :

  • Les données d'entrée à tokeniser doivent exister.
  • Les modèles Cloud DLP doivent exister (par exemple, DeidentifyTemplate et InspectTemplate). Pour en savoir plus, consultez la page Modèles Cloud DLP.
  • L'ensemble de données BigQuery doit exister.

Paramètres de modèle

Paramètres Description
inputFilePattern Fichiers csv à partir desquels lire les enregistrements de données d'entrée. Les caractères génériques sont également acceptés. Par exemple, gs://mybucket/my_csv_filename.csv ou gs://mybucket/file-*.csv.
dlpProjectId ID du projet Cloud DLP propriétaire de la ressource API Cloud DLP. Il peut s'agir du projet propriétaire des modèles Cloud DLP ou d'un autre projet. Par exemple, my_dlp_api_project.
deidentifyTemplateName Modèle de suppression d'identification Cloud DLP à utiliser pour les requêtes d'API, spécifié à l'aide du schéma projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Exemple :projects/my_project/deidentifyTemplates/100
datasetName Ensemble de données BigQuery pour l'envoi des résultats tokenisés.
batchSize Taille du lot pour l'envoi des données à inspecter et/ou à détokeniser. Dans le cas d'un fichier csv, batchSize correspond au nombre de lignes dans un lot. Les utilisateurs doivent déterminer la taille du lot en fonction de la taille des enregistrements et de la taille du fichier. Notez que l'API Cloud DLP limite la taille de la charge utile à 524 Ko par appel d'API.
inspectTemplateName [Facultatif] Modèle d'inspection Cloud DLP à utiliser pour les requêtes d'API, spécifié à l'aide du schéma projects/{template_project_id}/identifyTemplates/{idTemplateId}. Exemple :projects/my_project/identifyTemplates/100

Exécuter le modèle Masquage/Tokenisation de données à l'aide de Cloud DLP à partir de Cloud Storage vers BigQuery

CONSOLE

Exécuter depuis la Google Cloud Console
  1. Accédez à la page Dataflow de Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton
  5. Sélectionnez the Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream) template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

GCLOUD

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_TEMPLATE_PROJECT_ID par l'ID de projet de votre modèle.
  • Remplacez YOUR_DLP_API_PROJECT_ID par l'ID de projet de votre API Cloud DLP.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_INPUT_DATA par le chemin d'accès du fichier d'entrée.
  • Remplacez YOUR_DEIDENTIFY_TEMPLATE par le numéro du modèle de suppression d'identification Cloud DLP.
  • Remplacez YOUR_DATASET_NAME par le nom de l'ensemble de données BigQuery.
  • Remplacez YOUR_INSPECT_TEMPLATE par le numéro du modèle d'inspection Cloud DLP.
  • Remplacez BATCH_SIZE_VALUE par la taille du lot (nombre de lignes par API pour les données csv).
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
        --parameters \
    inputFilePattern=YOUR_INPUT_DATA,\
    dlpProjectId=YOUR_DLP_API_PROJECT_ID,\
    deidentifyTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE,\
    inspectTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE,\
    datasetName=YOUR_DATASET,\
    batchSize=BATCH_SIZE_VALUE
    

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_TEMPLATE_PROJECT_ID par l'ID de projet de votre modèle.
  • Remplacez YOUR_DLP_API_PROJECT_ID par l'ID de projet de votre API Cloud DLP.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez YOUR_INPUT_DATA par le chemin d'accès du fichier d'entrée.
  • Remplacez YOUR_DEIDENTIFY_TEMPLATE par le numéro du modèle de suppression d'identification Cloud DLP.
  • Remplacez YOUR_DATASET_NAME par le nom de l'ensemble de données BigQuery.
  • Remplacez YOUR_INSPECT_TEMPLATE par le numéro du modèle d'inspection Cloud DLP.
  • Remplacez BATCH_SIZE_VALUE par la taille du lot (nombre de lignes par API pour les données csv).
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
          "inputFilePattern":YOUR_INPUT_DATA,
          "dlpProjectId": "YOUR_DLP_API_PROJECT_ID",
          "deidentifyTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE".
          "inspectTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE",
          "datasetName": "YOUR_DATASET",
          "batchSize": "BATCH_SIZE_VALUE"
       },
       "environment": { "zone": "us-central1-f" }
    }