Vorlage „MongoDB zu BigQuery“ (Stream)

Diese Vorlage erstellt eine Streamingpipeline, die mit MongoDB-Änderungsstreams funktioniert. Wenn Sie diese Vorlage verwenden möchten, veröffentlichen Sie die Änderungsstreamdaten in Pub/Sub. Die Pipeline liest die JSON-Datensätze aus Pub/Sub und schreibt sie in BigQuery. Die in BigQuery geschriebenen Datensätze haben das gleiche Format wie die Batchvorlage für MongoDB für BigQuery.

Pipelineanforderungen

  • Das BigQuery-Ziel-Dataset muss vorhanden sein.
  • Die MongoDB-Quellinstanz muss über die Dataflow-Worker-Maschinen zugänglich sein.
  • Sie müssen ein Pub/Sub-Thema erstellen, um den Änderungsstream zu lesen. Achten Sie während der Ausführung der Pipeline auf CDC-Ereignisse (Change Data Capture) im MongoDB-Änderungsstream und veröffentlichen Sie sie als JSON-Einträge in Pub/Sub. Weitere Informationen zum Veröffentlichen von Nachrichten in Pub/Sub finden Sie unter Nachrichten in Themen veröffentlichen.
  • Diese Vorlage verwendet MongoDB-Änderungsstreams. Die Erfassung von Änderungsdaten in BigQuery wird nicht unterstützt.

Vorlagenparameter

Erforderliche Parameter

  • mongoDbUri: MongoDB-Verbindungs-URI im Format mongodb+srv://:@..
  • database: Datenbank in MongoDB, aus der die Sammlung gelesen werden soll. Beispiel: my-db.
  • collection: Name der Sammlung in der MongoDB-Datenbank. Beispiel: my-collection.
  • userOption: FLATTEN, JSON oder NONE. FLATTEN vereinfacht die Dokumente auf die Einzelebene. JSON speichert das Dokument im BigQuery-JSON-Format. NONE speichert das gesamte Dokument als JSON-formatierten STRING. Die Standardeinstellung ist: NONE.
  • inputTopic: Das Pub/Sub-Eingabethema, aus dem gelesen werden soll, im Format projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • outputTableSpec: Die BigQuery-Tabelle, in die geschrieben werden soll. Beispiel: bigquery-project:dataset.output_table.

Optionale Parameter

  • useStorageWriteApiAtLeastOnce: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die "Mindestens einmal"-Semantik verwenden möchten (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf true fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false.
  • KMSEncryptionKey: Cloud KMS-Verschlüsselungsschlüssel zum Entschlüsseln des MongoDB-URI-Verbindungsstrings. Wenn der Cloud KMS-Schlüssel übergeben wird, muss der MongoDB-URI-Verbindungsstring verschlüsselt übergeben werden. Beispiel: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • filter: Bson-Filter im JSON-Format. Beispiel: { "val": { $gt: 0, $lt: 9 }}.
  • useStorageWriteApi: Wenn „true“, verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist false. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0.
  • storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.
  • bigQuerySchemaPath: Der Cloud Storage-Pfad für das BigQuery-JSON-Schema. Beispiel: gs://your-bucket/your-schema.json.
  • javascriptDocumentTransformGcsPath: Der Cloud Storage-URI der Datei .js, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel: gs://your-bucket/your-transforms/*.js
  • javascriptDocumentTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Beispiel: transform

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) in JavaScript schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert.

Wenn Sie eine UDF verwenden möchten, laden Sie die JavaScript-Datei in Cloud Storage hoch und legen Sie die folgenden Vorlagenparameter fest:

