Modelo do Cloud Storage para Elasticsearch

O modelo do Cloud Storage para Elasticsearch é um pipeline em lote que lê dados de arquivos CSV armazenados em um bucket do Cloud Storage e os grava no Elasticsearch como documentos JSON.

Requisitos de pipeline

  • O bucket do Cloud Storage precisa existir.
  • É necessário que haja um host do Elasticsearch em uma instância do Google Cloud ou no Elasticsearch Cloud acessível pelo Dataflow.
  • Uma tabela do BigQuery para saída de erros precisa existir.

Esquema CSV

Se os arquivos CSV tiverem cabeçalhos, defina o parâmetro de modelo containsHeaders como true.

Caso contrário, crie um arquivo de esquema JSON que descreva os dados. Especifique o URI do Cloud Storage do arquivo de esquema no parâmetro de modelo jsonSchemaPath. O exemplo a seguir mostra um esquema JSON:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

Como alternativa, é possível fornecer uma função definida pelo usuário (UDF, na sigla em inglês) que analisa o texto CSV e gera documentos do Elasticsearch.

Parâmetros do modelo

Parâmetros obrigatórios

  • deadletterTable: a tabela de mensagens inativas do BigQuery para onde enviar inserções com falha. Por exemplo, your-project:your-dataset.your-table-name.
  • inputFileSpec: o padrão de arquivo do Cloud Storage para pesquisar arquivos CSV. Por exemplo, gs://mybucket/test-*.csv.
  • connectionUrl: o URL do Elasticsearch no formato https://hostname:[port]. Se estiver usando o Elastic Cloud, especifique o CloudID. Por exemplo, https://elasticsearch-host:9200.
  • apiKey: a chave de API codificada em Base64 que será usada para autenticação.
  • index: o índice do Elasticsearch para o qual as solicitações são emitidas. Por exemplo, my-index.

Parâmetros opcionais

  • inputFormat: o formato do arquivo de entrada. O padrão é CSV.
  • containsHeaders: os arquivos CSV de entrada contêm um registro de cabeçalho (verdadeiro/falso). Obrigatório apenas ao ler arquivos CSV. O padrão é: falso.
  • delimitador: o delimitador de coluna dos arquivos de texto de entrada. Padrão: , Por exemplo, ,.
  • csvFormat: a especificação de formato CSV a ser usada para analisar registros. O padrão é Default. Consulte https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html para mais detalhes. Precisa corresponder exatamente aos nomes de formato encontrados em: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html.
  • jsonSchemaPath: o caminho para o esquema JSON. O padrão é null. Por exemplo, gs://path/to/schema.
  • largeNumFiles: defina como "verdadeiro" se o número de arquivos estiver na casa de milhares. O padrão é false.
  • csvFileEncoding: o formato de codificação de caracteres do arquivo CSV. Os valores permitidos são US-ASCII, ISO-8859-1, UTF-8 e UTF-16. O padrão é UTF-8.
  • logDetailedCsvConversionErrors: defina como true para ativar o registro detalhado de erros quando a análise de CSV falhar. Isso pode expor dados sensíveis nos registros, por exemplo, se o arquivo CSV contiver senhas. Padrão: false.
  • elasticsearchUsername: o nome de usuário do Elasticsearch usado para autenticação. Se especificado, o valor de apiKey será ignorado.
  • elasticsearchPassword: a senha do Elasticsearch para autenticação. Se especificado, o valor de apiKey será ignorado.
  • batchSize: o tamanho do lote em número de documentos. O padrão é 1000.
  • batchSizeBytes: o tamanho do lote em número de bytes. O padrão é 5242880 (5 MB).
  • maxRetryAttempts: o número máximo de novas tentativas. Precisa ser maior que zero. O padrão é no retries.
  • maxRetryDuration: a duração máxima da nova tentativa em milissegundos. Precisa ser maior que zero. O padrão é no retries.
  • propertyAsIndex: uma propriedade no documento que está sendo indexada com um valor que especifica os metadados _index a serem incluídos com o documento em solicitações em massa. Tem precedência sobre uma UDF _index. O padrão é none.
  • javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript de uma função que especifica os metadados _index a serem incluídos no documento em solicitações em massa. O padrão é none.
  • javaScriptIndexFnName: o nome da função JavaScript da UDF que especifica os metadados _index a serem incluídos no documento em solicitações em massa. O padrão é none.
  • propertyAsId: uma propriedade no documento que está sendo indexada com um valor que especifica metadados _id a serem incluídos com o documento em solicitações em massa. Tem precedência sobre uma UDF _id. O padrão é none.
  • javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript da função que especifica os metadados _id a serem incluídos no documento em solicitações em massa. O padrão é none.
  • javaScriptIdFnName: o nome da função JavaScript da UDF que especifica os metadados _id a serem incluídos com o documento em solicitações em massa. O padrão é none.
  • javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript de uma função que especifica os metadados _type a serem incluídos com documentos em solicitações em massa. O padrão é none.
  • javaScriptTypeFnName: o nome da função JavaScript da UDF que especifica os metadados _type a serem incluídos com o documento em solicitações em massa. O padrão é none.
  • javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript da função que determina se o documento precisa ser excluído em vez de inserido ou atualizado. A função retorna um valor de string de true ou false. O padrão é none.
  • javaScriptIsDeleteFnName: o nome da função JavaScript da UDF que determina se é necessário excluir o documento em vez de inserir ou atualizar. A função retorna um valor de string de true ou false. O padrão é none.
  • usePartialUpdate: indica se as atualizações parciais vão ser usadas (atualizar em vez de criar ou indexar, permitindo documentos parciais) com solicitações Elasticsearch. O padrão é false.
  • bulkInsertMethod: indica se é necessário usar INDEX (índice, permite ajustes) ou CREATE (criar, erros em _id duplicados) com solicitações em massa do Elasticsearch. O padrão é CREATE.
  • trustSelfSignedCerts: se é possível ou não confiar em um certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative essa opção como "True" para ignorar a validação no certificado SSL. O padrão é false.
  • disableCertificateValidation: se for true, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina esse parâmetro como true. O padrão é false.
  • apiKeyKMSEncryptionKey: a chave do Cloud KMS para descriptografar a chave de API. Este parâmetro será obrigatório se apiKeySource for definido como KMS. Se esse parâmetro for fornecido, transmita uma string apiKey criptografada. Criptografe parâmetros usando o endpoint de criptografia da API KMS. Para a chave, use o formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Por exemplo, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: o ID do secret do Secret Manager para a chave de API. Se apiKeySource estiver definido como SECRET_MANAGER, forneça esse parâmetro. Use o formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: a origem da chave de API. Os valores permitidos são PLAINTEXT, KMS e SECRET_MANAGER. Esse parâmetro é obrigatório quando você usa o Secret Manager ou o KMS. Se apiKeySource estiver definido como KMS, apiKeyKMSEncryptionKey e a chave apiKey criptografada precisar ser fornecida. Se apiKeySource estiver definido como SECRET_MANAGER, apiKeySecretId precisará ser fornecido. Se apiKeySource estiver definido como PLAINTEXT, apiKey precisará ser fornecido. O padrão é: PLAINTEXT.
  • socketTimeout: se definido, substitui o tempo limite máximo de novas tentativas e o tempo limite de soquete padrão (30000ms) no RestClient do Elastic.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo usuário (UDF) do JavaScript a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Funções definidas pelo usuário

