Die Vorlage "Cloud Storage nach Elasticsearch" ist eine Batchpipeline, die Daten aus CSV-Dateien liest, die in einem Cloud Storage-Bucket gespeichert sind, und diese Daten als JSON-Dokumente in Elasticsearch schreibt.
Pipelineanforderungen
- Der Cloud Storage-Bucket muss vorhanden sein.
- Ein Elasticsearch-Host auf einer Google Cloud-Instanz oder in Elasticsearch Cloud, auf den über Dataflow zugegriffen werden kann, muss vorhanden sein.
- Es muss eine BigQuery-Tabelle für die Fehlerausgabe vorhanden sein.
CSV-Schema
Wenn die CSV-Dateien Header enthalten, legen Sie für den Vorlagenparameter containsHeaders
den Wert true
fest.
Erstellen Sie andernfalls eine JSON-Schemadatei, die die Daten beschreibt. Geben Sie den Cloud Storage-URI der Schemadatei im Vorlagenparameter jsonSchemaPath
an. Das folgende Beispiel zeigt ein JSON-Schema:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
Alternativ können Sie eine benutzerdefinierte Funktion (UDF) bereitstellen, die den CSV-Text parst und Elasticsearch-Dokumente ausgibt.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputFileSpec |
Das Cloud Storage-Dateimuster für die Suche nach CSV-Dateien. Beispiel: gs://mybucket/test-*.csv . |
connectionUrl |
Elasticsearch-URL im Format https://hostname:[port] oder geben Sie die CloudID an, wenn Elastic Cloud verwendet wird. |
apiKey |
Base64-codierter API-Schlüssel für die Authentifizierung. |
index |
Der Elasticsearch-Index, an den die Anfragen ausgegeben werden, z. B. my-index . |
deadletterTable |
Die BigQuery-Dead-Letter-Tabelle, an die fehlgeschlagene Einfügungen gesendet werden sollen. Beispiel: <your-project>:<your-dataset>.<your-table-name> . |
containsHeaders |
(Optional) Boolescher Wert, der festlegt, ob Header in der CSV-Datei enthalten sein sollen. Standard-false . |
delimiter |
(Optional) Das Trennzeichen, das in der CSV-Datei verwendet wird. Beispiel: , |
csvFormat |
(Optional) Das CSV-Format gemäß dem Apache Commons CSV-Format. Standardeinstellung: Default . |
jsonSchemaPath |
(Optional) Der Pfad zum JSON-Schema. Standardeinstellung: null . |
largeNumFiles |
(Optional) Auf "true" setzen, wenn die Anzahl der Dateien im Zehntausenderbereich liegt. Standardeinstellung: false . |
javascriptTextTransformGcsPath |
Optional:
Der Cloud Storage-URI der Datei .js , in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
|
javascriptTextTransformFunctionName |
Optional:
Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten.
Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform . Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
|
batchSize |
(Optional) Batchgröße in der Anzahl an Dokumenten. Standardeinstellung: 1000 . |
batchSizeBytes |
(Optional) Batchgröße in der Anzahl an Byte. Standardeinstellung: 5242880 (5 MB). |
maxRetryAttempts |
(Optional) Maximale Wiederholungsversuche, muss > 0 sein. Standardeinstellung: keine Wiederholungen. |
maxRetryDuration |
(Optional) Maximale Wiederholungsdauer in Millisekunden, muss > 0 sein. Standardeinstellung: keine Wiederholungen. |
csvFileEncoding |
(Optional) CSV-Dateicodierung. |
propertyAsIndex |
(Optional) Eine Eigenschaft im indexierten Dokument, deren Wert angibt, dass _index -Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer_index -UDF). Standardwert: none. |
propertyAsId |
(Optional) Eine Eigenschaft im indexierten Dokument, deren Wert angibt, dass _id -Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer_id -UDF). Standardwert: none. |
javaScriptIndexFnGcsPath |
(Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _index -Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none. |
javaScriptIndexFnName |
(Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _index -Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none. |
javaScriptIdFnGcsPath |
(Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _id -Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none. |
javaScriptIdFnName |
(Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _id -Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none. |
javaScriptTypeFnGcsPath |
(Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _type -Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none. |
javaScriptTypeFnName |
(Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _type -Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none. |
javaScriptIsDeleteFnGcsPath |
(Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte einen der Stringwerte "true" oder "false" zurückgeben. Standardwert: none. |
javaScriptIsDeleteFnName |
(Optional) UDF-JavaScript-Funktionsname für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte einen der Stringwerte "true" oder "false" zurückgeben. Standardwert: none. |
usePartialUpdate |
(Optional) Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Standardeinstellung: false . |
bulkInsertMethod |
(Optional) Gibt an, ob INDEX (Indexieren, Upserts sind zulässig) oder CREATE (Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Standardeinstellung: CREATE . |
Benutzerdefinierte Funktionen
Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Texttransformationsfunktion
Wandelt die CSV-Daten in ein Elasticsearch-Dokument um.
Vorlagenparameter:
javascriptTextTransformGcsPath
: der Cloud Storage-URI der JavaScript-Datei.javascriptTextTransformFunctionName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation
- Eingabe: Eine einzelne Zeile aus einer CSV-Eingabedatei
- Ausgabe: Ein String-JSON-Dokument, das in Elasticsearch eingefügt werden soll.
Indexfunktion
Gibt den Index zurück, zu dem das Dokument gehört.
Vorlagenparameter:
javaScriptIndexFnGcsPath
: der Cloud Storage-URI der JavaScript-Datei.javaScriptIndexFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_index
des Dokuments.
Dokument-ID-Funktion
Gibt die Dokument-ID zurück
Vorlagenparameter:
javaScriptIdFnGcsPath
: der Cloud Storage-URI der JavaScript-Datei.javaScriptIdFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_id
des Dokuments.
Funktion zum Löschen von Dokumenten
Gibt an, ob ein Dokument gelöscht werden soll. Um diese Funktion zu verwenden, legen Sie den Modus für Bulk-Einfügung auf INDEX
fest und stellen eine Dokument-ID-Funktion bereit.
Vorlagenparameter:
javaScriptIsDeleteFnGcsPath
: der Cloud Storage-URI der JavaScript-Datei.javaScriptIsDeleteFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Geben Sie den String
"true"
zurück, um das Dokument zu löschen, oder"false"
, um das Dokument zu aktualisieren.
Funktion für Zuordnungstyp
Gibt den Zuordnungstyp des Dokuments zurück.
Vorlagenparameter:
javaScriptTypeFnGcsPath
: der Cloud Storage-URI der JavaScript-Datei.javaScriptTypeFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_type
des Dokuments.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Storage to Elasticsearch templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Ersetzen Sie Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
REGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
INPUT_FILE_SPEC
: Ihr Cloud Storage-Dateimuster.CONNECTION_URL
: ist die Elasticsearch-URLAPIKEY
: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX
: ist ihr Elasticsearch-IndexDEADLETTER_TABLE
: Ihre BigQuery-Tabelle.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Ersetzen Sie Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
LOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
INPUT_FILE_SPEC
: Ihr Cloud Storage-Dateimuster.CONNECTION_URL
: ist die Elasticsearch-URLAPIKEY
: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX
: ist ihr Elasticsearch-IndexDEADLETTER_TABLE
: Ihre BigQuery-Tabelle.
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.