Die Vorlage "Cloud Storage nach Elasticsearch" ist eine Batchpipeline, die Daten aus CSV-Dateien liest, die in einem Cloud Storage-Bucket gespeichert sind, und diese Daten als JSON-Dokumente in Elasticsearch schreibt.
Pipelineanforderungen
- Der Cloud Storage-Bucket muss vorhanden sein.
- Ein Elasticsearch-Host auf einer Google Cloud-Instanz oder in Elasticsearch Cloud, auf den über Dataflow zugegriffen werden kann, muss vorhanden sein.
- Es muss eine BigQuery-Tabelle für die Fehlerausgabe vorhanden sein.
CSV-Schema
Wenn die CSV-Dateien Header enthalten, legen Sie für den Vorlagenparameter containsHeaders
den Wert true
fest.
Erstellen Sie andernfalls eine JSON-Schemadatei, die die Daten beschreibt. Geben Sie den Cloud Storage-URI der Schemadatei im Vorlagenparameter jsonSchemaPath
an. Das folgende Beispiel zeigt ein JSON-Schema:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
Alternativ können Sie eine benutzerdefinierte Funktion (UDF) bereitstellen, die den CSV-Text parst und Elasticsearch-Dokumente ausgibt.
Vorlagenparameter
Erforderliche Parameter
- deadletterTable : Die BigQuery-Dead-Letter-Tabelle, an die fehlgeschlagene Einfügungen gesendet werden sollen. (Beispiel: your-project-id:your-dataset.your-table-name).
- inputFileSpec : Das Cloud Storage-Dateimuster für die Suche nach CSV-Dateien. Beispiel: gs://mybucket/test-*.csv.
- connectionUrl: Die Elasticsearch-URL im Format https://hostname:[port]. Wenn Sie Elastic Cloud verwenden, geben Sie die CloudID an. (Beispiel: https://elasticsearch-host:9200).
- apiKey : Der Base64-codierte API-Schlüssel für die Authentifizierung.
- index : Der Elasticsearch-Index, an den die Anfragen ausgegeben werden, z. B.
my-index.
(Beispiel: my-index).
Optionale Parameter
- inputFormat: Format der Eingabedatei. Standardwert: CSV
- containsHeaders : CSV-Eingabedateien enthalten einen Header-Eintrag (true/false). Nur erforderlich, wenn CSV-Dateien gelesen werden. Die Standardeinstellung ist "false".
- delimiter : Das Spaltentrennzeichen der Eingabetextdateien. Standardeinstellung: Verwenden Sie das im csvFormat angegebene Trennzeichen (Beispiel: ,).
- csvFormat : Die CSV-Formatspezifikation zum Parsen von Einträgen. Der Standardwert ist "Default". Weitere Informationen finden Sie unter https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html. Muss genau mit den Formatnamen übereinstimmen, die unter https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html zu finden sind.
- jsonSchemaPath : Der Pfad zum JSON-Schema. Standardeinstellung: null. (Beispiel: gs://path/to/schema).
- largeNumFiles : Auf "true" setzen, wenn die Anzahl der Dateien im Zehntausenderbereich liegt. Die Standardeinstellung ist "false".
- csvFileEncoding : Das Zeichencodierungsformat der CSV-Datei. Zulässige Werte sind US-ASCII, ISO-8859-1, UTF-8 und UTF-16. Standardmäßig ist dies auf UTF8 eingestellt.
- logDetailedCsvConversionErrors : Setzen Sie den Wert auf "true", um das detaillierte Fehlerlogging zu aktivieren, wenn das CSV-Parsing fehlschlägt. Beachten Sie, dass dadurch vertrauliche Daten in den Protokollen offengelegt werden können (z. B. wenn die CSV-Datei Passwörter enthält). Standardeinstellung: false.
- elasticsearchUsername : Der Elasticsearch-Nutzername, mit dem die Authentifizierung erfolgen soll. Wenn dieses angegeben ist, wird der Wert von "apiKey" ignoriert.
- elasticsearchPassword: Das Elasticsearch-Passwort, mit dem Sie sich authentifizieren. Wenn dieses angegeben ist, wird der Wert von "apiKey" ignoriert.
- batchSize : Batchgröße in der Anzahl an Dokumenten. Die Standardeinstellung ist 1000.
- batchSizeBytes : Die Batchgröße in Anzahl der Byte. Standardeinstellung: 5242880 (5 MB).
- maxRetryAttempts : Die maximale Anzahl an Wiederholungsversuchen. Muss größer als Null (0) sein. Die Standardeinstellung ist: keine Wiederholungen.
- maxRetryDuration : Die maximale Wiederholungsdauer in Millisekunden. Muss größer als Null (0) sein. Die Standardeinstellung ist: keine Wiederholungen.
- propertyAsIndex : Das Attribut im indexierten Dokument, dessen Wert die
_index
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer_index
-UDF. Die Standardeinstellung ist „Keine“. - javaScriptIndexFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die
_index
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“. - javaScriptIndexFnName : Der Name der UDF-JavaScript-Funktion, die
_index
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“. - propertyAsId : Ein Attribut im indexierten Dokument, dessen Wert die
_id
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer_id
-UDF. Die Standardeinstellung ist „Keine“. - javaScriptIdFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die
_id
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“. - javaScriptIdFnName : Der Name der UDF-JavaScript-Funktion, die die
_id
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“. - javaScriptTypeFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die
_type
-Metadaten angibt, die in Bulk-Anfragen in Dokumenten aufgenommen werden sollen. Standardwert: none. - javaScriptTypeFnName : Der Name der UDF-JavaScript-Funktion, die die
_type
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“. - javaScriptIsDeleteFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von
true
oderfalse
zurück. Die Standardeinstellung ist „Keine“. - javaScriptIsDeleteFnName: Der Name der UDF-JavaScript-Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von
true
oderfalse
zurück. Die Standardeinstellung ist „Keine“. - usePartialUpdate : Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Die Standardeinstellung ist "false".
- bulkInsertMethod : Gibt an, ob
INDEX
(Indexieren, Upserts sind zulässig) oderCREATE
(Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Die Standardeinstellung ist CREATE. - trustSelfSignedCerts : Gibt an, ob dem selbst signierten Zertifikat vertraut werden soll. Eine installierte Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Aktivieren Sie diese Option, um die Validierung des SSL-Zertifikats zu umgehen. Standardwert ist False.
- disableCertificateValidation: Wenn „true“, wird dem selbstsignierten SSL-Zertifikat vertraut. Eine Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Wenn die Validierung für das Zertifikat umgangen werden soll, setzen Sie diesen Parameter auf „true“. Standardeinstellung: false.
- apiKeyKMSEncryptionKey : Der Cloud KMS-Schlüssel zum Entschlüsseln des API-Schlüssels. Dieser Parameter muss angegeben werden, wenn apiKeySource auf KMS gesetzt ist. Wenn dieser Parameter angegeben wird, muss der apiKey-String verschlüsselt übergeben werden. Verschlüsseln Sie Parameter mit dem Verschlüsselungsendpunkt der KMS API. Der Schlüssel sollte das Format „projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}“ haben. Siehe https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Beispiel: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
- apiKeySecretId : Secret Manager-ID für den apiKey. Dieser Parameter sollte angegeben werden, wenn apiKeySource auf SECRET_MANAGER festgelegt ist. Muss das Format „projects/{project}/secrets/{secret}/versions/{secret_version}“ haben. (Beispiel: projects/your-project-id/secrets/your-secret/versions/your-secret-version).
- apiKeySource: Quelle des API-Schlüssels. Entweder PLAINTEXT, KMS oder SECRET_MANAGER. Dieser Parameter muss angegeben werden, wenn Secret Manager oder KMS verwendet wird. Wenn apiKeySource auf KMS gesetzt ist, müssen apiKeyKMSEncryptionKey und der verschlüsselte apiKey bereitgestellt werden. Wenn apiKeySource auf „SECRET_MANAGER“ festgelegt ist, muss apiKeySecretId angegeben werden. Wenn apiKeySource auf PLAINTEXT festgelegt ist, muss apiKey angegeben werden. Standardeinstellung: PLAINTEXT.
- javascriptTextTransformGcsPath Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. (Beispiel: gs://my-bucket/my-udfs/my_file.js).
- javascriptTextTransformFunctionName Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
Benutzerdefinierte Funktionen
Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Texttransformationsfunktion
Wandelt die CSV-Daten in ein Elasticsearch-Dokument um.
Vorlagenparameter:
javascriptTextTransformGcsPath
: den Cloud Storage-URI der JavaScript-Datei.javascriptTextTransformFunctionName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Eine einzelne Zeile aus einer CSV-Eingabedatei
- Ausgabe: Ein String-JSON-Dokument, das in Elasticsearch eingefügt werden soll.
Indexfunktion
Gibt den Index zurück, zu dem das Dokument gehört.
Vorlagenparameter:
javaScriptIndexFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIndexFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_index
des Dokuments.
Funktion „Dokument-ID“
Gibt die Dokument-ID zurück.
Vorlagenparameter:
javaScriptIdFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIdFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_id
des Dokuments.
Funktion zum Löschen von Dokumenten
Gibt an, ob ein Dokument gelöscht werden soll. Wenn Sie diese Funktion verwenden möchten, legen Sie den Bulk-Eingabemodus auf INDEX
fest und geben Sie eine Funktion für die Dokument-ID an.
Vorlagenparameter:
javaScriptIsDeleteFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIsDeleteFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Geben Sie den String
"true"
zurück, um das Dokument zu löschen, oder"false"
, um das Dokument zu aktualisieren.
Funktion für den Abgleichstyp
Gibt den Zuordnungstyp des Dokuments zurück.
Vorlagenparameter:
javaScriptTypeFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptTypeFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_type
des Dokuments.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Storage to Elasticsearch templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
REGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
INPUT_FILE_SPEC
: Ihr Cloud Storage-Dateimuster.CONNECTION_URL
: ist die Elasticsearch-URLAPIKEY
: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX
: ist ihr Elasticsearch-IndexDEADLETTER_TABLE
: Ihre BigQuery-Tabelle.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
LOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
INPUT_FILE_SPEC
: Ihr Cloud Storage-Dateimuster.CONNECTION_URL
: ist die Elasticsearch-URLAPIKEY
: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX
: ist ihr Elasticsearch-IndexDEADLETTER_TABLE
: Ihre BigQuery-Tabelle.
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.