ParameterBeschreibung
javascriptDocumentTransformGcsPath Der Cloud Storage-Speicherort der JavaScript-Datei.
javascriptDocumentTransformFunctionName Der Name der JavaScript-Funktion.

Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Ein MongoDB-Dokument.
  • Ausgabe: Ein Objekt, das als JSON-String serialisiert ist.
  • Führen Sie die Vorlage aus.

    1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
    2. Zur Seite "Job aus Vorlage erstellen“
    3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
    4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

      Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

    5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the MongoDB (CDC) to BigQuery templateaus.
    6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
    7. Klicken Sie auf Job ausführen.

    Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    Ersetzen Sie dabei Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • OUTPUT_TABLE_SPEC: Der Name Ihrer BigQuery-Zieltabelle.
    • MONGO_DB_URI: Ihr MongoDB-URI.
    • DATABASE: Ihre MongoDB-Datenbank.
    • COLLECTION: Ihre MongoDB-Sammlung.
    • USER_OPTION: FLATTEN, JSON oder NONE.
    • INPUT_TOPIC: Ihr Pub/Sub-Eingabethema.

    Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    Ersetzen Sie dabei Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • OUTPUT_TABLE_SPEC: Der Name Ihrer BigQuery-Zieltabelle.
    • MONGO_DB_URI: Ihr MongoDB-URI.
    • DATABASE: Ihre MongoDB-Datenbank.
    • COLLECTION: Ihre MongoDB-Sammlung.
    • USER_OPTION: FLATTEN, JSON oder NONE.
    • INPUT_TOPIC: Ihr Pub/Sub-Eingabethema.
    Java
    /*
     * Copyright (C) 2019 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.mongodb.templates;
    
    import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.BigQueryWriteOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.JavascriptDocumentTransformerOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.MongoDbOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.PubSubOptions;
    import com.google.cloud.teleport.v2.mongodb.templates.MongoDbCdcToBigQuery.Options;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import java.io.IOException;
    import javax.script.ScriptException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.bson.Document;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link MongoDbCdcToBigQuery} pipeline is a streaming pipeline which reads data pushed to
     * PubSub from MongoDB Changestream and outputs the resulting records to BigQuery.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery_CDC.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @Template(
        name = "MongoDB_to_BigQuery_CDC",
        category = TemplateCategory.STREAMING,
        displayName = "MongoDB (CDC) to BigQuery",
        description =
            "The MongoDB CDC (Change Data Capture) to BigQuery template is a streaming pipeline that works together with MongoDB change streams. "
                + "The pipeline reads the JSON records pushed to Pub/Sub via a MongoDB change stream and writes them to BigQuery as specified by the <code>userOption</code> parameter.",
        optionsClass = Options.class,
        flexContainerName = "mongodb-to-bigquery-cdc",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/mongodb-change-stream-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        preview = true,
        requirements = {
          "The target BigQuery dataset must exist.",
          "The source MongoDB instance must be accessible from the Dataflow worker machines.",
          "The change stream pushing changes from MongoDB to Pub/Sub should be running."
        },
        streaming = true,
        supportsAtLeastOnce = true)
    public class MongoDbCdcToBigQuery {
    
      private static final Logger LOG = LoggerFactory.getLogger(MongoDbCdcToBigQuery.class);
    
      /** Options interface. */
      public interface Options
          extends PipelineOptions,
              MongoDbOptions,
              PubSubOptions,
              BigQueryWriteOptions,
              JavascriptDocumentTransformerOptions,
              BigQueryStorageApiStreamingOptions {
    
        // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
        // on when pipeline is running on ALO mode and using the Storage Write API
        @TemplateParameter.Boolean(
            order = 1,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics. To"
                    + " use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to `true`. To use exactly-"
                    + " once semantics, set the parameter to `false`. This parameter applies only when"
                    + " `useStorageWriteApi` is `true`. The default value is `false`.",
            hiddenUi = true)
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /** class ParseAsDocumentsFn. */
      private static class ParseAsDocumentsFn extends DoFn<String, Document> {
    
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(Document.parse(context.element()));
        }
      }
    
      /**
       * Main entry point for pipeline execution.
       *
       * @param args Command line arguments to the pipeline.
       */
      public static void main(String[] args)
          throws ScriptException, IOException, NoSuchMethodException {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
        run(options);
      }
    
      /** Pipeline to read data from PubSub and write to MongoDB. */
      public static boolean run(Options options)
          throws ScriptException, IOException, NoSuchMethodException {
        options.setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);
        String userOption = options.getUserOption();
        String inputOption = options.getInputTopic();
    
        TableSchema bigquerySchema;
    
        // Get MongoDbUri
        String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get();
    
        if (options.getJavascriptDocumentTransformFunctionName() != null
            && options.getJavascriptDocumentTransformGcsPath() != null) {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchemaForUDF(
                  mongoDbUri,
                  options.getDatabase(),
                  options.getCollection(),
                  options.getJavascriptDocumentTransformGcsPath(),
                  options.getJavascriptDocumentTransformFunctionName(),
                  options.getUserOption());
        } else {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchema(
                  mongoDbUri, options.getDatabase(), options.getCollection(), options.getUserOption());
        }
    
        pipeline
            .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(inputOption))
            .apply(
                "RTransform string to document",
                ParDo.of(
                    new DoFn<String, Document>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = Document.parse(c.element());
                        c.output(document);
                      }
                    }))
            .apply(
                "UDF",
                TransformDocumentViaJavascript.newBuilder()
                    .setFileSystemPath(options.getJavascriptDocumentTransformGcsPath())
                    .setFunctionName(options.getJavascriptDocumentTransformFunctionName())
                    .build())
            .apply(
                "Read and transform data",
                ParDo.of(
                    new DoFn<Document, TableRow>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = c.element();
                        TableRow row = MongoDbUtils.getTableSchema(document, userOption);
                        c.output(row);
                      }
                    }))
            .apply(
                BigQueryIO.writeTableRows()
                    .to(options.getOutputTableSpec())
                    .withSchema(bigquerySchema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        pipeline.run();
        return true;
      }
    }
    

    Nächste Schritte