Vorlage „Cloud Storage nach Elasticsearch“

Die Vorlage "Cloud Storage nach Elasticsearch" ist eine Batchpipeline, die Daten aus CSV-Dateien liest, die in einem Cloud Storage-Bucket gespeichert sind, und diese Daten als JSON-Dokumente in Elasticsearch schreibt.

Pipelineanforderungen

  • Der Cloud Storage-Bucket muss vorhanden sein.
  • Ein Elasticsearch-Host auf einer Google Cloud-Instanz oder in Elasticsearch Cloud, auf den über Dataflow zugegriffen werden kann, muss vorhanden sein.
  • Es muss eine BigQuery-Tabelle für die Fehlerausgabe vorhanden sein.

CSV-Schema

Wenn die CSV-Dateien Header enthalten, legen Sie für den Vorlagenparameter containsHeaders den Wert true fest.

Erstellen Sie andernfalls eine JSON-Schemadatei, die die Daten beschreibt. Geben Sie den Cloud Storage-URI der Schemadatei im Vorlagenparameter jsonSchemaPath an. Das folgende Beispiel zeigt ein JSON-Schema:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

Alternativ können Sie eine benutzerdefinierte Funktion (UDF) bereitstellen, die den CSV-Text parst und Elasticsearch-Dokumente ausgibt.

Vorlagenparameter

Parameter Beschreibung
inputFileSpec Das Cloud Storage-Dateimuster für die Suche nach CSV-Dateien. Beispiel: gs://mybucket/test-*.csv.
connectionUrl Elasticsearch-URL im Format https://hostname:[port] oder geben Sie die CloudID an, wenn Elastic Cloud verwendet wird.
apiKey Base64-codierter API-Schlüssel für die Authentifizierung.
index Der Elasticsearch-Index, an den die Anfragen ausgegeben werden, z. B. my-index.
deadletterTable Die BigQuery-Dead-Letter-Tabelle, an die fehlgeschlagene Einfügungen gesendet werden sollen. Beispiel: <your-project>:<your-dataset>.<your-table-name>.
containsHeaders (Optional) Boolescher Wert, der festlegt, ob Header in der CSV-Datei enthalten sein sollen. Standard-false.
delimiter (Optional) Das Trennzeichen, das in der CSV-Datei verwendet wird. Beispiel: ,
csvFormat (Optional) Das CSV-Format gemäß dem Apache Commons CSV-Format. Standardeinstellung: Default.
jsonSchemaPath (Optional) Der Pfad zum JSON-Schema. Standardeinstellung: null.
largeNumFiles (Optional) Auf "true" setzen, wenn die Anzahl der Dateien im Zehntausenderbereich liegt. Standardeinstellung: false.
javascriptTextTransformGcsPath Optional: Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName Optional: Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
batchSize (Optional) Batchgröße in der Anzahl an Dokumenten. Standardeinstellung: 1000.
batchSizeBytes (Optional) Batchgröße in der Anzahl an Byte. Standardeinstellung: 5242880 (5 MB).
maxRetryAttempts (Optional) Maximale Wiederholungsversuche, muss > 0 sein. Standardeinstellung: keine Wiederholungen.
maxRetryDuration (Optional) Maximale Wiederholungsdauer in Millisekunden, muss > 0 sein. Standardeinstellung: keine Wiederholungen.
csvFileEncoding (Optional) CSV-Dateicodierung.
propertyAsIndex (Optional) Eine Eigenschaft im indexierten Dokument, deren Wert angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer_index-UDF). Standardwert: none.
propertyAsId (Optional) Eine Eigenschaft im indexierten Dokument, deren Wert angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer_id-UDF). Standardwert: none.
javaScriptIndexFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIndexFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIdFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIdFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptTypeFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _type-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptTypeFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _type-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIsDeleteFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte einen der Stringwerte "true" oder "false" zurückgeben. Standardwert: none.
javaScriptIsDeleteFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte einen der Stringwerte "true" oder "false" zurückgeben. Standardwert: none.
usePartialUpdate (Optional) Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Standardeinstellung: false.
bulkInsertMethod (Optional) Gibt an, ob INDEX (Indexieren, Upserts sind zulässig) oder CREATE (Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Standardeinstellung: CREATE.

Benutzerdefinierte Funktionen

Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Texttransformationsfunktion

Wandelt die CSV-Daten in ein Elasticsearch-Dokument um.

Vorlagenparameter:

  • javascriptTextTransformGcsPath: der Cloud Storage-URI der JavaScript-Datei.
  • javascriptTextTransformFunctionName: Der Name der JavaScript-Funktion.

Funktionsspezifikation

  • Eingabe: Eine einzelne Zeile aus einer CSV-Eingabedatei
  • Ausgabe: Ein String-JSON-Dokument, das in Elasticsearch eingefügt werden soll.

Indexfunktion

Gibt den Index zurück, zu dem das Dokument gehört.

Vorlagenparameter:

  • javaScriptIndexFnGcsPath: der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIndexFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _index des Dokuments.

Dokument-ID-Funktion

Gibt die Dokument-ID zurück

Vorlagenparameter:

  • javaScriptIdFnGcsPath: der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIdFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _id des Dokuments.

Funktion zum Löschen von Dokumenten

Gibt an, ob ein Dokument gelöscht werden soll. Um diese Funktion zu verwenden, legen Sie den Modus für Bulk-Einfügung auf INDEX fest und stellen eine Dokument-ID-Funktion bereit.

Vorlagenparameter:

  • javaScriptIsDeleteFnGcsPath: der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIsDeleteFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Geben Sie den String "true" zurück, um das Dokument zu löschen, oder "false", um das Dokument zu aktualisieren.

Funktion für Zuordnungstyp

Gibt den Zuordnungstyp des Dokuments zurück.

Vorlagenparameter:

  • javaScriptTypeFnGcsPath: der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptTypeFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _type des Dokuments.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Storage to Elasticsearch templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • INPUT_FILE_SPEC: Ihr Cloud Storage-Dateimuster.
  • CONNECTION_URL: ist die Elasticsearch-URL
  • APIKEY: ist der base64-codierte API-Schlüssel für die Authentifizierung
  • INDEX: ist ihr Elasticsearch-Index
  • DEADLETTER_TABLE: Ihre BigQuery-Tabelle.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • INPUT_FILE_SPEC: Ihr Cloud Storage-Dateimuster.
  • CONNECTION_URL: ist die Elasticsearch-URL
  • APIKEY: ist der base64-codierte API-Schlüssel für die Authentifizierung
  • INDEX: ist ihr Elasticsearch-Index
  • DEADLETTER_TABLE: Ihre BigQuery-Tabelle.

Nächste Schritte