Die Vorlage "Cloud Storage nach Elasticsearch" ist eine Batchpipeline, die Daten aus CSV-Dateien liest, die in einem Cloud Storage-Bucket gespeichert sind, und diese Daten als JSON-Dokumente in Elasticsearch schreibt.
Pipelineanforderungen
- Der Cloud Storage-Bucket muss vorhanden sein.
- Ein Elasticsearch-Host auf einer Google Cloud -Instanz oder in Elasticsearch Cloud, auf den über Dataflow zugegriffen werden kann, muss vorhanden sein.
- Es muss eine BigQuery-Tabelle für die Fehlerausgabe vorhanden sein.
CSV-Schema
Wenn die CSV-Dateien Header enthalten, legen Sie für den Vorlagenparameter containsHeaders
den Wert true
fest.
Erstellen Sie andernfalls eine JSON-Schemadatei, die die Daten beschreibt. Geben Sie den Cloud Storage-URI der Schemadatei im Vorlagenparameter jsonSchemaPath
an. Das folgende Beispiel zeigt ein JSON-Schema:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
Alternativ können Sie eine benutzerdefinierte Funktion (UDF) bereitstellen, die den CSV-Text parst und Elasticsearch-Dokumente ausgibt.
Vorlagenparameter
Erforderliche Parameter
- deadletterTable: Die BigQuery-Dead-Letter-Tabelle, an die fehlgeschlagene Einfügungen gesendet werden sollen. Beispiel:
your-project:your-dataset.your-table-name
. - inputFileSpec: Das Cloud Storage-Dateimuster für die Suche nach CSV-Dateien. Beispiel:
gs://mybucket/test-*.csv
. - connectionUrl: Die Elasticsearch-URL im Format
https://hostname:[port]
. Wenn Sie Elastic Cloud verwenden, geben Sie die CloudID an. Beispiel:https://elasticsearch-host:9200
- apiKey: Der Base64-codierte API-Schlüssel für die Authentifizierung.
- index: Der Elasticsearch-Index, an den die Anfragen gesendet werden. Beispiel:
my-index
.
Optionale Parameter
- inputFormat: Das Format der Eingabedatei. Die Standardeinstellung ist
CSV
. - containsHeaders: CSV-Eingabedateien enthalten einen Header-Eintrag (true/false). Nur erforderlich, wenn CSV-Dateien gelesen werden. Die Standardeinstellung ist "false".
- delimiter: Das Spaltentrennzeichen der Eingabetextdateien. Standard:
,
, z. B.,
. - csvFormat: CSV-Formatspezifikation zum Parsen von Einträgen. Standardwert ist
Default
. Weitere Informationen finden Sie unter https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html. Muss genau mit den Formatnamen übereinstimmen, die unter https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html zu finden sind. - jsonSchemaPath: Der Pfad zum JSON-Schema. Die Standardeinstellung ist
null
. Beispiel:gs://path/to/schema
- largeNumFiles: Auf „true“ setzen, wenn die Anzahl der Dateien im Zehntausenderbereich liegt. Die Standardeinstellung ist
false
. - csvFileEncoding: Das Zeichencodierungsformat der CSV-Datei. Zulässige Werte sind
US-ASCII
,ISO-8859-1
,UTF-8
undUTF-16
. Standardmäßig ist dies auf UTF8 eingestellt. - logDetailedCsvConversionErrors: Legen Sie
true
fest, um eine detaillierte Fehlerprotokollierung zu aktivieren, wenn das CSV-Parsen fehlschlägt. Beachten Sie, dass dadurch vertrauliche Daten in den Protokollen offengelegt werden können (z.B. wenn die CSV-Datei Passwörter enthält). Standardeinstellung:false
. - elasticsearchUsername: Der Elasticsearch-Nutzername, mit dem Sie sich authentifizieren möchten. Wenn dieses angegeben ist, wird der Wert von
apiKey
ignoriert. - elasticsearchPassword: Das Elasticsearch-Passwort, mit dem Sie sich authentifizieren. Wenn dieses angegeben ist, wird der Wert von
apiKey
ignoriert. - batchSize: Batchgröße in der Anzahl an Dokumenten. Die Standardeinstellung ist
1000
. - batchSizeBytes: Die Batchgröße in Anzahl der Byte. Standardeinstellung:
5242880
(5 MB). - maxRetryAttempts: Die maximale Anzahl der Wiederholungsversuche. Muss größer als Null (0) sein. Die Standardeinstellung ist
no retries
. - maxRetryDuration: Die maximale Wiederholungsdauer in Millisekunden. Muss größer als Null (0) sein. Die Standardeinstellung ist
no retries
. - propertyAsIndex: Das Attribut im indexierten Dokument, dessen Wert die
_index
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer_index
-UDF. Die Standardeinstellung istnone
. - javaScriptIndexFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die
_index
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptIndexFnName: Der Name der UDF-JavaScript-Funktion, die
_index
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - propertyAsId: Ein Attribut im indexierten Dokument, dessen Wert die
_id
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer_id
-UDF. Die Standardeinstellung istnone
. - javaScriptIdFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die
_id
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptIdFnName: Der Name der UDF-JavaScript-Funktion, die die
_id
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptTypeFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die
_type
-Metadaten angibt, die in Bulk-Anfragen in Dokumenten aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptTypeFnName: Der Name der UDF-JavaScript-Funktion, die die
_type
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptIsDeleteFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von
true
oderfalse
zurück. Die Standardeinstellung istnone
. - javaScriptIsDeleteFnName: Der Name der UDF-JavaScript-Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von
true
oderfalse
zurück. Die Standardeinstellung istnone
. - usePartialUpdate: Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Die Standardeinstellung ist
false
. - bulkInsertMethod: Gibt an, ob
INDEX
(Indexieren, Upserts sind zulässig) oderCREATE
(Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Die Standardeinstellung istCREATE
. - trustSelfSignedCerts: Gibt an, ob selbst signierten Zertifikaten vertraut werden soll. Eine installierte Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Aktivieren Sie diese Option, um die Validierung des SSL-Zertifikats zu umgehen. (Standard:
false
). - disableCertificateValidation: Wenn
true
, wird dem selbstsignierten SSL-Zertifikat vertraut. Eine Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Wenn die Validierung für das Zertifikat umgangen werden soll, setzen Sie diesen Parameter auftrue
. Die Standardeinstellung istfalse
. - apiKeyKMSEncryptionKey: Der Cloud KMS-Schlüssel zum Entschlüsseln des API-Schlüssels. Dieser Parameter ist erforderlich, wenn
apiKeySource
aufKMS
festgelegt ist. Wenn dieser Parameter angegeben wird, muss ein verschlüsselterapiKey
-String übergeben werden. Verschlüsseln Sie Parameter mit dem Verschlüsselungsendpunkt der KMS API. Verwenden Sie für den Schlüssel das Formatprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
. Siehe https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt, z. B.projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
. - apiKeySecretId: Die Secret Manager-Secret-ID für den apiKey. Geben Sie diesen Parameter an, wenn
apiKeySource
aufSECRET_MANAGER
festgelegt ist. Verwenden Sie das Formatprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: Die Quelle des API-Schlüssels. Zulässige Werte sind
PLAINTEXT
,KMS
undSECRET_MANAGER
. Dieser Parameter ist erforderlich, wenn Sie Secret Manager oder KMS verwenden. WennapiKeySource
aufKMS
festgelegt ist, müssenapiKeyKMSEncryptionKey
und der verschlüsselte apiKey angegeben werden. WennapiKeySource
aufSECRET_MANAGER
festgelegt ist, mussapiKeySecretId
angegeben werden. WennapiKeySource
aufPLAINTEXT
festgelegt ist, mussapiKey
angegeben werden. Standardeinstellung: PLAINTEXT. - socketTimeout: Wenn festgelegt, wird das Standardzeitlimit für die maximale Wiederholung und das Standard-Socket-Zeitlimit (30.000 ms) im Elastic RestClient überschrieben.
- javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel:
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
Benutzerdefinierte Funktionen
Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Texttransformationsfunktion
Die CSV-Daten werden in ein Elasticsearch-Dokument umgewandelt.
Vorlagenparameter:
javascriptTextTransformGcsPath
: den Cloud Storage-URI der JavaScript-Datei.javascriptTextTransformFunctionName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Eine einzelne Zeile aus einer CSV-Eingabedatei
- Ausgabe: Ein String-JSON-Dokument, das in Elasticsearch eingefügt werden soll.
Indexfunktion
Gibt den Index zurück, zu dem das Dokument gehört.
Vorlagenparameter:
javaScriptIndexFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIndexFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_index
des Dokuments.
Funktion „Dokument-ID“
Gibt die Dokument-ID zurück.
Vorlagenparameter:
javaScriptIdFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIdFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_id
des Dokuments.
Funktion zum Löschen von Dokumenten
Gibt an, ob ein Dokument gelöscht werden soll. Wenn Sie diese Funktion verwenden möchten, legen Sie den Bulk-Eingabemodus auf INDEX
fest und geben Sie eine Funktion für die Dokument-ID an.
Vorlagenparameter:
javaScriptIsDeleteFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIsDeleteFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Geben Sie den String
"true"
zurück, um das Dokument zu löschen, oder"false"
, um das Dokument zu aktualisieren.
Funktion für den Abgleichstyp
Gibt den Zuordnungstyp des Dokuments zurück.
Vorlagenparameter:
javaScriptTypeFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptTypeFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_type
des Dokuments.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Storage to Elasticsearch templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
REGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
INPUT_FILE_SPEC
: Ihr Cloud Storage-Dateimuster.CONNECTION_URL
: ist die Elasticsearch-URLAPIKEY
: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX
: ist ihr Elasticsearch-IndexDEADLETTER_TABLE
: Ihre BigQuery-Tabelle.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
LOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
INPUT_FILE_SPEC
: Ihr Cloud Storage-Dateimuster.CONNECTION_URL
: ist die Elasticsearch-URLAPIKEY
: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX
: ist ihr Elasticsearch-IndexDEADLETTER_TABLE
: Ihre BigQuery-Tabelle.
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.