Modèle Pub/Sub vers MongoDB avec des fonctions Python définies par l'utilisateur

Le modèle Pub/Sub vers MongoDB avec fonctions Python définies par l'utilisateur est un pipeline de streaming qui lit les messages encodés au format JSON à partir d'un abonnement Pub/Sub et les écrit dans MongoDB sous forme de documents. Si nécessaire, ce pipeline accepte des transformations supplémentaires qui peuvent être incluses à l'aide d'une fonction Python définie par l'utilisateur.

Si des erreurs se produisent lors du traitement des enregistrements, le modèle les écrit dans une table BigQuery, avec le message d'entrée. Par exemple, des erreurs peuvent se produire en raison d'une non-concordance du schéma, d'un format JSON non valide ou lors de l'exécution de transformations. Indiquez le nom de la table dans le paramètre deadletterTable. Si la table n'existe pas, le pipeline la crée automatiquement.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
  • Le cluster MongoDB doit exister et être accessible à partir des machines de nœud de calcul Dataflow.

Paramètres de modèle

Paramètre Description
inputSubscription Nom de l'abonnement Pub/Sub. Par exemple : projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Liste de serveurs MongoDB séparés par une virgule. Par exemple : 192.285.234.12:27017,192.287.123.11:27017
database Base de données dans MongoDB pour stocker la collection. Exemple : my-db.
collection Nom de la collection dans la base de données MongoDB. Exemple : my-collection.
deadletterTable Table BigQuery qui stocke les messages en raison d'échecs (schéma non correspondant, format JSON non valide, etc.). Exemple : project-id:dataset-name.table-name.
pythonExternalTextTransformGcsPath Facultatif : URI Cloud Storage du fichier de code Python qui définit la fonction définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName (Facultatif) : Nom de la fonction Python définie par l'utilisateur (UDF) que vous souhaitez utiliser.
batchSize Facultatif : Taille de lot utilisée pour l'insertion par lots de documents dans MongoDB. Valeur par défaut : 1000
batchSizeBytes Facultatif : Taille du lot en octets. Valeur par défaut : 5242880
maxConnectionIdleTime Facultatif : Durée maximale d'inactivité autorisée en secondes avant que le délai de connexion ne s'écoule. Valeur par défaut : 60000
sslEnabled Facultatif : Valeur booléenne indiquant si le protocole SSL est activé pour la connexion à MongoDB. Valeur par défaut : true
ignoreSSLCertificate Facultatif : Valeur booléenne indiquant si le certificat SSL doit être ignoré. Valeur par défaut : true
withOrdered Facultatif : Valeur booléenne permettant l'activation d'insertions groupées triées dans MongoDB. Valeur par défaut : true
withSSLInvalidHostNameAllowed Facultatif : Valeur booléenne indiquant si un nom d'hôte non valide est autorisé pour la connexion SSL. Valeur par défaut : true

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : une ligne provenant d'un fichier CSV d'entrée.
  • Sortie : document JSON concaténé à insérer dans MongoDB.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to MongoDB with Python UDFs template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)

Étape suivante