Vorlage „Cloud Storage nach Elasticsearch“

Die Vorlage "Cloud Storage nach Elasticsearch" ist eine Batchpipeline, die Daten aus CSV-Dateien liest, die in einem Cloud Storage-Bucket gespeichert sind, und diese Daten als JSON-Dokumente in Elasticsearch schreibt.

Pipelineanforderungen

  • Der Cloud Storage-Bucket muss vorhanden sein.
  • Ein Elasticsearch-Host auf einer Google Cloud Instanz oder in Elasticsearch Cloud, auf den über Dataflow zugegriffen werden kann, muss vorhanden sein.
  • Es muss eine BigQuery-Tabelle für die Fehlerausgabe vorhanden sein.

CSV-Schema

Wenn die CSV-Dateien Header enthalten, legen Sie für den Vorlagenparameter containsHeaders den Wert true fest.

Erstellen Sie andernfalls eine JSON-Schemadatei, die die Daten beschreibt. Geben Sie den Cloud Storage-URI der Schemadatei im Vorlagenparameter jsonSchemaPath an. Das folgende Beispiel zeigt ein JSON-Schema:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

Alternativ können Sie eine benutzerdefinierte Funktion (UDF) bereitstellen, die den CSV-Text parst und Elasticsearch-Dokumente ausgibt.

Vorlagenparameter

Erforderliche Parameter

  • deadletterTable: Die BigQuery-Dead-Letter-Tabelle, an die fehlgeschlagene Einfügungen gesendet werden sollen. Beispiel: your-project:your-dataset.your-table-name.
  • inputFileSpec: Das Cloud Storage-Dateimuster für die Suche nach CSV-Dateien. Beispiel: gs://mybucket/test-*.csv.
  • connectionUrl: Die Elasticsearch-URL im Format https://hostname:[port]. Wenn Sie Elastic Cloud verwenden, geben Sie die CloudID an. Beispiel: https://elasticsearch-host:9200
  • apiKey: Der Base64-codierte API-Schlüssel für die Authentifizierung.
  • index: Der Elasticsearch-Index, an den die Anfragen gesendet werden. Beispiel: my-index.