Esse modelo é compatível com funções definidas pelo usuário (UDFs) em vários pontos do pipeline, descritas abaixo. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Função de transformação de texto

Transforma os dados CSV em um documento do Elasticsearch.

Parâmetros do modelo:

  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javascriptTextTransformFunctionName: o nome da função JavaScript.

Especificação da função:

  • Entrada: uma única linha de um arquivo CSV de entrada.
  • Saída: um documento JSON em formato de string para inserir no Elasticsearch.

Função de índice

Retorna o índice ao qual o documento pertence.

Parâmetros do modelo:

  • javaScriptIndexFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIndexFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _index do documento.

Função ID do documento

Retorna o ID do documento.

Parâmetros do modelo:

  • javaScriptIdFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIdFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _id do documento.

Função de exclusão de documentos

Especifica se um documento deve ser excluído. Para usar essa função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.

Parâmetros do modelo:

  • javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIsDeleteFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: retorna a string "true" para excluir o documento ou "false" para manter o documento.

Função do tipo de mapeamento

Retorna o tipo de mapeamento do documento.

Parâmetros do modelo:

  • javaScriptTypeFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptTypeFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _type do documento.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Storage to Elasticsearch template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • INPUT_FILE_SPEC: o padrão de arquivo do Cloud Storage.
  • CONNECTION_URL: seu URL do Elasticsearch
  • APIKEY: sua chave de API codificada em base64 para autenticação.
  • INDEX: seu índice do Elasticsearch.
  • DEADLETTER_TABLE: sua tabela do BigQuery.

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • INPUT_FILE_SPEC: o padrão de arquivo do Cloud Storage.
  • CONNECTION_URL: seu URL do Elasticsearch
  • APIKEY: sua chave de API codificada em base64 para autenticação.
  • INDEX: seu índice do Elasticsearch.
  • DEADLETTER_TABLE: sua tabela do BigQuery.
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.elasticsearch.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.GCSToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.transforms.CsvConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link GCSToElasticsearch} pipeline exports data from one or more CSV files in Cloud Storage
 * to Elasticsearch.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/README_GCS_to_Elasticsearch.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_to_Elasticsearch",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch",
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Javascript user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformReloadIntervalMinutes",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      flexContainerName = "gcs-to-elasticsearch",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      }),
  @Template(
      name = "GCS_to_Elasticsearch_Xlang",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch with Python UDFs",
      type = Template.TemplateType.XLANG,
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Python user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      flexContainerName = "gcs-to-elasticsearch-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      })
})
public class GCSToElasticsearch {

