Vorlage „MongoDB für BigQuery (CDC)“

Diese Vorlage erstellt eine Streamingpipeline, die mit MongoDB-Änderungsstreams funktioniert. Wenn Sie diese Vorlage verwenden möchten, veröffentlichen Sie die Änderungsstreamdaten in Pub/Sub. Die Pipeline liest die JSON-Datensätze aus Pub/Sub und schreibt sie in BigQuery. Die in BigQuery geschriebenen Datensätze haben das gleiche Format wie die Batchvorlage für MongoDB für BigQuery.

Pipelineanforderungen

  • Das BigQuery-Ziel-Dataset muss vorhanden sein.
  • Die MongoDB-Quellinstanz muss über die Dataflow-Worker-Maschinen zugänglich sein.
  • Sie müssen ein Pub/Sub-Thema erstellen, um den Änderungsstream zu lesen. Achten Sie während der Ausführung der Pipeline auf CDC-Ereignisse (Change Data Capture) im MongoDB-Änderungsstream und veröffentlichen Sie sie als JSON-Einträge in Pub/Sub. Weitere Informationen zum Veröffentlichen von Nachrichten in Pub/Sub finden Sie unter Nachrichten in Themen veröffentlichen.

Vorlagenparameter

Erforderliche Parameter

  • mongoDbUri: MongoDB-Verbindungs-URI im Format mongodb+srv://:@.
  • database: Datenbank in MongoDB, aus der die Sammlung gelesen werden soll. Beispiel: my-db.
  • collection: Name der Sammlung in der MongoDB-Datenbank. (Beispiel: my-collection).
  • userOption : Nutzeroption: FLATTEN oder NONE. FLATTEN vereinfacht die Dokumente auf die Einzelebene. NONE speichert das gesamte Dokument als JSON-String. Die Standardeinstellung ist NONE.
  • inputTopic : Das Pub/Sub-Thema, aus dem die Eingabe gelesen werden soll, im Format „projects/your-project-id/topics/your-topic-name“ (Example: projects/your-project-id/topics/your-topic-name).
  • outputTableSpec : Der Speicherort der BigQuery-Tabelle, in die die Ausgabe geschrieben werden soll. Der Name muss das Format <project>:<dataset>.<table_name> haben. Das Schema der Tabelle muss mit Eingabeobjekten übereinstimmen.

Optionale Parameter

  • useStorageWriteApiAtLeastOnce : Dieser Parameter wird nur wirksam, wenn "BigQuery Storage Write API verwenden" aktiviert ist. Wenn diese Option aktiviert ist, wird für die Storage Write API die "Mindestens einmal"-Semantik verwendet. Andernfalls wird die "Genau einmal"-Semantik verwendet. Die Standardeinstellung ist "false".
  • KMSEncryptionKey : Cloud KMS-Verschlüsselungsschlüssel zum Entschlüsseln des Mongodb-URI-Verbindungsstrings. Wenn der Cloud KMS-Schlüssel übergeben wird, muss der Mongodb-URI-Verbindungsstring verschlüsselt übergeben werden. (Beispiel: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key).
  • useStorageWriteApi : Wenn „true“, verwendet die Pipeline beim Schreiben der Daten in BigQuery die Storage Write API (siehe https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). Der Standardwert ist „false“. Wenn Sie die Storage Write API im genau einmaligen Modus verwenden, müssen Sie die folgenden Parameter festlegen: „Anzahl der Streams für die BigQuery Storage Write API“ und „Triggerhäufigkeit in Sekunden für die BigQuery Storage Write API“. Wenn Sie den Dataflow-Modus „Mindestens einmal“ aktivieren oder den Parameter „useStorageWriteApiAtLeastOnce“ auf „true“ setzen, müssen Sie die Anzahl der Streams oder die Triggerhäufigkeit nicht festlegen.
  • numStorageWriteApiStreams: Die Anzahl der Streams definiert die Parallelität der Write-Transformation von BigQueryIO und entspricht ungefähr der Anzahl der Streams der Storage Write API, die von der Pipeline verwendet werden. Die empfohlenen Werte finden Sie unter https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api. Die Standardeinstellung ist 0.
  • storageWriteApiTriggeringFrequencySec: Die Triggerhäufigkeit legt fest, wie schnell die Daten für Abfragen in BigQuery sichtbar sind. Die empfohlenen Werte finden Sie unter https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api.
  • javascriptDocumentTransformGcsPath : Das Cloud Storage-Pfadmuster für den JavaScript-Code, der Ihre benutzerdefinierten Funktionen enthält. (Beispiel: gs://your-bucket/your-transforms/*.js).
  • javascriptDocumentTransformFunctionName : Der Funktionsname darf nur Buchstaben, Ziffern und Unterstriche enthalten. Beispiel: "transform" oder "transform_udf1". (Beispiel: transform).

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) in JavaScript schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert.

Wenn Sie eine UDF verwenden möchten, laden Sie die JavaScript-Datei in Cloud Storage hoch und legen Sie die folgenden Vorlagenparameter fest:

ParameterBeschreibung
javascriptDocumentTransformGcsPath Der Cloud Storage-Speicherort der JavaScript-Datei.
javascriptDocumentTransformFunctionName Der Name der JavaScript-Funktion.

Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Ein MongoDB-Dokument.
  • Ausgabe: Ein Objekt, das als JSON-String serialisiert ist.
  • Führen Sie die Vorlage aus.

    Console

    1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
    2. Zur Seite "Job aus Vorlage erstellen“
    3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
    4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

      Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

    5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the MongoDB to BigQuery (CDC) templateaus.
    6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
    7. Klicken Sie auf Job ausführen.

    gcloud

    Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • OUTPUT_TABLE_SPEC: Der Name Ihrer BigQuery-Zieltabelle.
    • MONGO_DB_URI: Ihr MongoDB-URI.
    • DATABASE: Ihre MongoDB-Datenbank.
    • COLLECTION: Ihre MongoDB-Sammlung.
    • USER_OPTION: FLATTEN oder NONE.
    • INPUT_TOPIC: Ihr Pub/Sub-Eingabethema.

    API

    Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • OUTPUT_TABLE_SPEC: Der Name Ihrer BigQuery-Zieltabelle.
    • MONGO_DB_URI: Ihr MongoDB-URI.
    • DATABASE: Ihre MongoDB-Datenbank.
    • COLLECTION: Ihre MongoDB-Sammlung.
    • USER_OPTION: FLATTEN oder NONE.
    • INPUT_TOPIC: Ihr Pub/Sub-Eingabethema.

    Nächste Schritte