Modèles de streaming fournis par Google

Google fournit un ensemble de modèles Dataflow Open Source. Des informations générales sur les modèles sont disponibles sur la page Présentation. Pour obtenir la liste de tous les modèles fournis par Google, consultez la page Premiers pas avec les modèles fournis par Google.

Cette page répertorie les modèles de streaming suivants :

Abonnement Pub/Sub vers BigQuery

Le modèle Abonnement Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un abonnement Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.

Conditions requises pour ce pipeline :

  • Les messages Pub/Sub doivent être au format JSON, comme décrit ici. Par exemple, vous pouvez insérer des messages au format {"k1":"v1", "k2":"v2"} dans une table BigQuery comportant deux colonnes nommées k1 et k2, en utilisant le type de données "chaîne".
  • La table de sortie doit exister avant l'exécution du pipeline. Le schéma de la table doit correspondre aux objets JSON d'entrée.

Paramètres de modèle

Paramètres Description
inputSubscription Abonnement en entrée Pub/Sub à lire, au format projects/<project>/subscriptions/<subscription>.
outputTableSpec Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table>
outputDeadletterTable Table BigQuery des messages qui n'ont pas pu atteindre la table de sortie, au format <my-project>:<my-dataset>.<my-table>. Si elle n'existe pas, elle est créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place.

Exécuter le modèle Abonnement Pub/Sub vers BigQuery

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Pub/Sub Subscription to BigQuery template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet.
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet.
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Sujet Pub/Sub vers BigQuery

Le modèle Sujet Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un sujet Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.

Conditions requises pour ce pipeline :

  • Les messages Pub/Sub doivent être au format JSON, comme décrit ici. Par exemple, vous pouvez insérer des messages au format {"k1":"v1", "k2":"v2"} dans une table BigQuery comportant deux colonnes nommées k1 et k2, en utilisant le type de données "chaîne".
  • La table de sortie doit exister avant l'exécution du pipeline. Le schéma de la table doit correspondre aux objets JSON d'entrée.

Paramètres de modèle

Paramètres Description
inputTopic Sujet d'entrée Pub/Sub à lire, au format projects/<project>/topics/<topic>.
outputTableSpec Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table>
outputDeadletterTable La table BigQuery des messages n'ayant pas pu atteindre la table de sortie Elle doit être au format <my-project>:<my-dataset>.<my-table>. Si elle n'existe pas, elle est créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place.

Exécuter le modèle Sujet Pub/Sub vers BigQuery

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Pub/Sub Topic to BigQuery template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro vers BigQuery

Le modèle Pub/Sub Avro vers BigQuery est un pipeline de streaming qui ingère les données Avro d'un abonnement Pub/Sub dans une table BigQuery. Toute erreur survenant lors de l'écriture dans la table BigQuery est traitée dans un sujet Pub/Sub non traité.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub d'entrée doit exister.
  • Le fichier de schéma des enregistrements Avro doit exister dans Cloud Storage.
  • Le sujet Pub/Sub non traité doit exister.
  • L'ensemble de données BigQuery de sortie doit exister.

Paramètres de modèle

Paramètres Description
schemaPath Emplacement Cloud Storage du fichier de schéma Avro. Par exemple, gs://path/to/my/schema.avsc.
inputSubscription Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription>.
outputTopic Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name>.
outputTableSpec Emplacement de la table de sortie BigQuery. Par exemple, <my-project>:<my-dataset>.<my-table>. Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du schéma Avro fourni par l'utilisateur.
writeDisposition (Facultatif) La propriété WriteDisposition de BigQuery. Par exemple, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Par défaut, WRITE_APPEND.
createDisposition (Facultatif) La propriété CreateDisposition de BigQuery. Par exemple, CREATE_IF_NEEDED et CREATE_NEVER. Par défaut, CREATE_IF_NEEDED.

Exécuter le modèle Pub/Sub Avro vers BigQuery

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Pub/Sub Avro to BigQuery template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécution à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 284.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : nom de la région Dataflow (par exemple, us-central1).
  • SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
  • BIGQUERY_TABLE : nom de la table de sortie BigQuery
  • DEADLETTER_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

Exécution à partir de l'API REST

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : nom de la région Dataflow (par exemple, us-central1).
  • SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
  • BIGQUERY_TABLE : nom de la table de sortie BigQuery
  • DEADLETTER_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub vers Pub/Sub

