Die Vorlage "BigQuery für Elasticsearch" ist eine Batchpipeline, die Daten aus einer BigQuery-Tabelle als Dokumente in Elasticsearch aufnimmt. Die Vorlage kann entweder die gesamte Tabelle oder bestimmte Datensätze mithilfe einer angegebenen Abfrage lesen.
Pipelineanforderungen
- Die BigQuery-Quelltabelle
- Ein Elasticsearch-Host auf einer Google Cloud -Instanz oder in Elastic Cloud mit Elasticsearch Version 7.0 oder höher. Muss über die Dataflow-Worker-Maschinen zugänglich sein.
Vorlagenparameter
Erforderliche Parameter
- connectionUrl: Die Elasticsearch-URL im Format
https://hostname:[port]
. Wenn Sie Elastic Cloud verwenden, geben Sie die CloudID an. Beispiel:https://elasticsearch-host:9200
- apiKey: Der Base64-codierte API-Schlüssel für die Authentifizierung.
- index: Der Elasticsearch-Index, an den die Anfragen gesendet werden. Beispiel:
my-index
.
Optionale Parameter
- inputTableSpec: Die BigQuery-Tabelle, aus der gelesen werden soll. Wenn Sie
inputTableSpec
angeben, liest die Vorlage die Daten mithilfe der BigQuery Storage Read API direkt aus dem BigQuery-Speicher (https://cloud.google.com/bigquery/docs/reference/storage). Informationen zu Einschränkungen in der Storage Read API finden Sie unter https://cloud.google.com/bigquery/docs/reference/storage#limitations. Sie müssen entwederinputTableSpec
oderquery
angeben. Wenn Sie beide Parameter festlegen, verwendet die Vorlage den Parameterquery
. Beispiel:<BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>
. - outputDeadletterTable: Die BigQuery-Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn eine Tabelle nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Falls nichts angegeben wird, wird
<outputTableSpec>_error_records
verwendet. Beispiel:<PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>
- query: Die SQL-Abfrage zum Lesen von Daten aus BigQuery. Wenn sich das BigQuery-Dataset in einem anderen Projekt als der Dataflow-Job befindet, geben Sie den vollständigen Dataset-Namen in der SQL-Abfrage an, z. B.: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. Standardmäßig wird für den Parameter
query
GoogleSQL verwendet (https://cloud.google.com/bigquery/docs/introduction-sql), es sei denn,useLegacySql
isttrue
. Sie müssen entwederinputTableSpec
oderquery
angeben. Wenn Sie beide Parameter festlegen, verwendet die Vorlage den Parameterquery
. Beispiel:select * from sampledb.sample_table
. - useLegacySql: Legen Sie
true
fest, um Legacy-SQL zu verwenden. Dieser Parameter gilt nur, wenn der Parameterquery
verwendet wird. Die Standardeinstellung istfalse
. - queryLocation: Wird benötigt, wenn aus einer autorisierten Ansicht ohne Berechtigung der zugrunde liegenden Tabelle gelesen wird. Beispiel:
US
. - queryTempDataset: Mit dieser Option können Sie ein vorhandenes Dataset festlegen, um die temporäre Tabelle zum Speichern der Ergebnisse der Abfrage zu erstellen. Beispiel:
temp_dataset
. - KMSEncryptionKey: Wenn Sie mithilfe der Abfragequelle Daten aus BigQuery lesen, verwenden Sie diesen Cloud KMS-Schlüssel, um alle erstellten temporären Tabellen zu verschlüsseln. Beispiel:
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
. - elasticsearchUsername: Der Elasticsearch-Nutzername, mit dem Sie sich authentifizieren möchten. Wenn dieses angegeben ist, wird der Wert von
apiKey
ignoriert. - elasticsearchPassword: Das Elasticsearch-Passwort, mit dem Sie sich authentifizieren. Wenn dieses angegeben ist, wird der Wert von
apiKey
ignoriert. - batchSize: Batchgröße in der Anzahl an Dokumenten. Die Standardeinstellung ist
1000
. - batchSizeBytes: Die Batchgröße in Anzahl der Byte. Standardeinstellung:
5242880
(5 MB). - maxRetryAttempts: Die maximale Anzahl der Wiederholungsversuche. Muss größer als Null (0) sein. Die Standardeinstellung ist
no retries
. - maxRetryDuration: Die maximale Wiederholungsdauer in Millisekunden. Muss größer als Null (0) sein. Die Standardeinstellung ist
no retries
. - propertyAsIndex: Das Attribut im indexierten Dokument, dessen Wert die
_index
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer_index
-UDF. Die Standardeinstellung istnone
. - javaScriptIndexFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die
_index
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptIndexFnName: Der Name der UDF-JavaScript-Funktion, die
_index
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - propertyAsId: Ein Attribut im indexierten Dokument, dessen Wert die
_id
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer_id
-UDF. Die Standardeinstellung istnone
. - javaScriptIdFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die
_id
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptIdFnName: Der Name der UDF-JavaScript-Funktion, die die
_id
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptTypeFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die
_type
-Metadaten angibt, die in Bulk-Anfragen in Dokumenten aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptTypeFnName: Der Name der UDF-JavaScript-Funktion, die die
_type
-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone
. - javaScriptIsDeleteFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von
true
oderfalse
zurück. Die Standardeinstellung istnone
. - javaScriptIsDeleteFnName: Der Name der UDF-JavaScript-Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von
true
oderfalse
zurück. Die Standardeinstellung istnone
. - usePartialUpdate: Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Die Standardeinstellung ist
false
. - bulkInsertMethod: Gibt an, ob
INDEX
(Indexieren, Upserts sind zulässig) oderCREATE
(Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Die Standardeinstellung istCREATE
. - trustSelfSignedCerts: Gibt an, ob selbst signierten Zertifikaten vertraut werden soll. Eine installierte Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Aktivieren Sie diese Option, um die Validierung des SSL-Zertifikats zu umgehen. (Standard:
false
). - disableCertificateValidation: Wenn
true
, wird dem selbstsignierten SSL-Zertifikat vertraut. Eine Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Wenn die Validierung für das Zertifikat umgangen werden soll, setzen Sie diesen Parameter auftrue
. Die Standardeinstellung istfalse
. - apiKeyKMSEncryptionKey: Der Cloud KMS-Schlüssel zum Entschlüsseln des API-Schlüssels. Dieser Parameter ist erforderlich, wenn
apiKeySource
aufKMS
festgelegt ist. Wenn dieser Parameter angegeben wird, muss ein verschlüsselterapiKey
-String übergeben werden. Verschlüsseln Sie Parameter mit dem Verschlüsselungsendpunkt der KMS API. Verwenden Sie für den Schlüssel das Formatprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
. Siehe https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt, z. B.projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
. - apiKeySecretId: Die Secret Manager-Secret-ID für den apiKey. Geben Sie diesen Parameter an, wenn
apiKeySource
aufSECRET_MANAGER
festgelegt ist. Verwenden Sie das Formatprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: Die Quelle des API-Schlüssels. Zulässige Werte sind
PLAINTEXT
,KMS
undSECRET_MANAGER
. Dieser Parameter ist erforderlich, wenn Sie Secret Manager oder KMS verwenden. WennapiKeySource
aufKMS
festgelegt ist, müssenapiKeyKMSEncryptionKey
und der verschlüsselte apiKey angegeben werden. WennapiKeySource
aufSECRET_MANAGER
festgelegt ist, mussapiKeySecretId
angegeben werden. WennapiKeySource
aufPLAINTEXT
festgelegt ist, mussapiKey
angegeben werden. Standardeinstellung: PLAINTEXT. - socketTimeout: Wenn festgelegt, wird das Standardzeitlimit für die maximale Wiederholung und das Standard-Socket-Zeitlimit (30.000 ms) im Elastic RestClient überschrieben.
- javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel:
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
Benutzerdefinierte Funktionen
Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Indexfunktion
Gibt den Index zurück, zu dem das Dokument gehört.
Vorlagenparameter:
javaScriptIndexFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIndexFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_index
des Dokuments.
Funktion „Dokument-ID“
Gibt die Dokument-ID zurück.
Vorlagenparameter:
javaScriptIdFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIdFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_id
des Dokuments.
Funktion zum Löschen von Dokumenten
Gibt an, ob ein Dokument gelöscht werden soll. Wenn Sie diese Funktion verwenden möchten, legen Sie den Bulk-Eingabemodus auf INDEX
fest und geben Sie eine Funktion für die Dokument-ID an.
Vorlagenparameter:
javaScriptIsDeleteFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIsDeleteFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Geben Sie den String
"true"
zurück, um das Dokument zu löschen, oder"false"
, um das Dokument zu aktualisieren.
Funktion für den Abgleichstyp
Gibt den Zuordnungstyp des Dokuments zurück.
Vorlagenparameter:
javaScriptTypeFnGcsPath
: Der Cloud Storage-URI der JavaScript-Datei.javaScriptTypeFnName
: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_type
des Dokuments.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the BigQuery to Elasticsearch templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \ --parameters \ inputTableSpec=INPUT_TABLE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
INPUT_TABLE_SPEC
: ist der BigQuery-TabellennameCONNECTION_URL
: ist die Elasticsearch-URLAPIKEY
: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX
: ist ihr Elasticsearch-Index
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch", } }
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
INPUT_TABLE_SPEC
: ist der BigQuery-TabellennameCONNECTION_URL
: ist die Elasticsearch-URLAPIKEY
: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX
: ist ihr Elasticsearch-Index
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.