Vorlage „BigQuery für Elasticsearch“

Die Vorlage "BigQuery für Elasticsearch" ist eine Batchpipeline, die Daten aus einer BigQuery-Tabelle als Dokumente in Elasticsearch aufnimmt. Die Vorlage kann entweder die gesamte Tabelle oder bestimmte Datensätze mithilfe einer angegebenen Abfrage lesen.

Pipelineanforderungen

  • Die BigQuery-Quelltabelle
  • Ein Elasticsearch-Host auf einer Google Cloud-Instanz oder in Elastic Cloud mit Elasticsearch Version 7.0 oder höher. Muss über die Dataflow-Worker-Maschinen zugänglich sein.

Vorlagenparameter

Erforderliche Parameter

  • connectionUrl : Die Elasticsearch-URL im Format https://hostname:[port]. Wenn Sie Elastic Cloud verwenden, geben Sie die CloudID an. (Beispiel: https://elasticsearch-host:9200).
  • apiKey : Der Base64-codierte API-Schlüssel für die Authentifizierung.
  • index : Der Elasticsearch-Index, an den die Anfragen ausgegeben werden, z. B. my-index. (Beispiel: my-index).

Optionale Parameter

  • inputTableSpec Die BigQuery-Tabelle, aus der gelesen werden soll. Format: projectId:datasetId.tablename. Wenn Sie inputTableSpec angeben, liest die Vorlage die Daten mithilfe der BigQuery Storage Read API direkt aus dem BigQuery-Speicher (https://cloud.google.com/bigquery/docs/reference/storage). Informationen zu Einschränkungen bei der Storage Read API finden Sie unter https://cloud.google.com/bigquery/docs/reference/storage#limitations. Sie müssen entweder inputTableSpec oder query angeben. Wenn Sie beide Parameter festlegen, verwendet die Vorlage den Parameter query. Beispiel: bigquery-project:dataset.input_table.
  • outputDeadletterTable : Die BigQuery-Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben, im Format <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>. Wenn keine Tabelle vorhanden ist, wird sie während der Pipelineausführung erstellt. Falls nichts angegeben wird, wird <outputTableSpec>_error_records verwendet. (Beispiel: your-project-id:your-dataset.your-table-name).
  • query : Die SQL-Abfrage zum Lesen von Daten aus BigQuery. Wenn sich das BigQuery-Dataset in einem anderen Projekt als der Dataflow-Job befindet, geben Sie den vollständigen Dataset-Namen in der SQL-Abfrage an, z. B. <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. Standardmäßig verwendet der Parameter query GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), es sei denn useLegacySql ist true. Sie müssen entweder inputTableSpec oder query angeben. Wenn Sie beide Parameter festlegen, verwendet die Vorlage den Parameter query. (Beispiel: Wählen Sie "*" aus sampledb.sample_table aus).
  • useLegacySql : Für die Verwendung des Legacy-SQL-Dialekts legen Sie "true" fest. Dieser Parameter gilt nur, wenn der Parameter query verwendet wird. Die Standardeinstellung ist "false".
  • queryLocation : Erforderlich, wenn aus einer autorisierten Ansicht ohne die Berechtigung der zugrunde liegenden Tabelle gelesen wird. (Beispiel: US).
  • elasticsearchUsername : Der Elasticsearch-Nutzername für die Authentifizierung. Wenn dieses angegeben ist, wird der Wert von "apiKey" ignoriert.
  • elasticsearchPassword : Das Elasticsearch-Passwort, mit dem die Authentifizierung erfolgen soll. Wenn dieses angegeben ist, wird der Wert von "apiKey" ignoriert.
  • batchSize : Batchgröße in der Anzahl an Dokumenten. Die Standardeinstellung ist 1000.
  • batchSizeBytes : Die Batchgröße in Anzahl der Byte. Die Standardeinstellung ist: 5242880 (5mb).
  • maxRetryAttempts : Die maximale Anzahl von Wiederholungsversuchen. Muss größer als Null (0) sein. Die Standardeinstellung ist: keine Wiederholungen.
  • maxRetryDuration : Die maximale Wiederholungsdauer in Millisekunden. Muss größer als Null (0) sein. Die Standardeinstellung ist: keine Wiederholungen.
  • propertyAsIndex : Die Eigenschaft im zu indexierenden Dokument, deren Wert _index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer UDF vom Typ _index. Die Standardeinstellung ist „Keine“.
  • javaScriptIndexFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die _index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“.
  • javaScriptIndexFnName : Der Name der UDF-JavaScript-Funktion, die _index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“.
  • propertyAsId : Ein Attribut im indexierten Dokument, dessen Wert die _id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer _id-UDF. Die Standardeinstellung ist „Keine“.
  • javaScriptIdFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die _id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“.
  • javaScriptIdFnName : Der Name der UDF-JavaScript-Funktion, die die _id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“.
  • javaScriptTypeFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die _type-Metadaten angibt, die in Bulk-Anfragen in Dokumenten aufgenommen werden sollen. Standardwert: none.
  • javaScriptTypeFnName : Der Name der UDF-JavaScript-Funktion, die die _type-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist „Keine“.
  • javaScriptIsDeleteFnGcsPath : Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von true oder false zurück. Die Standardeinstellung ist „Keine“.
  • javaScriptIsDeleteFnName : Der Name der UDF-JavaScript-Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von true oder false zurück. Die Standardeinstellung ist „Keine“.
  • usePartialUpdate : Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Die Standardeinstellung ist "false".
  • bulkInsertMethod : Gibt an, ob INDEX (Indexieren, Upserts sind zulässig) oder CREATE (Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Die Standardeinstellung ist CREATE.
  • trustSelfSignedCerts : Gibt an, ob dem selbst signierten Zertifikat vertraut werden soll. Eine installierte Elasticsearch-Instanz hat möglicherweise ein selbst signiertes Zertifikat. Aktivieren Sie diese Option auf „True“, um die Validierung des SSL-Zertifikats zu umgehen. Standardwert ist False.
  • disableCertificateValidation : Wenn "true", wird dem selbstsignierten SSL-Zertifikat vertraut. Eine Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Setzen Sie diesen Parameter auf "true", um die Überprüfung des Zertifikats zu umgehen. Standardeinstellung: false.
  • apiKeyKMSEncryptionKey : Der Cloud KMS-Schlüssel zum Entschlüsseln des API-Schlüssels. Dieser Parameter muss angegeben werden, wenn apiKeySource auf KMS gesetzt ist. Wenn dieser Parameter angegeben wird, sollte der apiKey-String verschlüsselt übergeben werden. Parameter mit dem KMS API-Verschlüsselungsendpunkt verschlüsseln Der Schlüssel muss das Format „projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}“ haben. Siehe https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Beispiel: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
  • apiKeySecretId : Secret Manager-ID für den apiKey. Dieser Parameter sollte angegeben werden, wenn die apiKeySource auf SECRET_MANAGER festgelegt ist. Muss im Format „projects/{project}/secrets/{secret}/versions/{secret_version}“ vorliegen. Beispiel: projects/your-project-id/secrets/your-secret/versions/your-secret-version.
  • apiKeySource : Quelle des API-Schlüssels. Entweder PLAINTEXT, KMS oder SECRET_MANAGER. Dieser Parameter muss angegeben werden, wenn Secret Manager oder KMS verwendet wird. Wenn apiKeySource auf KMS festgelegt ist, müssen apiKeyKMSEncryptionKey und verschlüsselter apiKey angegeben werden. Wenn apiKeySource auf SECRET_MANAGER festgelegt ist, muss apiKeySecretId angegeben werden. Wenn apiKeySource auf PLAINTEXT festgelegt ist, muss apiKey bereitgestellt werden. Die Standardeinstellung ist PLAINTEXT.
  • javascriptTextTransformGcsPath Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. (Beispiel: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Benutzerdefinierte Funktionen

Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Indexfunktion

Gibt den Index zurück, zu dem das Dokument gehört.

Vorlagenparameter:

  • javaScriptIndexFnGcsPath: der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIndexFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _index des Dokuments.

Dokument-ID-Funktion

Gibt die Dokument-ID zurück

Vorlagenparameter:

  • javaScriptIdFnGcsPath: der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIdFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _id des Dokuments.

Funktion zum Löschen von Dokumenten

Gibt an, ob ein Dokument gelöscht werden soll. Um diese Funktion zu verwenden, legen Sie den Modus für Bulk-Einfügung auf INDEX fest und stellen eine Dokument-ID-Funktion bereit.

Vorlagenparameter:

  • javaScriptIsDeleteFnGcsPath: der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIsDeleteFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Geben Sie den String "true" zurück, um das Dokument zu löschen, oder "false", um das Dokument zu aktualisieren.

Funktion für Zuordnungstyp

Gibt den Zuordnungstyp des Dokuments zurück.

Vorlagenparameter:

  • javaScriptTypeFnGcsPath: der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptTypeFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _type des Dokuments.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the BigQuery to Elasticsearch templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • INPUT_TABLE_SPEC: ist der BigQuery-Tabellenname
  • CONNECTION_URL: ist die Elasticsearch-URL
  • APIKEY: ist der base64-codierte API-Schlüssel für die Authentifizierung
  • INDEX: ist ihr Elasticsearch-Index

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • INPUT_TABLE_SPEC: ist der BigQuery-Tabellenname
  • CONNECTION_URL: ist die Elasticsearch-URL
  • APIKEY: ist der base64-codierte API-Schlüssel für die Authentifizierung
  • INDEX: ist ihr Elasticsearch-Index

Nächste Schritte