Le modèle Pub/Sub vers Pub/Sub est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et les écrit dans un autre sujet Pub/Sub. Le pipeline accepte également une clé facultative d'attribut de message et une valeur qui peut être utilisée pour filtrer les messages devant être écrits dans le sujet Pub/Sub. Vous pouvez utiliser ce modèle pour copier des messages d'un abonnement Pub/Sub à un autre sujet Pub/Sub avec un filtre de message facultatif.

Conditions requises pour ce pipeline :

  • L'abonnement Pub/Sub source doit exister avant l'exécution.
  • Le sujet Pub/Sub de destination doit exister avant l'exécution.

Paramètres de modèle

Paramètres Description
inputSubscription Abonnement Pub/Sub à partir duquel lire l'entrée. Par exemple, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Sujet Cloud Pub/Sub dans lequel écrire la sortie. Par exemple, projects/<project-id>/topics/<topic-name>.
filterKey [Facultatif] Filtrez les événements en fonction d'une clé d'attribut. Aucun filtre n'est appliqué si filterKey n'est pas spécifié.
filterValue [Facultatif] Valeur d'attribut de filtre à utiliser dans le cas où un filterKey est fourni. Une valeur filterValue nulle est utilisée par défaut.

Exécuter le modèle Pub/Sub vers Pub/Sub

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Pub/Sub to Pub/Sub template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOPIC_NAME : nom du sujet Pub/Sub
  • FILTER_KEY : clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.
  • FILTER_VALUE : valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie. Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (sous-chaîne, par exemple) ne seront pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOPIC_NAME : nom du sujet Pub/Sub
  • FILTER_KEY : clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.
  • FILTER_VALUE : valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie. Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (sous-chaîne, par exemple) ne seront pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub vers Splunk

Le modèle Pub/Sub vers Splunk est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et écrit leur charge utile dans Splunk via la solution HEC (HTTP Event Collector) de Splunk. Le cas d'utilisation le plus courant de ce modèle consiste à exporter les journaux vers Splunk. Pour découvrir un exemple de workflow sous-jacent, consultez la section Déployer des exportations de journaux prêtes pour la production vers Splunk à l'aide de Dataflow.

Avant d'écrire vers Splunk, vous pouvez également appliquer une fonction JavaScript définie par l'utilisateur vers la charge utile du message. Tous les messages dont le traitement échoue sont transférés vers un sujet Pub/Sub non traité en vue d'opérations de dépannage supplémentaires et d'un nouveau traitement.

Pour ajouter une couche de protection à votre jeton HEC, vous pouvez également transmettre une clé Cloud KMS ainsi que le paramètre de jeton HEC encodé en base64 chiffré avec cette clé. Pour en savoir plus sur le chiffrement des paramètres du jeton HEC, consultez la page sur le point de terminaison du chiffrement de l'API Cloud KMS.

Conditions requises pour ce pipeline :

  • L'abonnement Pub/Sub source doit exister avant l'exécution du pipeline.
  • Le sujet Pub/Sub non traité doit exister avant l'exécution du pipeline.
  • Le point de terminaison Splunk HEC doit être accessible à partir du réseau de nœuds de calcul Dataflow.
  • Le jeton de la solution HEC de Splunk doit être généré et disponible.

Paramètres de modèle

