Vorlage „Pub/Sub für Elasticsearch“

Die Vorlage „Pub/Sub für Elasticsearch“ ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest, eine benutzerdefinierte Funktion (User-defined Function, UDF) ausführt und sie als Dokumente in Elasticsearch schreibt. Die Dataflow-Vorlage verwendet die Datenstreams-Funktion von Elasticsearch, um Zeitachsendaten über mehrere Indexe zu speichern, wobei Sie eine einzige benannte Ressource für Anfragen erhalten. Datenstreams eignen sich gut für Logs, Messwerte, Traces und andere kontinuierlich generierte Daten, die in Pub/Sub gespeichert sind.

Mit der Vorlage wird ein Datenstream mit dem Namen logs-gcp.DATASET-NAMESPACE erstellt. Dabei gilt:

  • DATASET ist der Wert des Vorlagenparameters dataset oder pubsub, wenn nicht anders angegeben.
  • NAMESPACE ist der Wert des Vorlagenparameters namespace oder default, wenn nicht anders angegeben.

Pipelineanforderungen

  • Das Quell-Pub/Sub-Abo muss vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
  • Ein öffentlich erreichbarer Elasticsearch-Host auf einer Google Cloud -Instanz oder in Elastic Cloud mit Elasticsearch Version 7.0 oder höher. Weitere Informationen finden Sie unter Google Cloud-Integration für Elastic.
  • Ein Pub/Sub-Thema für die Fehlerausgabe

Vorlagenparameter

Erforderliche Parameter

  • inputSubscription: Pub/Sub-Abo, von dem die Eingabe verarbeitet wird. Beispiel: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • errorOutputTopic: Das Pub/Sub-Ausgabethema für die Veröffentlichung fehlgeschlagener Datensätze im Format projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • connectionUrl: Die Elasticsearch-URL im Format https://hostname:[port]. Wenn Sie Elastic Cloud verwenden, geben Sie die CloudID an. Beispiel: https://elasticsearch-host:9200
  • apiKey: Der Base64-codierte API-Schlüssel für die Authentifizierung.

