Modèle MongoDB vers BigQuery

Ce modèle crée un pipeline par lots qui lit les documents de MongoDB et les écrit dans BigQuery.

Si vous souhaitez capturer les données de flux de modifications MongoDB, vous pouvez utiliser le modèle MongoDB vers BigQuery (CDC).

Conditions requises pour ce pipeline

  • L'ensemble de données BigQuery cible doit exister.
  • L'instance MongoDB source doit être accessible à partir des machines de nœud de calcul Dataflow.

Format de sortie

Le format des enregistrements de sortie dépend de la valeur du paramètre userOption. Si la valeur de userOption est NONE, le résultat ressemble au suivant. Le champ source_data contient le document au format JSON.

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Si userOption est FLATTEN, le pipeline aplatit les documents et écrit les champs de premier niveau en tant que colonnes de tableau. Par exemple, supposons que les documents de la collection MongoDB contiennent les champs suivants :

  • "_id" (string)
  • "title" (string)
  • "genre" (string)

Avec FLATTEN, le schéma de sortie est le suivant. Le champ timestamp est ajouté par le modèle.

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Si userOption est JSON, le pipeline stocke le document au format JSON BigQuery. BigQuery est compatible avec les données JSON à l'aide du type de données JSON. Pour en savoir plus, consultez la page Utiliser des données JSON en langage GoogleSQL.

Paramètres de modèle

Paramètres obligatoires

  • mongoDbUri: URI de connexion MongoDB au format mongodb+srv://:@..
  • database: base de données de MongoDB à partir de laquelle lire la collection. Exemple :my-db
  • collection: nom de la collection dans la base de données MongoDB. Exemple :my-collection
  • userOption: FLATTEN, JSON ou NONE. FLATTEN aplatit les documents au niveau unique. JSON stocke le document au format JSON BigQuery. NONE stocke l'intégralité du document sous forme de chaîne au format JSON. La valeur par défaut est "NONE".
  • outputTableSpec: table BigQuery dans laquelle écrire. Exemple :bigquery-project:dataset.output_table

Paramètres facultatifs

  • KMSEncryptionKey: clé de chiffrement Cloud KMS permettant de déchiffrer la chaîne de connexion URI mongodb. Si la clé Cloud KMS est transmise, l'uri de la chaîne de connexion mongodb doit toutes être transmises de manière chiffrée. Exemple :projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • filter: filtre Bson au format JSON. Exemple :{ "val": { $gt: 0, $lt: 9 }}
  • useStorageWriteApi: si la valeur est true, le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est false. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), définissez ce paramètre sur true. Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false. Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true. La valeur par défaut est false.
  • bigQuerySchemaPath: chemin d'accès Cloud Storage pour le schéma JSON BigQuery. Exemple :gs://your-bucket/your-schema.json
  • javascriptDocumentTransformGcsPath: URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Par exemple, gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName: nom de la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples. Par exemple, transform.

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF) en JavaScript. Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON.

Pour utiliser une UDF, importez le fichier JavaScript dans Cloud Storage et définissez les paramètres de modèle suivants :