Paramètres Description
inputSubscription Abonnement Pub/Sub à partir duquel lire l'entrée. Par exemple, projects/<project-id>/subscriptions/<subscription-name>.
token Jeton d'authentification HEC Splunk. Cette chaîne encodée en base64 peut être chiffrée avec une clé Cloud KMS pour plus de sécurité.
url URL HEC Splunk. Il doit être routable depuis le VPC dans lequel le pipeline est exécuté. Par exemple, https://splunk-hec-host:8088.
outputDeadletterTopic Sujet Pub/Sub pour transférer les messages non distribuables. Par exemple, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath [Facultatif] Chemin d'accès Cloud Storage contenant tout votre code JavaScript. Par exemple, gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName [Facultatif] Nom de la fonction JavaScript à appeler. Par exemple, si votre fonction JavaScript est function myTransform(inJson) { ...dostuff...}, le nom de la fonction est myTransform.
batchCount [Facultatif] Taille de lot pour l'envoi de plusieurs événements vers Splunk. Valeur par défaut 1 (pas de traitement par lots).
parallelism [Facultatif] Nombre maximal de demandes en parallèle. Valeur par défaut 1 (aucun parallélisme).
disableCertificateValidation [Facultatif] Désactiver la validation du certificat SSL. Valeur par défaut "false" (validation activée).
includePubsubMessage [Facultatif] Inclure le message Pub/Sub complet dans la charge utile. Valeur "false" par défaut (seul l'élément de données est inclus dans la charge utile).
tokenKMSEncryptionKey [Facultatif] Clé Cloud KMS permettant de déchiffrer la chaîne du jeton HEC. Si la clé Cloud KMS est fournie, la chaîne du jeton HEC doit être transmise sous forme chiffrée.

Exécuter le modèle Pub/Sub vers Splunk

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Pub/Sub to Splunk template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOKEN : jeton HTTP Event Collector de Splunk
  • URL : chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME : nom du sujet Pub/Sub
  • JAVASCRIPT_FUNCTION : nom de votre fonction JavaScript
  • PATH_TO_JAVASCRIPT_UDF_FILE : chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript (par exemple, gs://your-bucket/your-function.js)
  • BATCH_COUNT : taille de lot à utiliser pour envoyer plusieurs événements vers Splunk
  • PARALLELISM : nombre de requêtes parallèles à utiliser pour envoyer des événements vers Splunk
  • DISABLE_VALIDATION : true si vous souhaitez désactiver la validation du certificat SSL
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOKEN : jeton HTTP Event Collector de Splunk
  • URL : chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME : nom du sujet Pub/Sub
  • JAVASCRIPT_FUNCTION : nom de votre fonction JavaScript
  • PATH_TO_JAVASCRIPT_UDF_FILE : chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript (par exemple, gs://your-bucket/your-function.js)
  • BATCH_COUNT : taille de lot à utiliser pour envoyer plusieurs événements vers Splunk
  • PARALLELISM : nombre de requêtes parallèles à utiliser pour envoyer des événements vers Splunk
  • DISABLE_VALIDATION : true si vous souhaitez désactiver la validation du certificat SSL
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub vers fichiers Avro dans Cloud Storage

Le modèle Pub/Sub vers fichiers Avro dans Cloud Storage est un pipeline de streaming qui lit les données d'un sujet Pub/Sub et écrit des fichiers Avro dans le bucket Cloud Storage spécifié.

Conditions requises pour ce pipeline :

  • Le sujet Pub/Sub d'entrée doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputTopic Sujet Cloud Pub/Sub permettant de s'abonner à la consultation de messages. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>.
outputDirectory Répertoire de sortie dans lequel les fichiers de sortie Avro seront archivés. Ajoutez / à la fin. Exemple : gs://example-bucket/example-directory/.
avroTempDirectory Répertoire des fichiers Avro temporaires. Veuillez ajouter / à la fin. Par exemple : gs://example-bucket/example-directory/.
outputFilenamePrefix [Facultatif] Préfixe du nom de fichier de sortie pour les fichiers Avro.
outputFilenameSuffix [Facultatif] Suffixe du nom de fichier de sortie pour les fichiers Avro.
outputShardTemplate [Facultatif] Modèle de partition du fichier de sortie. Spécifié en tant que séquences répétées des lettres "S" ou "N" (exemple : SSS-NNN), qui sont remplacées par le numéro de partition ou par le nombre de partitions, respectivement. Le format de modèle par défaut est "W-P-SS-of-NN" lorsque ce paramètre n'est pas spécifié.
numShards [Facultatif] Nombre maximal de partitions de sortie générées lors de l'écriture. Le nombre maximal de segments par défaut est de 1.

Exécuter le modèle Pub/Sub vers Cloud Storage Avro

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Pub/Sub to Cloud Storage Avro template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom du sujet Pub/Sub
  • BUCKET_NAME : nom du bucket Cloud Storage
  • FILENAME_PREFIX : préfixe du nom de fichier de sortie préféré
  • FILENAME_SUFFIX : suffixe du nom de fichier de sortie préféré
  • SHARD_TEMPLATE : modèle de partition de sortie préféré
  • NUM_SHARDS : nombre de partitions de sortie
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom du sujet Pub/Sub
  • BUCKET_NAME : nom du bucket Cloud Storage
  • FILENAME_PREFIX : préfixe du nom de fichier de sortie préféré
  • FILENAME_SUFFIX : suffixe du nom de fichier de sortie préféré
  • SHARD_TEMPLATE : modèle de partition de sortie préféré
  • NUM_SHARDS : nombre de partitions de sortie
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub vers des fichiers texte dans Cloud Storage

Le modèle Pub/Sub vers texte Cloud Storage est un pipeline de streaming qui lit les enregistrements de Pub/Sub et les enregistre sous forme d'une série de fichiers Cloud Storage au format texte. Le modèle peut être utilisé comme moyen rapide d'enregistrer des données dans Pub/Sub pour une utilisation ultérieure. Par défaut, le modèle génère un nouveau fichier toutes les 5 minutes.

Conditions requises pour ce pipeline :

  • Le sujet Pub/Sub doit exister avant l'exécution.
  • Les messages publiés sur le thème doivent être au format texte.
  • Les messages publiés sur le thème ne doivent contenir aucune nouvelle ligne. Notez que chaque message Pub/Sub est enregistré sur une ligne unique dans le fichier de sortie.

Paramètres de modèle

Paramètres Description
inputTopic Sujet Pub/Sub à partir duquel lire l'entrée. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>.
outputDirectory Chemin d'accès et préfixe du nom de fichier pour l'écriture des fichiers de sortie. Par exemple : gs://bucket-name/path/. Cette valeur doit se terminer par une barre oblique.
outputFilenamePrefix Préfixe à placer sur chaque fichier ciblé sur une fenêtre. Par exemple : output-
outputFilenameSuffix Suffixe à placer sur chaque fichier ciblé sur une fenêtre, généralement une extension de fichier telle que .txt ou .csv.
outputShardTemplate Le modèle de segment définit la partie dynamique de chaque fichier ciblé sur une fenêtre. Par défaut, le pipeline utilise un seul segment pour la sortie vers le système de fichiers dans chaque fenêtre. Cela signifie que toutes les données vont atterrir dans un seul fichier par fenêtre. Le outputShardTemplate devient par défaut W-P-SS-of-NNW correspond à la plage de dates de la fenêtre, P correspond aux informations du volet, S correspond au numéro de segment et N au nombre de segments. Dans le cas d'un fichier unique, la partie SS-of-NN de outputShardTemplate sera 00-of-01.

Exécuter le modèle Pub/Sub vers des fichiers texte dans Cloud Storage

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Pub/Sub to Text Files on Cloud Storage template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • BUCKET_NAME : nom du bucket Cloud Storage
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • BUCKET_NAME : nom du bucket Cloud Storage
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub vers MongoDB

Le modèle Pub/Sub vers MongoDB est un pipeline de streaming qui lit les messages encodés au format JSON d'un abonnement Pub/Sub et les écrit dans MongoDB sous forme de documents. Si nécessaire, ce pipeline accepte des transformations supplémentaires qui peuvent être incluses à l'aide d'une fonction JavaScript définie par l'utilisateur. Toute erreur survenue en raison d'une non-concordance du schéma ou d'un format JSON non valide, ou pendant l'exécution de transformations, est enregistrée avec le message d'entrée dans une table BigQuery destinée aux messages non traités. Si la table des enregistrements non traités n'existe pas avant l'exécution, le pipeline la crée automatiquement.

Conditions requises pour ce pipeline :

  • L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
  • Le cluster MongoDB doit exister et être accessible à partir des machines de nœud de calcul Dataflow.

Paramètres de modèle

Paramètres Description
inputSubscription Nom de l'abonnement Pub/Sub. Par exemple : projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri Liste de serveurs MongoDB séparés par une virgule. Par exemple : 192.285.234.12:27017,192.287.123.11:27017
database Base de données dans MongoDB pour stocker la collection. Exemple : my-db.
collection Nom de la collection dans la base de données MongoDB. Exemple : my-collection.
deadletterTable Table BigQuery qui stocke les messages en raison d'échecs (schéma non correspondant, format JSON non valide, etc.). Exemple : project-id:dataset-name.table-name.
javascriptTextTransformGcsPath [Facultatif] Emplacement Cloud Storage du fichier JavaScript contenant la transformation définie par l'utilisateur. Exemple : gs://mybucket/filename.json.
javascriptTextTransformFunctionName [Facultatif] Nom de la fonction JavaScript définie par l'utilisateur. Exemple : transform.
batchSize [Facultatif] Taille de lot utilisée pour l'insertion par lots de documents dans MongoDB. Valeur par défaut : 1000
batchSizeBytes [Facultatif] Taille du lot en octets. Valeur par défaut : 5242880
maxConnectionIdleTime [Facultatif] Durée maximale d'inactivité autorisée en secondes avant que le délai de connexion ne s'écoule. Valeur par défaut : 60000
sslEnabled [Facultatif] Valeur booléenne indiquant si le protocole SSL est activé pour la connexion à MongoDB. Valeur par défaut : true
ignoreSSLCertificate [Facultatif] Valeur booléenne indiquant si le certificat SSL doit être ignoré. Valeur par défaut : true
withOrdered [Facultatif] Valeur booléenne permettant l'activation d'insertions groupées triées dans MongoDB. Valeur par défaut : true
withSSLInvalidHostNameAllowed [Facultatif] Valeur booléenne indiquant si un nom d'hôte non valide est autorisé pour la connexion SSL. Valeur par défaut : true

Exécuter le modèle Pub/Sub vers MongoDB

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez Pub/Sub to MongoDB template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécution à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 284.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet.
  • REGION_NAME : nom de la région Dataflow (par exemple, us-central1).
  • JOB_NAME : nom de la tâche de votre choix
  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/<project-id>/subscriptions/<subscription-name>).
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

Exécution à partir de l'API REST

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet.
  • LOCATION : nom de la région Dataflow (par exemple, us-central1).
  • JOB_NAME : nom de la tâche de votre choix
  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/<project-id>/subscriptions/<subscription-name>).
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Fichiers texte dans Cloud Storage vers BigQuery (Flux)

Les fichiers texte dans Cloud Storage vers BigQuery sont un pipeline de streaming qui vous permet de lire des fichiers texte stockés dans Cloud Storage, de les transformer à l'aide de la fonction JavaScript définie par l'utilisateur que vous fournissez, et de générer le résultat vers BigQuery.

Le pipeline s'exécute indéfiniment et doit être arrêté manuellement via une annulation et non un drainage, en raison de son utilisation de la transformation "Watch", qui est une fonction "SplittableDoFn" qui n'est pas compatible avec le drainage.

Conditions requises pour ce pipeline :

  • Créez un fichier JSON décrivant le schéma de votre table de sortie dans BigQuery.

    Assurez-vous qu'il existe un tableau JSON de niveau supérieur intitulé BigQuery Schema et que son contenu suit le modèle {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Exemple :

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Créez un fichier JavaScript (.js) à l'aide de la fonction définie par l'utilisateur (UDF) qui fournit la logique pour transformer les lignes de texte. Notez que votre fonction doit renvoyer une chaîne JSON.

    Par exemple, cette fonction divise chaque ligne d'un fichier CSV et renvoie une chaîne JSON après avoir transformé les valeurs.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Paramètres de modèle

Paramètres Description
javascriptTextTransformGcsPath Emplacement Cloud Storage de votre fonction JavaScript définie par l'utilisateur. Exemple : gs://my_bucket/my_function.js.
JSONPath Emplacement Cloud Storage de votre fichier de schéma BigQuery, décrit comme fichier JSON. Exemple : gs://path/to/my/schema.json.
javascriptTextTransformFunctionName Nom de la fonction JavaScript que vous souhaitez appeler en tant que fonction définie par l'utilisateur. Exemple : transform.
outputTable Nom complet de la table BigQuery. Par exemple : my-project:dataset.table
inputFilePattern Emplacement Cloud Storage du texte que vous souhaitez traiter. Exemple : gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Répertoire temporaire pour le processus de chargement de BigQuery. Par exemple : gs://my-bucket/my-files/temp_dir
outputDeadletterTable Table des messages qui n'ont pas pu atteindre la table de sortie. Exemple : my-project:dataset.my-unprocessed-table. Si elle n'existe pas, elle sera créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place.

Exécuter le modèle Texte Cloud Storage vers BigQuery (Flux)

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Cloud Storage Text to BigQuery template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION : nom de votre fonction définie par l'utilisateur (UDF)
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • PATH_TO_JAVASCRIPT_UDF_FILE : chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript
  • PATH_TO_TEXT_DATA : chemin d'accès Cloud Storage à votre ensemble de données texte
  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE : nom de votre table BigQuery pour les messages non traités
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION : nom de votre fonction définie par l'utilisateur (UDF)
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • PATH_TO_JAVASCRIPT_UDF_FILE : chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript
  • PATH_TO_TEXT_DATA : chemin d'accès Cloud Storage à votre ensemble de données texte
  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE : nom de votre table BigQuery pour les messages non traités
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Fichiers texte sur Cloud Storage vers Pub/Sub (Flux)

Le modèle crée un pipeline de streaming qui interroge en permanence les nouveaux fichiers texte chargés dans Cloud Storage, lit chaque fichier ligne par ligne et publie des chaînes dans un sujet Pub/Sub. Le modèle publie les enregistrements dans un fichier délimité par une nouvelle ligne contenant des enregistrements JSON ou un fichier CSV dans un sujet Pub/Sub pour un traitement en temps réel. Vous pouvez utiliser ce modèle pour relire les données dans Pub/Sub.

Le pipeline s'exécute indéfiniment et doit être arrêté manuellement via une annulation et non un drainage, en raison de son utilisation de la transformation "Watch", qui est une fonction "SplittableDoFn" qui n'est pas compatible avec le drainage.

Actuellement, l'intervalle d'interrogation est fixé à 10 secondes. Ce modèle ne définit aucun horodatage sur les enregistrements individuels. Par conséquent, l'heure de l'événement sera égale à l'heure de publication pendant l'exécution. Si votre pipeline dépend d'une heure d'événement précise pour le traitement, ne l'utilisez pas.

Conditions requises pour ce pipeline :

  • Les fichiers d'entrée doivent être au format CSV ou JSON délimité par une nouvelle ligne. Les enregistrements couvrant plusieurs lignes dans les fichiers sources peuvent entraîner des problèmes en aval, car chaque ligne dans les fichiers sera publiée sous forme de message à Pub/Sub.
  • Le sujet Pub/Sub doit exister avant l'exécution.
  • Le pipeline fonctionne indéfiniment et doit être terminé manuellement.

Paramètres de modèle

Paramètres Description
inputFilePattern Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/files/*.json ou gs://bucket-name/path/*.csv.
outputTopic Sujet d'entrée Pub/Sub dans lequel écrire. Le nom doit être au format projects/<project-id>/topics/<topic-name>.

Exécuter le modèle de fichiers texte dans Cloud Storage vers Pub/Sub (Flux)

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Text Files on Cloud Storage to Pub/Sub (Stream) template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • BUCKET_NAME : nom de votre bucket Cloud Storage
  • FILE_PATTERN : modèle de fichier glob à lire dans le bucket Cloud Storage (par exemple, path/*.csv)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • PROJECT_ID : ID de votre projet
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • BUCKET_NAME : nom de votre bucket Cloud Storage
  • FILE_PATTERN : modèle de fichier glob à lire dans le bucket Cloud Storage (par exemple, path/*.csv)
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Masquage/Tokenisation de données de Cloud Storage vers BigQuery (à l'aide de Cloud DLP)

Le modèle Masquage/Tokenisation de données de Cloud Storage vers BigQuery (à l'aide de Cloud DLP) est un pipeline de streaming qui lit les fichiers CSV d'un bucket Cloud Storage, appelle l'API Cloud Data Loss Prevention (Cloud DLP) pour les anonymiser puis écrit les données anonymisées dans la table BigQuery spécifiée. Ce modèle accepte l'utilisation d'un modèle d'inspection Cloud DLP et d'un modèle de suppression d'identification Cloud DLP. Cela permet aux utilisateurs d'inspecter du texte pour identifier les données potentiellement sensibles, et de supprimer l'identification des données sensibles détectées. Ils ont également la possibilité de supprimer l'identification de données structurées lorsque des colonnes sont spécifiées et qu'aucune inspection n'est nécessaire.

Conditions requises pour ce pipeline :

  • Les données d'entrée à tokeniser doivent exister.
  • Les modèles Cloud DLP doivent exister (par exemple, DeidentifyTemplate et InspectTemplate). Pour en savoir plus, consultez la page Modèles Cloud DLP.
  • L'ensemble de données BigQuery doit exister.

Paramètres de modèle

Paramètres Description
inputFilePattern Fichiers csv à partir desquels lire les enregistrements de données d'entrée. Les caractères génériques sont également acceptés. Par exemple, gs://mybucket/my_csv_filename.csv ou gs://mybucket/file-*.csv.
dlpProjectId ID du projet Cloud DLP propriétaire de la ressource API Cloud DLP. Il peut s'agir du projet propriétaire des modèles Cloud DLP ou d'un autre projet. Par exemple, my_dlp_api_project.
deidentifyTemplateName Modèle d'anonymisation Cloud DLP à utiliser pour les requêtes d'API, spécifié à l'aide du schéma projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Exemple :projects/my_project/deidentifyTemplates/100
datasetName Ensemble de données BigQuery pour l'envoi des résultats tokenisés.
batchSize Taille du lot pour l'envoi des données à inspecter et/ou à détokeniser. Dans le cas d'un fichier csv, batchSize correspond au nombre de lignes dans un lot. Les utilisateurs doivent déterminer la taille du lot en fonction de la taille des enregistrements et de la taille du fichier. Notez que l'API Cloud DLP limite la taille de la charge utile à 524 Ko par appel d'API.
inspectTemplateName [Facultatif] Modèle d'inspection Cloud DLP à utiliser pour les requêtes d'API, spécifié à l'aide du schéma projects/{template_project_id}/identifyTemplates/{idTemplateId}. Exemple :projects/my_project/identifyTemplates/100

Exécuter le modèle Masquage/Tokenisation de données de Cloud Storage vers BigQuery (à l'aide de Cloud DLP)

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécuter à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Remplacez l'élément suivant :

  • TEMPLATE_PROJECT_ID : ID de votre projet de modèle
  • DLP_API_PROJECT_ID : ID de votre projet dans l'API Cloud DLP
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • INPUT_DATA : chemin d'accès de votre fichier d'entrée
  • DEIDENTIFY_TEMPLATE : numéro de modèle d'anonymisation Cloud DLP
  • DATASET_NAME : nom de l'ensemble de données BigQuery
  • INSPECT_TEMPLATE_NUMBER : numéro du modèle d'inspection Cloud DLP
  • BATCH_SIZE_VALUE : taille du lot (nombre de lignes par API pour les données CSV)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

Exécuter à partir de l'API REST

Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • TEMPLATE_PROJECT_ID : ID de votre projet de modèle
  • DLP_API_PROJECT_ID : ID de votre projet dans l'API Cloud DLP
  • JOB_NAME : nom de la tâche de votre choix
  • REGION : point de terminaison régional (par exemple, us-west1)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • INPUT_DATA : chemin d'accès de votre fichier d'entrée
  • DEIDENTIFY_TEMPLATE : numéro de modèle d'anonymisation Cloud DLP
  • DATASET_NAME : nom de l'ensemble de données BigQuery
  • INSPECT_TEMPLATE_NUMBER : numéro du modèle d'inspection Cloud DLP
  • BATCH_SIZE_VALUE : taille du lot (nombre de lignes par API pour les données CSV)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Capture de données modifiées de MySQL vers BigQuery à l'aide de Debezium et Pub/Sub (Flux)

Le modèle Capture de données modifiées de MySQL vers BigQuery à l'aide de Debezium et Pub/Sub est un pipeline de streaming qui lit les messages Pub/Sub avec des données modifiées provenant d'une base de données MySQL et écrit les enregistrements dans BigQuery. Un connecteur Debezium enregistre les modifications apportées à la base de données MySQL et publie les données modifiées dans Pub/Sub. Le modèle lit ensuite les messages Pub/Sub et les écrit dans BigQuery.

Vous pouvez utiliser ce modèle pour synchroniser des bases de données MySQL et des tables BigQuery. Le pipeline écrit les données modifiées dans une table de préproduction BigQuery et met à jour une table BigQuery par intermittence en répliquant la base de données MySQL.

Conditions requises pour ce pipeline :

  • Le connecteur Debezium doit être déployé.
  • Les messages Pub/Sub doivent être sérialisés dans une classe Beam Row.

Paramètres de modèle

Paramètres Description
inputSubscriptions Liste des abonnements en entrée Pub/Sub à lire séparés par une virgule, au format <subscription>,<subscription>, ....
changeLogDataset Ensemble de données BigQuery permettant de stocker les tables de préproduction, au format <my-dataset>.
replicaDataset Emplacement de l'ensemble de données BigQuery dans lequel stocker les tables dupliquées, au format <my-dataset>.
updateFrequencySecs (Facultatif) Intervalle auquel le pipeline met à jour la table BigQuery en répliquant la base de données MySQL.

Exécuter la capture de données modifiées avec Debezium et MySQL à partir du modèle Pub/Sub vers BigQuery

Pour exécuter ce modèle, procédez comme suit :

  1. Sur votre ordinateur local, clonez le dépôt DataflowTemplates.
  2. Accédez au répertoire v2/cdc-parent.
  3. Assurez-vous que le connecteur Debezium est déployé.
  4. Exécutez le modèle Dataflow à l'aide de Maven.

    Remplacez les valeurs suivantes :

    • PROJECT_ID : ID de votre projet.
    • YOUR_SUBSCRIPTIONS : liste des noms de vos abonnements Pub/Sub, séparés par une virgule.
    • YOUR_CHANGELOG_DATASET : ensemble de données BigQuery pour les données du journal des modifications.
    • YOUR_REPLICA_DATASET : ensemble de données BigQuery pour les tables dupliquées.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka vers BigQuery

Le modèle Apache Kafka vers BigQuery est un pipeline de streaming qui ingère les données textuelles issues d'Apache Kafka, exécute une fonction définie par l'utilisateur et génère les enregistrements obtenus dans BigQuery. Les erreurs survenant lors de la transformation des données, de l'exécution de la fonction définie par l'utilisateur ou de l'insertion dans la table de sortie sont enregistrées dans une table d'erreurs distincte dans BigQuery. Si la table d'erreurs n'existe pas avant l'exécution, elle est créée.

Conditions requises pour ce pipeline

  • La table BigQuery de sortie doit exister.
  • Le serveur de courtiers Apache Kafka doit être en cours d'exécution et joignable depuis les machines de nœud de calcul Dataflow.
  • Les sujets Apache Kafka doivent exister et les messages doivent être encodés dans un format JSON valide.

Paramètres de modèle

Paramètres Description
outputTableSpec Emplacement de la table de sortie BigQuery dans lequel écrire les messages Apache Kafka, au format my-project:dataset.table.
inputTopics Sujets d'entrée Apache Kafka à lire dans une liste d'éléments séparés par une virgule. Par exemple : messages
bootstrapServers Adresse hôte des serveurs de courtiers Apache Kafka en cours d'exécution dans une liste d'éléments séparés par une virgule, chaque adresse hôte au format 35.70.252.199:9092.
javascriptTextTransformGcsPath (Facultatif) Chemin d'accès de l'emplacement Cloud Storage vers la fonction JavaScript définie par l'utilisateur (UDF). Par exemple : gs://my_bucket/my_function.js
javascriptTextTransformFunctionName (Facultatif) Nom de la fonction JavaScript à appeler en tant que votre UDF. Par exemple : transform
outputDeadletterTable (Facultatif) Table BigQuery pour les messages qui n'ont pas pu atteindre la table de sortie, au format my-project:dataset.my-deadletter-table. Si elle n'existe pas, la table est créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place.

Exécuter le modèle Apache Kafka vers BigQuery

Console

Exécuter le modèle depuis Google Cloud Console
  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).
  4. Bouton &quot;Créer une tâche à partir d&#39;un modèle&quot; dans Cloud Console
  5. Sélectionnez the Apache Kafka to BigQuery template dans le menu déroulant Modèle Dataflow.
  6. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  7. Saisissez vos valeurs de paramètres dans les champs fournis.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécution à partir de l'outil de ligne de commande gcloud

Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 284.0.0 ou ultérieure.

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Remplacez l'élément suivant :

  • YOUR_PROJECT_ID : ID de votre projet de modèle
  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : nom de la région Dataflow. Par exemple : us-central1
  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont proposés, veuillez suivre les instructions expliquant comment échapper les virgules.
  • PATH_TO_JAVASCRIPT_UDF_FILE : chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript
  • YOUR_JAVASCRIPT_FUNCTION : nom de votre fonction définie par l'utilisateur (UDF)
  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit être associée au numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, veuillez suivre les instructions expliquant comment échapper les virgules.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

Exécution à partir de l'API REST

Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Remplacez l'élément suivant :

  • YOUR_PROJECT_ID : ID de votre projet de modèle
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : nom de la région Dataflow. Par exemple : us-central1
  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont proposés, veuillez suivre les instructions expliquant comment échapper les virgules.
  • PATH_TO_JAVASCRIPT_UDF_FILE : chemin d'accès Cloud Storage au fichier .js contenant votre code JavaScript
  • YOUR_JAVASCRIPT_FUNCTION : nom de votre fonction définie par l'utilisateur (UDF)
  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit être associée au numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, veuillez suivre les instructions expliquant comment échapper les virgules.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}