Die Vorlage „Pub/Sub für Elasticsearch“ ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest, eine benutzerdefinierte Funktion (User-defined Function, UDF) ausführt und sie als Dokumente in Elasticsearch schreibt. Die Dataflow-Vorlage verwendet die Datenstreams-Funktion von Elasticsearch, um Zeitachsendaten über mehrere Indexe zu speichern, wobei Sie eine einzige benannte Ressource für Anfragen erhalten. Datenstreams eignen sich gut für Logs, Messwerte, Traces und andere kontinuierlich generierte Daten, die in Pub/Sub gespeichert sind.
Mit der Vorlage wird ein Datenstream mit dem Namen logs-gcp.DATASET-NAMESPACE
erstellt. Dabei gilt:
- DATASET ist der Wert des Vorlagenparameters
dataset
oderpubsub
, wenn nicht anders angegeben. - NAMESPACE ist der Wert des Vorlagenparameters
namespace
oderdefault
, wenn nicht anders angegeben.
Pipelineanforderungen
- Das Quell-Pub/Sub-Abo muss vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
- Ein öffentlich erreichbarer Elasticsearch-Host auf einer Google Cloud-Instanz oder in Elastic Cloud mit Elasticsearch Version 7.0 oder höher. Weitere Informationen finden Sie unter Google Cloud-Integration für Elastic.
- Ein Pub/Sub-Thema für die Fehlerausgabe
Vorlagenparameter
Erforderliche Parameter
- inputSubscription : Pub/Sub-Abo, von dem die Eingabe verarbeitet wird. Der Name muss das Format „projects/your-project-id/subscriptions/your-subscription-name“ haben (Beispiel: projects/your-project-id/subscriptions/your-subscription-name).
- errorOutputTopic: Pub/Sub-Ausgabethema für die Veröffentlichung fehlgeschlagener Datensätze im Format „projects/your-project-id/topics/your-topic-name“.
- connectionUrl : Elasticsearch-URL im Format https://hostname:[port] oder bei Verwendung von Elastic Cloud (Beispiel: https://elasticsearch-host:9200) eine CloudID angeben.
- apiKey : Base64-codierter API-Schlüssel für die Authentifizierung. Informationen hierzu unter: https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html#security-api-create-api-key-request.
Optionale Parameter
- dataset : Der Typ von über Pub/Sub gesendeten Logs, für die wir ein sofort einsatzfähiges Dashboard haben. Bekannte Werte für Logtypen sind "audit", "vpcflow" und "firewall". Standardeinstellung: „pubsub“.
- namespace : Eine beliebige Gruppierung, z. B. eine Umgebung (dev, prod oder qa), ein Team oder eine strategische Geschäftseinheit. Standardeinstellung: default
- elasticsearchTemplateVersion: Versions-ID der Dataflow-Vorlage, in der Regel von Google Cloud definiert. Die Standardeinstellung ist 1.0.0.
- javascriptTextTransformGcsPath : Das Cloud Storage-Pfadmuster für den JavaScript-Code, der Ihre benutzerdefinierten Funktionen enthält. Beispiel: gs://Ihr-Bucket/Ihre-Funktion.js.
- javascriptTextTransformFunctionName : Der Name der Funktion, die aus Ihrer JavaScript-Datei aufgerufen werden soll. Verwenden Sie nur Buchstaben, Ziffern und Unterstriche. (Beispiel: „transform“ oder „transform_udf1“).
- javascriptTextTransformReloadIntervalMinutes : Definieren Sie das Intervall, in dem die Worker möglicherweise nach JavaScript-UDF-Änderungen suchen, um die Dateien neu zu laden. Die Standardeinstellung ist 0.
- elasticsearchUsername: Der Elasticsearch-Nutzername, mit dem Sie sich authentifizieren möchten. Wenn dieses angegeben ist, wird der Wert von "apiKey" ignoriert.
- elasticsearchPassword: Das Elasticsearch-Passwort, mit dem Sie sich authentifizieren. Wenn dieses angegeben ist, wird der Wert von "apiKey" ignoriert.
- batchSize : Batchgröße in der Anzahl an Dokumenten. Standardeinstellung: '1000'.
- batchSizeBytes: Batchgröße in Byte für die Batcheinfügung von Nachrichten in Elasticsearch. Standardeinstellung: '5242880 (5mb)'.
- maxRetryAttempts : Maximale Wiederholungsversuche, muss > 0 sein. Standardeinstellung: 'no retries'.
- maxRetryDuration : Maximale Wiederholungsdauer in Millisekunden, muss > 0 sein. Standardeinstellung: 'no retries'.
- propertyAsIndex : Ein Attribut im indexierten Dokument, dessen Wert angibt, dass "_index"-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer "_index"-UDF). Standardwert: none.
- javaScriptIndexFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass „_index“-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen.Standard: none.
- javaScriptIndexFnName : UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
- propertyAsId : Ein Attribut im indexierten Dokument, dessen Wert angibt, dass "_id"-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer "_id"-UDF). Standardwert: none.
- javaScriptIdFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass „_id“-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen.Standard: none.
- javaScriptIdFnName : UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
- javaScriptTypeFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass „_index“-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen.Standard: none.
- javaScriptTypeFnName: UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass „_type“-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
- javaScriptIsDeleteFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte den Stringwert „true“ oder „false“ zurückgeben. Standardwert: none.
- javaScriptIsDeleteFnName : UDF-JavaScript-Funktionsname für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte den Stringwert „true“ oder „false“ zurückgeben. Standardwert: none.
- usePartialUpdate : Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Standardwert: false
- bulkInsertMethod: Gibt an, ob „INDEX“ (Indexieren, Upserts sind zulässig) oder „CREATE“ (Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Standardeinstellung: „CREATE“.
- trustSelfSignedCerts: Gibt an, ob selbst signierten Zertifikaten vertraut werden soll. Eine installierte Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Aktivieren Sie diese Option, um die Validierung des SSL-Zertifikats zu umgehen. Standardwert ist False.
- disableCertificateValidation: Wenn „true“, wird dem selbstsignierten SSL-Zertifikat vertraut. Eine Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Wenn die Validierung für das Zertifikat umgangen werden soll, setzen Sie diesen Parameter auf „true“. Standardeinstellung: false.
- apiKeyKMSEncryptionKey : Der Cloud KMS-Schlüssel zum Entschlüsseln des API-Schlüssels. Dieser Parameter muss angegeben werden, wenn apiKeySource auf KMS gesetzt ist. Wenn dieser Parameter angegeben wird, muss der apiKey-String verschlüsselt übergeben werden. Verschlüsseln Sie Parameter mit dem Verschlüsselungsendpunkt der KMS API. Der Schlüssel sollte das Format „projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}“ haben. Siehe https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Beispiel: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
- apiKeySecretId : Secret Manager-ID für den apiKey. Dieser Parameter sollte angegeben werden, wenn apiKeySource auf SECRET_MANAGER festgelegt ist. Muss das Format „projects/{project}/secrets/{secret}/versions/{secret_version}“ haben. (Beispiel: projects/your-project-id/secrets/your-secret/versions/your-secret-version).
- apiKeySource: Quelle des API-Schlüssels. Entweder PLAINTEXT, KMS oder SECRET_MANAGER. Dieser Parameter muss angegeben werden, wenn Secret Manager oder KMS verwendet wird. Wenn apiKeySource auf KMS gesetzt ist, müssen apiKeyKMSEncryptionKey und der verschlüsselte apiKey bereitgestellt werden. Wenn apiKeySource auf „SECRET_MANAGER“ festgelegt ist, muss apiKeySecretId angegeben werden. Wenn apiKeySource auf PLAINTEXT festgelegt ist, muss apiKey angegeben werden. Standardeinstellung: PLAINTEXT.
Benutzerdefinierte Funktionen
Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Texttransformationsfunktion
Die Pub/Sub-Nachricht wird in ein Elasticsearch-Dokument umgewandelt.
Vorlagenparameter:
javascriptTextTransformGcsPath
: den Cloud Storage-URI der JavaScript-Datei.javascriptTextTransformFunctionName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Feld der Pub/Sub-Nachrichtendaten, das als JSON-String serialisiert ist.
- Ausgabe: Ein String-JSON-Dokument, das in Elasticsearch eingefügt werden soll.
Indexfunktion
Gibt den Index zurück, zu dem das Dokument gehört.
Vorlagenparameter:
javaScriptIndexFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIndexFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_index
des Dokuments.
Funktion „Dokument-ID“
Gibt die Dokument-ID zurück.
Vorlagenparameter:
javaScriptIdFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIdFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_id
des Dokuments.
Funktion zum Löschen von Dokumenten
Gibt an, ob ein Dokument gelöscht werden soll. Wenn Sie diese Funktion verwenden möchten, legen Sie den Bulk-Eingabemodus auf INDEX
fest und geben Sie eine Funktion für die Dokument-ID an.
Vorlagenparameter:
javaScriptIsDeleteFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIsDeleteFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Geben Sie den String
"true"
zurück, um das Dokument zu löschen, oder"false"
, um das Dokument zu aktualisieren.
Funktion für den Abgleichstyp
Gibt den Zuordnungstyp des Dokuments zurück.
Vorlagenparameter:
javaScriptTypeFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptTypeFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_type
des Dokuments.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Elasticsearch templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
ERROR_OUTPUT_TOPIC
: das Pub/Sub-Thema für die FehlerausgabeSUBSCRIPTION_NAME
: der Name Ihres Pub/Sub-AbosCONNECTION_URL
: die Elasticsearch-URLDATASET
: Ihr LogtypNAMESPACE
: Ihr Namespace für das DatasetAPIKEY
: der base64-codierte API-Schlüssel für die Authentifizierung
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch", } }
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
ERROR_OUTPUT_TOPIC
: das Pub/Sub-Thema für die FehlerausgabeSUBSCRIPTION_NAME
: der Name Ihres Pub/Sub-AbosCONNECTION_URL
: die Elasticsearch-URLDATASET
: Ihr LogtypNAMESPACE
: Ihr Namespace für das DatasetAPIKEY
: der base64-codierte API-Schlüssel für die Authentifizierung
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.