Modèle BigQuery vers MongoDB

Le modèle BigQuery vers MongoDB est un pipeline par lots qui lit les lignes d'une table BigQuery et les écrit dans MongoDB sous forme de documents. Actuellement, chaque ligne est stockée en tant que document.

Conditions requises pour ce pipeline

  • La table BigQuery source doit exister.
  • L'instance MongoDB cible doit être accessible à partir des machines de nœud de calcul Dataflow.

Paramètres de modèle

Paramètres obligatoires

  • mongoDbUri: URI de connexion MongoDB au format mongodb+srv://:@.
  • database: base de données dans MongoDB pour stocker la collection. Exemple :my-db
  • collection: nom de la collection dans la base de données MongoDB. Exemple :my-collection
  • inputTableSpec: table BigQuery à lire. Exemple :bigquery-project:dataset.input_table

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the BigQuery to MongoDB template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

  gcloud dataflow flex-template run JOB_NAME \
      --project=PROJECT_ID \
      --region=REGION_NAME \
      --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_MongoDB \
      --parameters \
  inputTableSpec=INPUT_TABLE_SPEC,\
  mongoDbUri=MONGO_DB_URI,\
  database=DATABASE,\
  collection=COLLECTION
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • INPUT_TABLE_SPEC : nom de votre table BigQuery source.
  • MONGO_DB_URI : votre URI MongoDB.
  • DATABASE : votre base de données MongoDB.
  • COLLECTION : votre collection MongoDB.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
     "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputTableSpec": "INPUT_TABLE_SPEC",
            "mongoDbUri": "MONGO_DB_URI",
            "database": "DATABASE",
            "collection": "COLLECTION"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_MongoDB",
     }
  }

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • INPUT_TABLE_SPEC : nom de votre table BigQuery source.
  • MONGO_DB_URI : votre URI MongoDB.
  • DATABASE : votre base de données MongoDB.
  • COLLECTION : votre collection MongoDB.
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.mongodb.templates;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.BigQueryReadOptions;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.MongoDbOptions;
import com.google.cloud.teleport.v2.mongodb.templates.BigQueryToMongoDb.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.bson.Document;

/**
 * The {@link BigQueryToMongoDb} pipeline is a batch pipeline which reads data from BigQuery and
 * outputs the resulting records to MongoDB.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-mongodb/README_BigQuery_to_MongoDB.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "BigQuery_to_MongoDB",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to MongoDB",
    description =
        "The BigQuery to MongoDB template is a batch pipeline that reads rows from a BigQuery and writes them to MongoDB as documents. "
            + "Currently each row is stored as a document.",
    optionsClass = Options.class,
    flexContainerName = "bigquery-to-mongodb",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-mongodb",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The source BigQuery table must exist.",
      "The target MongoDB instance should be accessible from the Dataflow worker machines."
    })
public class BigQueryToMongoDb {
  /**
   * Options supported by {@link BigQueryToMongoDb}
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, MongoDbOptions, BigQueryReadOptions {}

  private static class ParseAsDocumentsFn extends DoFn<String, Document> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(Document.parse(context.element()));
    }
  }

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }

  public static boolean run(Options options) {
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(BigQueryIO.readTableRows().withoutValidation().from(options.getInputTableSpec()))
        .apply(
            "bigQueryDataset",
            ParDo.of(
                new DoFn<TableRow, Document>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    Document doc = new Document();
                    TableRow row = c.element();
                    row.forEach(
                        (key, value) -> {
                          if (!key.equals("_id")) {
                            doc.append(key, value);
                          }
                        });
                    c.output(doc);
                  }
                }))
        .apply(
            MongoDbIO.write()
                .withUri(options.getMongoDbUri())
                .withDatabase(options.getDatabase())
                .withCollection(options.getCollection()));
    pipeline.run();
    return true;
  }
}

Étape suivante