Vorlage „Pub/Sub für MongoDB“

Die Vorlage „Pub/Sub für MongoDB“ ist eine Streamingpipeline, die JSON-codierte Nachrichten aus einem Pub/Sub-Abo liest und in MongoDB als Dokumente schreibt. Bei Bedarf unterstützt diese Pipeline zusätzliche Transformationen, die über eine benutzerdefinierte JavaScript-Funktion (UDF) eingebunden werden können.

Wenn während der Verarbeitung von Datensätzen Fehler auftreten, schreibt die Vorlage sie zusammen mit der Eingabenachricht in eine BigQuery-Tabelle. Fehler können beispielsweise aufgrund von nicht übereinstimmenden Schemas, fehlerhaftem JSON oder während der Ausführung von Transformationen auftreten. Geben Sie den Tabellennamen im Parameter deadletterTable an. Wenn die Tabelle nicht vorhanden ist, wird sie von der Pipeline automatisch erstellt.

Pipelineanforderungen

  • Das Pub/Sub-Abo muss vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
  • Der MongoDB-Cluster muss vorhanden und über die Dataflow-Worker-Maschinen zugänglich sein.

Vorlagenparameter

Erforderliche Parameter

  • inputSubscription: Name des Pub/Sub-Abos. (Beispiel: projects/your-project-id/subscriptions/your-subscription-name).
  • mongoDBUri: Durch Kommas getrennte Liste von MongoDB-Servern. (Beispiel: host1:port,host2:port,host3:port).
  • database: Datenbank in MongoDB zum Speichern der Sammlung. (Beispiel: my-db).
  • collection: Der Name der Sammlung in der MongoDB-Datenbank. (Beispiel: my-collection).
  • deadletterTable: Die BigQuery-Tabelle, die durch Fehler verursachte Nachrichten speichert, z. B. nicht übereinstimmendes Schema, fehlerhaft formatierte JSON-Dateien usw. (Beispiel: your-project-id:your-dataset.your-table-name).

Optionale Parameter

  • batchSize: Batchgröße für die Aufnahme von Dokumentenbatches in MongoDB. Die Standardeinstellung ist 1000.
  • batchSizeBytes: Batchgröße in Byte. Die Standardeinstellung ist 5242880.
  • maxConnectionIdleTime: Maximale zulässige Leerlaufzeit in Sekunden, bis eine Zeitüberschreitung der Verbindung auftritt. Die Standardeinstellung ist 60000.
  • sslEnabled: Boolescher Wert, der angibt, ob für die Verbindung zu MongoDB SSL aktiviert ist. Die Standardeinstellung ist true.
  • ignoreSSLCertificate: Boolescher Wert, der angibt, ob das SSL-Zertifikat ignoriert werden soll. Die Standardeinstellung ist true.
  • withOrdered: Boolescher Wert, mit dem geordnete Bulk-Aufnahmen in MongoDB aktiviert werden. Die Standardeinstellung ist true.
  • withSSLInvalidHostNameAllowed: Boolescher Wert, der angibt, ob ein ungültiger Hostname für die SSL-Verbindung zulässig ist. Die Standardeinstellung ist true.
  • javascriptTextTransformGcsPath Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. (Beispiel: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Gibt an, wie oft die UDF aktualisiert werden soll (in Minuten). Wenn der Wert größer als 0 ist, prüft Dataflow regelmäßig die UDF-Datei in Cloud Storage und lädt die UDF neu, wenn die Datei geändert wurde. Mit diesem Parameter können Sie die UDF aktualisieren, während die Pipeline ausgeführt wird, ohne den Job neu starten zu müssen. Wenn der Wert 0 ist, ist das Neuladen der UDF deaktiviert. Der Standardwert ist 0.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Eine einzelne Zeile aus einer CSV-Eingabedatei
  • Ausgabe: Ein String-JSON-Dokument, das in Elasticsearch eingefügt werden soll.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to MongoDB templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • INPUT_SUBSCRIPTION: das Pub/Sub-Abo (z. B. projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: die MongoDB-Serveradressen (z. B. 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: der Name der MongoDB-Datenbank (z. B. users)
  • COLLECTION: der Name der MongoDB-Sammlung (z. B. profiles)
  • UNPROCESSED_TABLE: der Name der BigQuery-Tabelle (z. B. your-project:your-dataset.your-table-name)

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • INPUT_SUBSCRIPTION: das Pub/Sub-Abo (z. B. projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: die MongoDB-Serveradressen (z. B. 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: der Name der MongoDB-Datenbank (z. B. users)
  • COLLECTION: der Name der MongoDB-Sammlung (z. B. profiles)
  • UNPROCESSED_TABLE: der Name der BigQuery-Tabelle (z. B. your-project:your-dataset.your-table-name)

Nächste Schritte