Optionale Parameter

  • inputFormat: Das Format der Eingabedatei. Die Standardeinstellung ist CSV.
  • containsHeaders: CSV-Eingabedateien enthalten einen Header-Eintrag (true/false). Nur erforderlich, wenn CSV-Dateien gelesen werden. Die Standardeinstellung ist "false".
  • delimiter: Das Spaltentrennzeichen der Eingabetextdateien. Standard: ,, z. B. ,.
  • csvFormat: CSV-Formatspezifikation zum Parsen von Einträgen. Standardwert ist Default. Weitere Informationen finden Sie unter https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html. Muss genau mit den Formatnamen übereinstimmen, die unter https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html zu finden sind.
  • jsonSchemaPath: Der Pfad zum JSON-Schema. Die Standardeinstellung ist null. Beispiel: gs://path/to/schema
  • largeNumFiles: Auf „true“ setzen, wenn die Anzahl der Dateien im Zehntausenderbereich liegt. Die Standardeinstellung ist false.
  • csvFileEncoding: Das Zeichencodierungsformat der CSV-Datei. Zulässige Werte sind US-ASCII, ISO-8859-1, UTF-8 und UTF-16. Standardmäßig ist dies auf UTF8 eingestellt.
  • logDetailedCsvConversionErrors: Legen Sie true fest, um detaillierte Fehlerprotokollierung zu aktivieren, wenn das CSV-Parsen fehlschlägt. Beachten Sie, dass dadurch vertrauliche Daten in den Protokollen offengelegt werden können (z.B. wenn die CSV-Datei Passwörter enthält). Standardeinstellung: false.
  • elasticsearchUsername: Der Elasticsearch-Nutzername, mit dem Sie sich authentifizieren möchten. Wenn dieses angegeben ist, wird der Wert von apiKey ignoriert.
  • elasticsearchPassword: Das Elasticsearch-Passwort, mit dem Sie sich authentifizieren. Wenn dieses angegeben ist, wird der Wert von apiKey ignoriert.
  • batchSize: Batchgröße in der Anzahl an Dokumenten. Die Standardeinstellung ist 1000.
  • batchSizeBytes: Die Batchgröße in Anzahl der Byte. Standardeinstellung: 5242880 (5 MB).
  • maxRetryAttempts: Die maximale Anzahl der Wiederholungsversuche. Muss größer als Null (0) sein. Die Standardeinstellung ist no retries.
  • maxRetryDuration: Die maximale Wiederholungsdauer in Millisekunden. Muss größer als Null (0) sein. Die Standardeinstellung ist no retries.
  • propertyAsIndex: Das Attribut im indexierten Dokument, dessen Wert die _index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer _index-UDF. Die Standardeinstellung ist none.
  • javaScriptIndexFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die _index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptIndexFnName: Der Name der UDF-JavaScript-Funktion, die _index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • propertyAsId: Ein Attribut im indexierten Dokument, dessen Wert die _id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer _id-UDF. Die Standardeinstellung ist none.
  • javaScriptIdFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die _id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptIdFnName: Der Name der UDF-JavaScript-Funktion, die die _id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptTypeFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die _type-Metadaten angibt, die in Bulk-Anfragen in Dokumenten aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptTypeFnName: Der Name der UDF-JavaScript-Funktion, die die _type-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung ist none.
  • javaScriptIsDeleteFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von true oder false zurück. Die Standardeinstellung ist none.
  • javaScriptIsDeleteFnName: Der Name der UDF-JavaScript-Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von true oder false zurück. Die Standardeinstellung ist none.
  • usePartialUpdate: Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Die Standardeinstellung ist false.
  • bulkInsertMethod: Gibt an, ob INDEX (Indexieren, Upserts sind zulässig) oder CREATE (Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Die Standardeinstellung ist CREATE.
  • trustSelfSignedCerts: Gibt an, ob selbst signierten Zertifikaten vertraut werden soll. Eine installierte Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Aktivieren Sie diese Option, um die Validierung des SSL-Zertifikats zu umgehen. (Standard: false).
  • disableCertificateValidation: Wenn true, wird dem selbstsignierten SSL-Zertifikat vertraut. Eine Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Wenn die Validierung für das Zertifikat umgangen werden soll, setzen Sie diesen Parameter auf true. Die Standardeinstellung ist false.
  • apiKeyKMSEncryptionKey: Der Cloud KMS-Schlüssel zum Entschlüsseln des API-Schlüssels. Dieser Parameter ist erforderlich, wenn apiKeySource auf KMS festgelegt ist. Wenn dieser Parameter angegeben wird, muss ein verschlüsselter apiKey-String übergeben werden. Verschlüsseln Sie Parameter mit dem Verschlüsselungsendpunkt der KMS API. Verwenden Sie für den Schlüssel das Format projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Siehe https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt, z. B. projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: Die Secret Manager-Secret-ID für den apiKey. Geben Sie diesen Parameter an, wenn apiKeySource auf SECRET_MANAGER festgelegt ist. Verwenden Sie das Format projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: Die Quelle des API-Schlüssels. Zulässige Werte sind PLAINTEXT, KMS und SECRET_MANAGER. Dieser Parameter ist erforderlich, wenn Sie Secret Manager oder KMS verwenden. Wenn apiKeySource auf KMS festgelegt ist, müssen apiKeyKMSEncryptionKey und der verschlüsselte apiKey angegeben werden. Wenn apiKeySource auf SECRET_MANAGER festgelegt ist, muss apiKeySecretId angegeben werden. Wenn apiKeySource auf PLAINTEXT festgelegt ist, muss apiKey angegeben werden. Standardeinstellung: PLAINTEXT.
  • socketTimeout: Wenn festgelegt, wird das Standardzeitlimit für die maximale Anzahl von Wiederholungen und das Standard-Socket-Zeitlimit (30.000 ms) im Elastic RestClient überschrieben.
  • javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Benutzerdefinierte Funktionen

Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Texttransformationsfunktion

Die CSV-Daten werden in ein Elasticsearch-Dokument umgewandelt.

Vorlagenparameter:

  • javascriptTextTransformGcsPath: den Cloud Storage-URI der JavaScript-Datei.
  • javascriptTextTransformFunctionName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Eine einzelne Zeile aus einer CSV-Eingabedatei
  • Ausgabe: Ein String-JSON-Dokument, das in Elasticsearch eingefügt werden soll.

Indexfunktion

Gibt den Index zurück, zu dem das Dokument gehört.

Vorlagenparameter:

  • javaScriptIndexFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIndexFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _index des Dokuments.

Funktion „Dokument-ID“

Gibt die Dokument-ID zurück.

Vorlagenparameter:

  • javaScriptIdFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIdFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _id des Dokuments.

Funktion zum Löschen von Dokumenten

Gibt an, ob ein Dokument gelöscht werden soll. Wenn Sie diese Funktion verwenden möchten, legen Sie den Bulk-Eingabemodus auf INDEX fest und geben Sie eine Funktion für die Dokument-ID an.

Vorlagenparameter:

  • javaScriptIsDeleteFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptIsDeleteFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Geben Sie den String "true" zurück, um das Dokument zu löschen, oder "false", um das Dokument zu aktualisieren.

Funktion für den Abgleichstyp

Gibt den Zuordnungstyp des Dokuments zurück.

Vorlagenparameter:

  • javaScriptTypeFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.
  • javaScriptTypeFnName: Der Name der JavaScript-Funktion.

Funktionsspezifikation:

  • Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
  • Ausgabe: Der Wert des Metadatenfelds _type des Dokuments.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Storage to Elasticsearch templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • INPUT_FILE_SPEC: Ihr Cloud Storage-Dateimuster.
  • CONNECTION_URL: ist die Elasticsearch-URL
  • APIKEY: ist der base64-codierte API-Schlüssel für die Authentifizierung
  • INDEX: ist ihr Elasticsearch-Index
  • DEADLETTER_TABLE: Ihre BigQuery-Tabelle.

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • INPUT_FILE_SPEC: Ihr Cloud Storage-Dateimuster.
  • CONNECTION_URL: ist die Elasticsearch-URL
  • APIKEY: ist der base64-codierte API-Schlüssel für die Authentifizierung
  • INDEX: ist ihr Elasticsearch-Index
  • DEADLETTER_TABLE: Ihre BigQuery-Tabelle.
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.elasticsearch.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.GCSToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.transforms.CsvConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link GCSToElasticsearch} pipeline exports data from one or more CSV files in Cloud Storage
 * to Elasticsearch.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/README_GCS_to_Elasticsearch.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_to_Elasticsearch",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch",
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Javascript user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformReloadIntervalMinutes",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      flexContainerName = "gcs-to-elasticsearch",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      }),
  @Template(
      name = "GCS_to_Elasticsearch_Xlang",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch with Python UDFs",
      type = Template.TemplateType.XLANG,
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Python user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      flexContainerName = "gcs-to-elasticsearch-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      })
})
public class GCSToElasticsearch {

