Modèle Pub/Sub vers MongoDB

Le modèle Pub/Sub vers MongoDB est un pipeline de streaming qui lit les messages encodés au format JSON d'un abonnement Pub/Sub et les écrit dans MongoDB sous forme de documents. Si nécessaire, ce pipeline accepte des transformations supplémentaires qui peuvent être incluses à l'aide d'une fonction JavaScript définie par l'utilisateur.

Si des erreurs se produisent lors du traitement des enregistrements, le modèle les écrit dans une table BigQuery, avec le message d'entrée. Par exemple, des erreurs peuvent se produire en raison d'une non-concordance du schéma, d'un format JSON non valide ou lors de l'exécution de transformations. Indiquez le nom de la table dans le paramètre deadletterTable. Si la table n'existe pas, le pipeline la crée automatiquement.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
  • Le cluster MongoDB doit exister et être accessible à partir des machines de nœud de calcul Dataflow.

Paramètres de modèle

Paramètres obligatoires

  • inputSubscription : nom de l'abonnement Pub/Sub. (Par exemple : projects/votre-id-projet/subscriptions/nom-de-votre-abonnement).
  • mongoDBUri : Liste de serveurs MongoDB séparés par une virgule. (par exemple, host1:port,host2:port,host3:port).
  • database : Base de données dans MongoDB pour stocker la collection. (Exemple : my-db).
  • collection : nom de la collection dans la base de données MongoDB. (Exemple : my-collection).
  • deadletterTable : table BigQuery qui stocke les messages causés par des échecs, tels qu'un schéma non correspondant, un format JSON non valide, etc. (Par exemple : your-project-id:your-dataset.your-table-name).

Paramètres facultatifs

  • batchSize : Taille de lot utilisée pour l'insertion par lots de documents dans MongoDB. La valeur par défaut est 1000.
  • batchSizeBytes : taille du lot en octets. La valeur par défaut est 5242880.
  • maxConnectionIdleTime : Durée maximale d'inactivité autorisée en secondes avant que le délai de connexion ne s'écoule. La valeur par défaut est 60000.
  • sslEnabled : valeur booléenne indiquant si le protocole SSL est activé pour la connexion à MongoDB. La valeur par défaut est "true".
  • ignoreSSLCertificate : valeur booléenne indiquant si le certificat SSL doit être ignoré. La valeur par défaut est "true".
  • withOrdered : Valeur booléenne permettant l'activation d'insertions groupées triées dans MongoDB. La valeur par défaut est "true".
  • withSSLInvalidHostNameAllowed : valeur booléenne indiquant si un nom d'hôte non valide est autorisé pour la connexion SSL. La valeur par défaut est "true".
  • javascriptTextTransformGcsPath : URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. (Exemple: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : nom de la fonction JavaScript définie par l'utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples.
  • javascriptTextTransformReloadIntervalMinutes : spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est 0, l'actualisation de l'UDF est désactivée. La valeur par défaut est 0.

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : une ligne provenant d'un fichier CSV d'entrée.
  • Sortie : document JSON concaténé à insérer dans MongoDB.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to MongoDB template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)

Étapes suivantes