Vorlage „BigQuery für MongoDB“

Die Vorlage "BigQuery für MongoDB" ist eine Batchpipeline, die Zeilen aus BigQuery liest und als Dokumente in MongoDB schreibt. Derzeit wird jede Zeile als Dokument gespeichert.

Pipelineanforderungen

  • Die BigQuery-Quelltabelle
  • Die MongoDB-Zielinstanz sollte über die Dataflow-Worker-Maschinen zugänglich sein.

Vorlagenparameter

Erforderliche Parameter

  • mongoDbUri: MongoDB-Verbindungs-URI im Format mongodb+srv://:@.
  • database: Datenbank in MongoDB zum Speichern der Sammlung. Beispiel: my-db.
  • collection: Der Name der Sammlung in der MongoDB-Datenbank. Beispiel: my-collection.
  • inputTableSpec: Die BigQuery-Tabelle, aus der gelesen werden soll. Beispiel: bigquery-project:dataset.input_table.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the BigQuery to MongoDB templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

  gcloud dataflow flex-template run JOB_NAME \
      --project=PROJECT_ID \
      --region=REGION_NAME \
      --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_MongoDB \
      --parameters \
  inputTableSpec=INPUT_TABLE_SPEC,\
  mongoDbUri=MONGO_DB_URI,\
  database=DATABASE,\
  collection=COLLECTION
  

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • INPUT_TABLE_SPEC: Der Name Ihrer BigQuery-Quelltabelle.
  • MONGO_DB_URI: Ihr MongoDB-URI.
  • DATABASE: Ihre MongoDB-Datenbank.
  • COLLECTION: Ihre MongoDB-Sammlung.

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
     "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputTableSpec": "INPUT_TABLE_SPEC",
            "mongoDbUri": "MONGO_DB_URI",
            "database": "DATABASE",
            "collection": "COLLECTION"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_MongoDB",
     }
  }

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • INPUT_TABLE_SPEC: Der Name Ihrer BigQuery-Quelltabelle.
  • MONGO_DB_URI: Ihr MongoDB-URI.
  • DATABASE: Ihre MongoDB-Datenbank.
  • COLLECTION: Ihre MongoDB-Sammlung.
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.mongodb.templates;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.BigQueryReadOptions;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.MongoDbOptions;
import com.google.cloud.teleport.v2.mongodb.templates.BigQueryToMongoDb.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.bson.Document;

/**
 * The {@link BigQueryToMongoDb} pipeline is a batch pipeline which reads data from BigQuery and
 * outputs the resulting records to MongoDB.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-mongodb/README_BigQuery_to_MongoDB.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "BigQuery_to_MongoDB",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to MongoDB",
    description =
        "The BigQuery to MongoDB template is a batch pipeline that reads rows from a BigQuery and writes them to MongoDB as documents. "
            + "Currently each row is stored as a document.",
    optionsClass = Options.class,
    flexContainerName = "bigquery-to-mongodb",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-mongodb",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The source BigQuery table must exist.",
      "The target MongoDB instance should be accessible from the Dataflow worker machines."
    })
public class BigQueryToMongoDb {
  /**
   * Options supported by {@link BigQueryToMongoDb}
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, MongoDbOptions, BigQueryReadOptions {}

  private static class ParseAsDocumentsFn extends DoFn<String, Document> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(Document.parse(context.element()));
    }
  }

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }

  public static boolean run(Options options) {
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(BigQueryIO.readTableRows().withoutValidation().from(options.getInputTableSpec()))
        .apply(
            "bigQueryDataset",
            ParDo.of(
                new DoFn<TableRow, Document>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    Document doc = new Document();
                    TableRow row = c.element();
                    row.forEach(
                        (key, value) -> {
                          if (!key.equals("_id")) {
                            doc.append(key, value);
                          }
                        });
                    c.output(doc);
                  }
                }))
        .apply(
            MongoDbIO.write()
                .withUri(options.getMongoDbUri())
                .withDatabase(options.getDatabase())
                .withCollection(options.getCollection()));
    pipeline.run();
    return true;
  }
}

Nächste Schritte