ParamètreDescription
javascriptDocumentTransformGcsPath Emplacement Cloud Storage du fichier JavaScript.
javascriptDocumentTransformFunctionName Nom de la fonction JavaScript.

Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : document MongoDB.
  • Résultat : objet sérialisé en tant que chaîne JSON. Si la valeur de userOption est NONE, l'objet JSON doit inclure une propriété nommée _id contenant l'ID du document.
  • Exécuter le modèle

    1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
    2. Accéder à la page Créer un job à partir d'un modèle
    3. Dans le champ Nom du job, saisissez un nom de job unique.
    4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

      Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

    5. Dans le menu déroulant Modèle Dataflow, sélectionnez the MongoDB to BigQuery template.
    6. Dans les champs fournis, saisissez vos valeurs de paramètres.
    7. Cliquez sur Run Job (Exécuter la tâche).

    Dans le shell ou le terminal, exécutez le modèle :

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
    • JOB_NAME : nom de job unique de votre choix
    • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION : version du modèle que vous souhaitez utiliser

      Vous pouvez utiliser les valeurs suivantes :

    • OUTPUT_TABLE_SPEC : nom de votre table BigQuery cible.
    • MONGO_DB_URI : votre URI MongoDB.
    • DATABASE : votre base de données MongoDB.
    • COLLECTION : votre collection MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.

    Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
    • JOB_NAME : nom de job unique de votre choix
    • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION : version du modèle que vous souhaitez utiliser

      Vous pouvez utiliser les valeurs suivantes :

    • OUTPUT_TABLE_SPEC : nom de votre table BigQuery cible.
    • MONGO_DB_URI : votre URI MongoDB.
    • DATABASE : votre base de données MongoDB.
    • COLLECTION : votre collection MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.
    Java
    /*
     * Copyright (C) 2019 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.mongodb.templates;
    
    import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
    import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;
    
    import com.google.api.client.json.gson.GsonFactory;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.BigQueryWriteOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.JavascriptDocumentTransformerOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.MongoDbOptions;
    import com.google.cloud.teleport.v2.mongodb.templates.MongoDbToBigQuery.Options;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiBatchOptions;
    import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.common.base.Strings;
    import java.io.IOException;
    import javax.script.ScriptException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.mongodb.FindQuery;
    import org.apache.beam.sdk.io.mongodb.MongoDbIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.bson.BsonDocument;
    import org.bson.Document;
    
    /**
     * The {@link MongoDbToBigQuery} pipeline is a batch pipeline which ingests data from MongoDB and
     * outputs the resulting records to BigQuery.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @Template(
        name = "MongoDB_to_BigQuery",
        category = TemplateCategory.BATCH,
        displayName = "MongoDB to BigQuery",
        description =
            "The MongoDB to BigQuery template is a batch pipeline that reads documents from MongoDB and writes them to "
                + "BigQuery as specified by the <code>userOption</code> parameter.",
        optionsClass = Options.class,
        flexContainerName = "mongodb-to-bigquery",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/mongodb-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        preview = true,
        requirements = {
          "The target BigQuery dataset must exist.",
          "The source MongoDB instance must be accessible from the Dataflow worker machines."
        })
    public class MongoDbToBigQuery {
      /**
       * Options supported by {@link MongoDbToBigQuery}
       *
       * <p>Inherits standard configuration options.
       */
      public interface Options
          extends PipelineOptions,
              MongoDbOptions,
              BigQueryWriteOptions,
              BigQueryStorageApiBatchOptions,
              JavascriptDocumentTransformerOptions {}
    
      private static class ParseAsDocumentsFn extends DoFn<String, Document> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(Document.parse(context.element()));
        }
      }
    
      public static void main(String[] args)
          throws ScriptException, IOException, NoSuchMethodException {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    
        BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    
        run(options);
      }
    
      public static boolean run(Options options)
          throws ScriptException, IOException, NoSuchMethodException {
        Pipeline pipeline = Pipeline.create(options);
        String userOption = options.getUserOption();
    
        TableSchema bigquerySchema;
    
        // Get MongoDbUri plain text or base64 encrypted with a specific KMS encryption key
        String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get();
    
        if (options.getBigQuerySchemaPath() != null) {
          // initialize FileSystem to read from GCS
          FileSystems.setDefaultPipelineOptions(options);
          String jsonSchema = getGcsFileAsString(options.getBigQuerySchemaPath());
          GsonFactory gf = new GsonFactory();
          bigquerySchema = gf.fromString(jsonSchema, TableSchema.class);
        } else if (options.getJavascriptDocumentTransformFunctionName() != null
            && options.getJavascriptDocumentTransformGcsPath() != null) {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchemaForUDF(
                  mongoDbUri,
                  options.getDatabase(),
                  options.getCollection(),
                  options.getJavascriptDocumentTransformGcsPath(),
                  options.getJavascriptDocumentTransformFunctionName(),
                  options.getUserOption());
        } else {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchema(
                  mongoDbUri, options.getDatabase(), options.getCollection(), options.getUserOption());
        }
    
        MongoDbIO.Read readDocuments =
            MongoDbIO.read()
                .withUri(mongoDbUri)
                .withDatabase(options.getDatabase())
                .withCollection(options.getCollection());
    
        String filterJson = options.getFilter();
        BsonDocument filter;
        if (!Strings.isNullOrEmpty(filterJson)
            && !(filter = BsonDocument.parse(filterJson)).isEmpty()) {
          readDocuments = readDocuments.withQueryFn(FindQuery.create().withFilters(filter));
        }
    
        pipeline
            .apply("Read Documents", readDocuments)
            .apply(
                "UDF",
                TransformDocumentViaJavascript.newBuilder()
                    .setFileSystemPath(options.getJavascriptDocumentTransformGcsPath())
                    .setFunctionName(options.getJavascriptDocumentTransformFunctionName())
                    .build())
            .apply(
                "Transform to TableRow",
                ParDo.of(
                    new DoFn<Document, TableRow>() {
    
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = c.element();
                        TableRow row = MongoDbUtils.getTableSchema(document, userOption);
                        c.output(row);
                      }
                    }))
            .apply(
                "Write to Bigquery",
                BigQueryIO.writeTableRows()
                    .to(options.getOutputTableSpec())
                    .withSchema(bigquerySchema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        pipeline.run();
        return true;
      }
    }
    

    Étape suivante