Optionale Parameter

  • dataset: Der Typ von über Pub/Sub gesendeten Logs, für die wir ein sofort einsatzfähiges Dashboard haben. Bekannte Werte für Logtypen sind audit, vpcflow und firewall. Standardeinstellung: pubsub.
  • namespace: Eine beliebige Gruppierung, z. B. eine Umgebung (dev, prod oder qa), ein Team oder eine strategische Geschäftseinheit. Die Standardeinstellung ist default.
  • elasticsearchTemplateVersion: Versions-ID der Dataflow-Vorlage, in der Regel von Google Cloud definiert. Die Standardeinstellung ist 1.0.0.
  • javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Gibt an, wie oft die UDF aktualisiert werden soll (in Minuten). Wenn der Wert größer als 0 ist, prüft Dataflow regelmäßig die UDF-Datei in Cloud Storage und lädt die UDF neu, wenn die Datei geändert wurde. Mit diesem Parameter können Sie die UDF aktualisieren, während die Pipeline ausgeführt wird, ohne den Job neu starten zu müssen. Wenn der Wert 0 ist, ist das Neuladen der UDF deaktiviert. Der Standardwert ist 0.
  • elasticsearchUsername: Der Elasticsearch-Nutzername, mit dem Sie sich authentifizieren möchten. Wenn dieses angegeben ist, wird der Wert von apiKey ignoriert.
  • elasticsearchPassword: Das Elasticsearch-Passwort, mit dem Sie sich authentifizieren. Wenn dieses angegeben ist, wird der Wert von apiKey ignoriert.
  • batchSize: Batchgröße in der Anzahl an Dokumenten. Die Standardeinstellung ist 1000.
  • batchSizeBytes: Die Batchgröße in Anzahl der Byte. Standardeinstellung: 5242880 (5 MB).
  • maxRetryAttempts: Die maximale Anzahl der Wiederholungsversuche. Muss größer als Null (0) sein. Die Standardeinstellung ist no retries.
  • maxRetryDuration: Die maximale Wiederholungsdauer in Millisekunden. Muss größer als Null (0) sein. Die Standardeinstellung ist no retries.
  • propertyAsIndex: Das Attribut im indexierten Dokument, dessen Wert die _index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer _index-UDF. Die Standardeinstellung ist none.
  • javaScriptIndexFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die _index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptIndexFnName: Der Name der UDF-JavaScript-Funktion, die _index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • propertyAsId: Ein Attribut im indexierten Dokument, dessen Wert die _id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer _id-UDF. Die Standardeinstellung ist none.
  • javaScriptIdFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die _id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptIdFnName: Der Name der UDF-JavaScript-Funktion, die die _id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptTypeFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die _type-Metadaten angibt, die in Bulk-Anfragen in Dokumenten aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptTypeFnName: Der Name der UDF-JavaScript-Funktion, die die _type-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptIsDeleteFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von true oder false zurück. Die Standardeinstellung ist none.
  • javaScriptIsDeleteFnName: Der Name der UDF-JavaScript-Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von true oder false zurück. Die Standardeinstellung ist none.
  • usePartialUpdate: Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Die Standardeinstellung ist false.
  • bulkInsertMethod: Gibt an, ob INDEX (Indexieren, Upserts sind zulässig) oder CREATE (Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Die Standardeinstellung ist CREATE.
  • trustSelfSignedCerts: Gibt an, ob selbst signierten Zertifikaten vertraut werden soll. Eine installierte Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Aktivieren Sie diese Option, um die Validierung des SSL-Zertifikats zu umgehen. (Standard: false).
  • disableCertificateValidation: Wenn true, wird dem selbstsignierten SSL-Zertifikat vertraut. Eine Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Wenn die Validierung für das Zertifikat umgangen werden soll, setzen Sie diesen Parameter auf true. Die Standardeinstellung ist false.
  • apiKeyKMSEncryptionKey: Der Cloud KMS-Schlüssel zum Entschlüsseln des API-Schlüssels. Dieser Parameter ist erforderlich, wenn apiKeySource auf KMS festgelegt ist. Wenn dieser Parameter angegeben wird, muss ein verschlüsselter apiKey-String übergeben werden. Verschlüsseln Sie Parameter mit dem Verschlüsselungsendpunkt der KMS API. Verwenden Sie für den Schlüssel das Format projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Siehe https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt, z. B. projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: Die Secret Manager-Secret-ID für den apiKey. Geben Sie diesen Parameter an, wenn apiKeySource auf SECRET_MANAGER festgelegt ist. Verwenden Sie das Format projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: Die Quelle des API-Schlüssels. Zulässige Werte sind PLAINTEXT, KMS und SECRET_MANAGER. Dieser Parameter ist erforderlich, wenn Sie Secret Manager oder KMS verwenden. Wenn apiKeySource auf KMS festgelegt ist, müssen apiKeyKMSEncryptionKey und der verschlüsselte apiKey angegeben werden. Wenn apiKeySource auf SECRET_MANAGER festgelegt ist, muss apiKeySecretId angegeben werden. Wenn apiKeySource auf PLAINTEXT festgelegt ist, muss apiKey angegeben werden. Standardeinstellung: PLAINTEXT.
  • socketTimeout: Wenn festgelegt, wird das Standardzeitlimit für die maximale Wiederholung und das Standard-Socket-Zeitlimit (30.000 ms) im Elastic RestClient überschrieben.

Benutzerdefinierte Funktionen

Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Texttransformationsfunktion

Die Pub/Sub-Nachricht wird in ein Elasticsearch-Dokument umgewandelt.

Vorlagenparameter:

  • javascriptTextTransformGcsPath: den Cloud Storage-URI der JavaScript-Datei.
  • javascriptTextTransformFunctionName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Feld der Pub/Sub-Nachrichtendaten, das als JSON-String serialisiert ist.
  • Ausgabe: Ein String-JSON-Dokument, das in Elasticsearch eingefügt werden soll.

Indexfunktion

Gibt den Index zurück, zu dem das Dokument gehört.

Vorlagenparameter:

  • javaScriptIndexFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIndexFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _index des Dokuments.

Funktion „Dokument-ID“

Gibt die Dokument-ID zurück.

Vorlagenparameter:

  • javaScriptIdFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIdFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _id des Dokuments.

Funktion zum Löschen von Dokumenten

Gibt an, ob ein Dokument gelöscht werden soll. Wenn Sie diese Funktion verwenden möchten, legen Sie den Bulk-Eingabemodus auf INDEX fest und geben Sie eine Funktion für die Dokument-ID an.

Vorlagenparameter:

  • javaScriptIsDeleteFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIsDeleteFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Geben Sie den String "true" zurück, um das Dokument zu löschen, oder "false", um das Dokument zu aktualisieren.

Funktion für den Abgleichstyp

Gibt den Zuordnungstyp des Dokuments zurück.

Vorlagenparameter:

  • javaScriptTypeFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptTypeFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _type des Dokuments.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Elasticsearch templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • ERROR_OUTPUT_TOPIC: das Pub/Sub-Thema für die Fehlerausgabe
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • CONNECTION_URL: die Elasticsearch-URL
  • DATASET: Ihr Logtyp
  • NAMESPACE: Ihr Namespace für das Dataset
  • APIKEY: der base64-codierte API-Schlüssel für die Authentifizierung

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex",
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • ERROR_OUTPUT_TOPIC: das Pub/Sub-Thema für die Fehlerausgabe
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • CONNECTION_URL: die Elasticsearch-URL
  • DATASET: Ihr Logtyp
  • NAMESPACE: Ihr Namespace für das Dataset
  • APIKEY: der base64-codierte API-Schlüssel für die Authentifizierung

Nächste Schritte