Ce modèle crée un pipeline de streaming qui fonctionne avec les flux de modifications MongoDB. Pour utiliser ce modèle, publiez les données du flux de modifications dans Pub/Sub. Le pipeline lit les enregistrements JSON à partir de Pub/Sub et les écrit dans BigQuery. Les enregistrements écrits dans BigQuery ont le même format que le modèle par lots MongoDB vers BigQuery.
Conditions requises pour ce pipeline
- L'ensemble de données BigQuery cible doit exister.
- L'instance MongoDB source doit être accessible à partir des machines de nœud de calcul Dataflow.
- Vous devez créer un sujet Pub/Sub pour lire le flux de modifications. Pendant l'exécution du pipeline, écoutez les événements de capture des données modifiées (CDC, Change Data Capture) dans le flux de modifications MongoDB, puis publiez-les dans Pub/Sub en tant qu'enregistrements JSON. Pour en savoir plus sur la publication de messages dans Pub/Sub, consultez la section Publier des messages dans des sujets.
- Ce modèle utilise les flux de modifications MongoDB. Il n'est pas compatible avec la capture des données modifiées BigQuery.
Paramètres de modèle
Paramètres obligatoires
- mongoDbUri: URI de connexion MongoDB au format
mongodb+srv://:@.
. - database: base de données de MongoDB à partir de laquelle lire la collection. Exemple :
my-db
- collection: nom de la collection dans la base de données MongoDB. Exemple :
my-collection
- userOption:
FLATTEN
,JSON
ouNONE
.FLATTEN
aplatit les documents au niveau unique.JSON
stocke le document au format JSON BigQuery.NONE
stocke l'intégralité du document sous forme de chaîne au format JSON. La valeur par défaut est "NONE". - inputTopic: sujet d'entrée Pub/Sub à lire, au format
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
. - outputTableSpec: table BigQuery dans laquelle écrire. Exemple :
bigquery-project:dataset.output_table
Paramètres facultatifs
- useStorageWriteApiAtLeastOnce: spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), définissez ce paramètre sur
true
. Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre surfalse
. Ce paramètre ne s'applique que lorsque la valeur deuseStorageWriteApi
est définie surtrue
. La valeur par défaut estfalse
. - KMSEncryptionKey: clé de chiffrement Cloud KMS permettant de déchiffrer la chaîne de connexion URI mongodb. Si la clé Cloud KMS est transmise, l'uri de la chaîne de connexion mongodb doit toutes être transmises de manière chiffrée. Exemple :
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
- filter: filtre Bson au format JSON. Exemple :
{ "val": { $gt: 0, $lt: 9 }}
- useStorageWriteApi: si cette valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est
false
. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApi
est défini surtrue
etuseStorageWriteApiAtLeastOnce
surfalse
, vous devez définir ce paramètre. La valeur par défaut est 0. - storageWriteApiTriggeringFrequencySec: spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApi
est défini surtrue
etuseStorageWriteApiAtLeastOnce
surfalse
, vous devez définir ce paramètre. - bigQuerySchemaPath: chemin d'accès Cloud Storage pour le schéma JSON BigQuery. Exemple :
gs://your-bucket/your-schema.json
- javascriptDocumentTransformGcsPath: URI Cloud Storage du fichier
.js
qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Par exemple,gs://your-bucket/your-transforms/*.js
. - javascriptDocumentTransformFunctionName: nom de la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples. Par exemple,transform
.
Fonction définie par l'utilisateur
Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF) en JavaScript. Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON.
Pour utiliser une UDF, importez le fichier JavaScript dans Cloud Storage et définissez les paramètres de modèle suivants :
Paramètre | Description |
---|---|
javascriptDocumentTransformGcsPath |
Emplacement Cloud Storage du fichier JavaScript. |
javascriptDocumentTransformFunctionName |
Nom de la fonction JavaScript. |
Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Spécification de la fonction
La spécification de l'UDF se présente comme suit :
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the MongoDB (CDC) to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION,\ inputTopic=INPUT_TOPIC
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC
: nom de votre table BigQuery cible.MONGO_DB_URI
: votre URI MongoDB.DATABASE
: votre base de données MongoDB.COLLECTION
: votre collection MongoDB.USER_OPTION
: FLATTEN, JSON ou NONE.INPUT_TOPIC
: votre sujet d'entrée Pub/Sub.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION", "inputTopic": "INPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC
: nom de votre table BigQuery cible.MONGO_DB_URI
: votre URI MongoDB.DATABASE
: votre base de données MongoDB.COLLECTION
: votre collection MongoDB.USER_OPTION
: FLATTEN, JSON ou NONE.INPUT_TOPIC
: votre sujet d'entrée Pub/Sub.
Étape suivante
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.