  /** The tag for the headers of the CSV if required. */
  static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};

  /** The tag for the lines of the CSV. */
  static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};

  /** The tag for the dead-letter output of the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output for the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(GCSToElasticsearch.class);

  /** String/String Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(
          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    GCSToElasticsearchOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(GCSToElasticsearchOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  private static PipelineResult run(GCSToElasticsearchOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    // Throw error if containsHeaders is true and a schema or Udf is also set.
    if (options.getContainsHeaders()) {
      checkArgument(
          options.getJavascriptTextTransformGcsPath() == null
              && options.getJsonSchemaPath() == null
              && options.getPythonExternalTextTransformGcsPath() == null,
          "Cannot parse file containing headers with UDF or Json schema.");
    }

    // Throw error if only one retry configuration parameter is set.
    checkArgument(
        (options.getMaxRetryAttempts() == null && options.getMaxRetryDuration() == null)
            || (options.getMaxRetryAttempts() != null && options.getMaxRetryDuration() != null),
        "To specify retry configuration both max attempts and max duration must be set.");

    // Throw error if both Javascript UDF and Python UDF are set. We can only apply one or the
    // other.
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    /*
     * Steps: 1) Read records from CSV(s) via {@link CsvConverters.ReadCsv}.
     *        2) Convert lines to JSON strings via {@link CsvConverters.LineToFailsafeJson}.
     *        3a) Write JSON strings as documents to Elasticsearch via {@link ElasticsearchIO}.
     *        3b) Write elements that failed processing to {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO}.
     */
    PCollectionTuple readCsvLines =
        pipeline
            /*
             * Step 1: Read CSV file(s) from Cloud Storage using {@link CsvConverters.ReadCsv}.
             */
            .apply(
            "ReadCsv",
            CsvConverters.ReadCsv.newBuilder()
                .setCsvFormat(options.getCsvFormat())
                .setDelimiter(options.getDelimiter())
                .setHasHeaders(options.getContainsHeaders())
                .setInputFileSpec(options.getInputFileSpec())
                .setHeaderTag(CSV_HEADERS)
                .setLineTag(CSV_LINES)
                .setFileEncoding(options.getCsvFileEncoding())
                .build());
    /*
     * Step 2: Convert lines to Elasticsearch document.
     */
    CsvConverters.LineToFailsafeJson.Builder lineToFailsafeJsonBuilder =
        CsvConverters.LineToFailsafeJson.newBuilder()
            .setDelimiter(options.getDelimiter())
            .setJsonSchemaPath(options.getJsonSchemaPath())
            .setHeaderTag(CSV_HEADERS)
            .setLineTag(CSV_LINES)
            .setUdfOutputTag(PROCESSING_OUT)
            .setUdfDeadletterTag(PROCESSING_DEADLETTER_OUT);
    if (options.getPythonExternalTextTransformGcsPath() != null) {
      lineToFailsafeJsonBuilder
          .setPythonUdfFileSystemPath(options.getPythonExternalTextTransformGcsPath())
          .setPythonUdfFunctionName(options.getPythonExternalTextTransformFunctionName());
    } else {
      lineToFailsafeJsonBuilder
          .setJavascriptUdfFileSystemPath(options.getJavascriptTextTransformGcsPath())
          .setJavascriptUdfFunctionName(options.getJavascriptTextTransformFunctionName());
    }
    PCollectionTuple convertedCsvLines =
        readCsvLines.apply("ConvertLine", lineToFailsafeJsonBuilder.build());
    /*
     * Step 3a: Write elements that were successfully processed to Elasticsearch using {@link WriteToElasticsearch}.
     */
    convertedCsvLines
        .get(PROCESSING_OUT)
        .apply(
            "GetJsonDocuments",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply(
            "WriteToElasticsearch",
            WriteToElasticsearch.newBuilder()
                .setUserAgent("dataflow-gcs-to-elasticsearch-template/v2")
                .setOptions(options.as(GCSToElasticsearchOptions.class))
                .build());

    /*
     * Step 3b: Write elements that failed processing to deadletter table via {@link BigQueryIO}.
     */
    convertedCsvLines
        .get(PROCESSING_DEADLETTER_OUT)
        .apply(
            "AddTimestamps",
            WithTimestamps.of((FailsafeElement<String, String> failures) -> new Instant()))
        .apply(
            "WriteFailedElementsToBigQuery",
            WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getDeadletterTable())
                .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                .build());

    return pipeline.run();
  }
}

A seguir