  /** The tag for the headers of the CSV if required. */
  static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};

  /** The tag for the lines of the CSV. */
  static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};

  /** The tag for the dead-letter output of the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output for the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(GCSToElasticsearch.class);

  /** String/String Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(
          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    GCSToElasticsearchOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(GCSToElasticsearchOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  private static PipelineResult run(GCSToElasticsearchOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    // Throw error if containsHeaders is true and a schema or Udf is also set.
    if (options.getContainsHeaders()) {
      checkArgument(
          options.getJavascriptTextTransformGcsPath() == null
              && options.getJsonSchemaPath() == null
              && options.getPythonExternalTextTransformGcsPath() == null,
          "Cannot parse file containing headers with UDF or Json schema.");
    }

    // Throw error if only one retry configuration parameter is set.
    checkArgument(
        (options.getMaxRetryAttempts() == null && options.getMaxRetryDuration() == null)
            || (options.getMaxRetryAttempts() != null && options.getMaxRetryDuration() != null),
        "To specify retry configuration both max attempts and max duration must be set.");

    // Throw error if both Javascript UDF and Python UDF are set. We can only apply one or the
    // other.
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    /*
     * Steps: 1) Read records from CSV(s) via {@link CsvConverters.ReadCsv}.
     *        2) Convert lines to JSON strings via {@link CsvConverters.LineToFailsafeJson}.
     *        3a) Write JSON strings as documents to Elasticsearch via {@link ElasticsearchIO}.
     *        3b) Write elements that failed processing to {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO}.
     */
    PCollectionTuple readCsvLines =
        pipeline
            /*
             * Step 1: Read CSV file(s) from Cloud Storage using {@link CsvConverters.ReadCsv}.
             */
            .apply(
            "ReadCsv",
            CsvConverters.ReadCsv.newBuilder()
                .setCsvFormat(options.getCsvFormat())
                .setDelimiter(options.getDelimiter())
                .setHasHeaders(options.getContainsHeaders())
                .setInputFileSpec(options.getInputFileSpec())
                .setHeaderTag(CSV_HEADERS)
                .setLineTag(CSV_LINES)
                .setFileEncoding(options.getCsvFileEncoding())
                .build());
    /*
     * Step 2: Convert lines to Elasticsearch document.
     */
    CsvConverters.LineToFailsafeJson.Builder lineToFailsafeJsonBuilder =
        CsvConverters.LineToFailsafeJson.newBuilder()
            .setDelimiter(options.getDelimiter())
            .setJsonSchemaPath(options.getJsonSchemaPath())
            .setHeaderTag(CSV_HEADERS)
            .setLineTag(CSV_LINES)
            .setUdfOutputTag(PROCESSING_OUT)
            .setUdfDeadletterTag(PROCESSING_DEADLETTER_OUT);
    if (options.getPythonExternalTextTransformGcsPath() != null) {
      lineToFailsafeJsonBuilder
          .setPythonUdfFileSystemPath(options.getPythonExternalTextTransformGcsPath())
          .setPythonUdfFunctionName(options.getPythonExternalTextTransformFunctionName());
    } else {
      lineToFailsafeJsonBuilder
          .setJavascriptUdfFileSystemPath(options.getJavascriptTextTransformGcsPath())
          .setJavascriptUdfFunctionName(options.getJavascriptTextTransformFunctionName());
    }
    PCollectionTuple convertedCsvLines =
        readCsvLines.apply("ConvertLine", lineToFailsafeJsonBuilder.build());
    /*
     * Step 3a: Write elements that were successfully processed to Elasticsearch using {@link WriteToElasticsearch}.
     */
    convertedCsvLines
        .get(PROCESSING_OUT)
        .apply(
            "GetJsonDocuments",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply(
            "WriteToElasticsearch",
            WriteToElasticsearch.newBuilder()
                .setUserAgent("dataflow-gcs-to-elasticsearch-template/v2")
                .setOptions(options.as(GCSToElasticsearchOptions.class))
                .build());

    /*
     * Step 3b: Write elements that failed processing to deadletter table via {@link BigQueryIO}.
     */
    convertedCsvLines
        .get(PROCESSING_DEADLETTER_OUT)
        .apply(
            "AddTimestamps",
            WithTimestamps.of((FailsafeElement<String, String> failures) -> new Instant()))
        .apply(
            "WriteFailedElementsToBigQuery",
            WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getDeadletterTable())
                .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                .build());

    return pipeline.run();
  }
}

Nächste Schritte