Modello Cloud Storage to Elasticsearch

Il modello da Cloud Storage a Elasticsearch è una pipeline batch che legge i dati dai file CSV archiviati in un bucket Cloud Storage e li scrive in Elasticsearch come documenti JSON.

Requisiti della pipeline

  • Il bucket Cloud Storage deve esistere.
  • Deve esistere un host Elasticsearch su un' Google Cloud istanza o su Elasticsearch Cloud accessibile da Dataflow.
  • Deve esistere una tabella BigQuery per l'output di errore.

Schema CSV

Se i file CSV contengono intestazioni, imposta il parametro del modello containsHeaders su true.

In caso contrario, crea un file schema JSON che descriva i dati. Specifica l'URI Cloud Storage del file dello schema nel parametro jsonSchemaPath del modello. L'esempio seguente mostra uno schema JSON:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

In alternativa, puoi fornire una funzione definita dall'utente (UDF) che analizzi il testo CSV e generi documenti Elasticsearch.

Parametri del modello

Parametri obbligatori

  • deadletterTable: la tabella BigQuery per i messaggi non recapitabili a cui inviare gli inserimenti non riusciti. Ad esempio, your-project:your-dataset.your-table-name.
  • inputFileSpec: il pattern di file Cloud Storage per la ricerca dei file CSV. Ad esempio, gs://mybucket/test-*.csv.
  • connectionUrl: l'URL di Elasticsearch nel formato https://hostname:[port]. Se utilizzi Elastic Cloud, specifica il CloudID. Ad esempio: https://elasticsearch-host:9200.
  • apiKey: la chiave API codificata in Base64 da utilizzare per l'autenticazione.
  • index: l'indice Elasticsearch a cui vengono inviate le richieste. Ad esempio, my-index.

Parametri facoltativi

  • inputFormat: il formato del file di input. Il valore predefinito è CSV.
  • containsHeaders: i file CSV di input contengono un record di intestazione (true/false). Obbligatorio solo se si leggono file CSV. Il valore predefinito è false.
  • delimiter: il delimitatore di colonna dei file di testo di input. Valore predefinito: ,, ad esempio ,.
  • csvFormat: specifica del formato CSV da utilizzare per l'analisi dei record. Il valore predefinito è Default. Per ulteriori dettagli, visita la pagina https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html. Deve corrispondere esattamente ai nomi dei formati disponibili all'indirizzo: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html.
  • jsonSchemaPath: il percorso allo schema JSON. Il valore predefinito è null. Ad esempio: gs://path/to/schema.
  • largeNumFiles: impostato su true se il numero di file è compreso tra le decine di migliaia. Il valore predefinito è false.
  • csvFileEncoding: il formato di codifica dei caratteri del file CSV. I valori consentiti sono US-ASCII, ISO-8859-1, UTF-8 e UTF-16. Il valore predefinito è UTF-8.
  • logDetailedCsvConversionErrors: impostato su true per attivare la registrazione degli errori dettagliata quando l'analisi del file CSV non va a buon fine. Tieni presente che questa operazione potrebbe esporre dati sensibili nei log (ad esempio se il file CSV contiene password). Valore predefinito: false.
  • elasticsearchUsername: il nome utente di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di apiKey viene ignorato.
  • elasticsearchPassword: la password di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di apiKey viene ignorato.
  • batchSize: le dimensioni del batch in numero di documenti. Il valore predefinito è 1000.
  • batchSizeBytes: le dimensioni del batch in numero di byte. Il valore predefinito è 5242880 (5 MB).
  • maxRetryAttempts: il numero massimo di nuovi tentativi. Deve essere maggiore di zero. Il valore predefinito è no retries.
  • maxRetryDuration: la durata massima dei nuovi tentativi in millisecondi. Deve essere maggiore di zero. Il valore predefinito è no retries.
  • propertyAsIndex: la proprietà del documento sottoposto a indicizzazione il cui valore specifica i metadati _index da includere con il documento nelle richieste collettive. Ha la precedenza su una UDF _index. Il valore predefinito è none.
  • javaScriptIndexFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente per una funzione che specifica i metadati _index da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptIndexFnName: il nome della funzione JavaScript UDF che specifica i metadati _index da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • propertyAsId: una proprietà del documento sottoposto a indicizzazione il cui valore specifica i metadati _id da includere con il documento nelle richieste collettive. Ha la precedenza su una UDF _id. Il valore predefinito è none.
  • javaScriptIdFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript UDF per la funzione che specifica i metadati _id da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptIdFnName: il nome della funzione JavaScript UDF che specifica i metadati _id da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptTypeFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente per una funzione che specifica i metadati _type da includere con i documenti nelle richieste collettive. Il valore predefinito è none.
  • javaScriptTypeFnName: il nome della funzione JavaScript UDF che specifica i metadati _type da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptIsDeleteFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore di stringa true o false. Il valore predefinito è none.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript UDF che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore di stringa true o false. Il valore predefinito è none.
  • usePartialUpdate: indica se utilizzare aggiornamenti parziali (aggiornamento anziché creazione o indicizzazione, consentendo documenti parziali) con le richieste Elasticsearch. Il valore predefinito è false.
  • bulkInsertMethod: indica se utilizzare INDEX (indice, consente gli upsert) o CREATE (crea, errori su _id duplicati) con le richieste collettive di Elasticsearch. Il valore predefinito è CREATE.
  • trustSelfSignedCerts: indica se il certificato autofirmato deve essere considerato attendibile o meno. Un'istanza Elasticsearch installata potrebbe avere un certificato autofirmato. Imposta questa opzione su true per bypassare la convalida del certificato SSL. (il valore predefinito è false).
  • disableCertificateValidation: se true, considera attendibile il certificato SSL autofirmato. Un'istanza Elasticsearch potrebbe avere un certificato autofirmato. Per ignorare la convalida del certificato, imposta questo parametro su true. Il valore predefinito è false.
  • apiKeyKMSEncryptionKey: la chiave Cloud KMS per decriptare la chiave API. Questo parametro è obbligatorio se apiKeySource è impostato su KMS. Se viene fornito questo parametro, passa una stringa apiKey criptata. Crittografa i parametri utilizzando l'endpoint di crittografia dell'API KMS. Per la chiave, utilizza il formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulta: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Ad esempio, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: l'ID secret di Secret Manager per l'apiKey. Se apiKeySource è impostato su SECRET_MANAGER, fornisci questo parametro. Utilizza il formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: l'origine della chiave API. I valori consentiti sono PLAINTEXT, KMS e SECRET_MANAGER. Questo parametro è obbligatorio quando utilizzi Secret Manager o KMS. Se apiKeySource è impostato su KMS, devono essere forniti apiKeyKMSEncryptionKey e l'apiKey criptato. Se apiKeySource è impostato su SECRET_MANAGER, deve essere fornito apiKeySecretId. Se apiKeySource è impostato su PLAINTEXT, deve essere fornito apiKey. Valore predefinito: PLAINTEXT.
  • socketTimeout: se impostato, sovrascrive il timeout massimo per i tentativi e il timeout della socket predefiniti (30000 ms) in Elastic RestClient.
  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Funzioni definite dall'utente

Questo modello supporta le funzioni definite dall'utente (UDF) in diversi punti della pipeline, descritti di seguito. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.

Funzione di trasformazione del testo

Trasforma i dati CSV in un documento Elasticsearch.

Parametri del modello:

  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file JavaScript.
  • javascriptTextTransformFunctionName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: una singola riga di un file CSV di input.
  • Output: un documento JSON con stringa da inserire in Elasticsearch.

Funzione di indice

Restituisce l'indice a cui appartiene il documento.

Parametri del modello:

  • javaScriptIndexFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIndexFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _index del documento.

Funzione ID documento

Restituisce l'ID documento.

Parametri del modello:

  • javaScriptIdFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIdFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _id del documento.

Funzione di eliminazione dei documenti

Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento collettivo su INDEX e fornisci una funzione ID documento.

Parametri del modello:

  • javaScriptIsDeleteFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: restituisce la stringa "true" per eliminare il documento o "false" per eseguire l'upsert del documento.

Funzione di tipo di mappatura

Restituisce il tipo di mappatura del documento.

Parametri del modello:

  • javaScriptTypeFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptTypeFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _type del documento.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Storage to Elasticsearch template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INPUT_FILE_SPEC: il pattern dei file di Cloud Storage.
  • CONNECTION_URL: il tuo URL Elasticsearch.
  • APIKEY: la chiave API codificata in base64 per l'autenticazione.
  • INDEX: l'indice Elasticsearch.
  • DEADLETTER_TABLE: la tua tabella BigQuery.

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INPUT_FILE_SPEC: il pattern dei file di Cloud Storage.
  • CONNECTION_URL: il tuo URL Elasticsearch.
  • APIKEY: la chiave API codificata in base64 per l'autenticazione.
  • INDEX: l'indice Elasticsearch.
  • DEADLETTER_TABLE: la tua tabella BigQuery.
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.elasticsearch.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.GCSToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.transforms.CsvConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link GCSToElasticsearch} pipeline exports data from one or more CSV files in Cloud Storage
 * to Elasticsearch.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/README_GCS_to_Elasticsearch.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_to_Elasticsearch",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch",
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Javascript user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformReloadIntervalMinutes",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      flexContainerName = "gcs-to-elasticsearch",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      }),
  @Template(
      name = "GCS_to_Elasticsearch_Xlang",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch with Python UDFs",
      type = Template.TemplateType.XLANG,
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Python user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      flexContainerName = "gcs-to-elasticsearch-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      })
})
public class GCSToElasticsearch {

  /** The tag for the headers of the CSV if required. */
  static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};

  /** The tag for the lines of the CSV. */
  static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};

  /** The tag for the dead-letter output of the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output for the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(GCSToElasticsearch.class);

  /** String/String Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(
          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    GCSToElasticsearchOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(GCSToElasticsearchOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  private static PipelineResult run(GCSToElasticsearchOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    // Throw error if containsHeaders is true and a schema or Udf is also set.
    if (options.getContainsHeaders()) {
      checkArgument(
          options.getJavascriptTextTransformGcsPath() == null
              && options.getJsonSchemaPath() == null
              && options.getPythonExternalTextTransformGcsPath() == null,
          "Cannot parse file containing headers with UDF or Json schema.");
    }

    // Throw error if only one retry configuration parameter is set.
    checkArgument(
        (options.getMaxRetryAttempts() == null && options.getMaxRetryDuration() == null)
            || (options.getMaxRetryAttempts() != null && options.getMaxRetryDuration() != null),
        "To specify retry configuration both max attempts and max duration must be set.");

    // Throw error if both Javascript UDF and Python UDF are set. We can only apply one or the
    // other.
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    /*
     * Steps: 1) Read records from CSV(s) via {@link CsvConverters.ReadCsv}.
     *        2) Convert lines to JSON strings via {@link CsvConverters.LineToFailsafeJson}.
     *        3a) Write JSON strings as documents to Elasticsearch via {@link ElasticsearchIO}.
     *        3b) Write elements that failed processing to {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO}.
     */
    PCollectionTuple readCsvLines =
        pipeline
            /*
             * Step 1: Read CSV file(s) from Cloud Storage using {@link CsvConverters.ReadCsv}.
             */
            .apply(
            "ReadCsv",
            CsvConverters.ReadCsv.newBuilder()
                .setCsvFormat(options.getCsvFormat())
                .setDelimiter(options.getDelimiter())
                .setHasHeaders(options.getContainsHeaders())
                .setInputFileSpec(options.getInputFileSpec())
                .setHeaderTag(CSV_HEADERS)
                .setLineTag(CSV_LINES)
                .setFileEncoding(options.getCsvFileEncoding())
                .build());
    /*
     * Step 2: Convert lines to Elasticsearch document.
     */
    CsvConverters.LineToFailsafeJson.Builder lineToFailsafeJsonBuilder =
        CsvConverters.LineToFailsafeJson.newBuilder()
            .setDelimiter(options.getDelimiter())
            .setJsonSchemaPath(options.getJsonSchemaPath())
            .setHeaderTag(CSV_HEADERS)
            .setLineTag(CSV_LINES)
            .setUdfOutputTag(PROCESSING_OUT)
            .setUdfDeadletterTag(PROCESSING_DEADLETTER_OUT);
    if (options.getPythonExternalTextTransformGcsPath() != null) {
      lineToFailsafeJsonBuilder
          .setPythonUdfFileSystemPath(options.getPythonExternalTextTransformGcsPath())
          .setPythonUdfFunctionName(options.getPythonExternalTextTransformFunctionName());
    } else {
      lineToFailsafeJsonBuilder
          .setJavascriptUdfFileSystemPath(options.getJavascriptTextTransformGcsPath())
          .setJavascriptUdfFunctionName(options.getJavascriptTextTransformFunctionName());
    }
    PCollectionTuple convertedCsvLines =
        readCsvLines.apply("ConvertLine", lineToFailsafeJsonBuilder.build());
    /*
     * Step 3a: Write elements that were successfully processed to Elasticsearch using {@link WriteToElasticsearch}.
     */
    convertedCsvLines
        .get(PROCESSING_OUT)
        .apply(
            "GetJsonDocuments",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply(
            "WriteToElasticsearch",
            WriteToElasticsearch.newBuilder()
                .setUserAgent("dataflow-gcs-to-elasticsearch-template/v2")
                .setOptions(options.as(GCSToElasticsearchOptions.class))
                .build());

    /*
     * Step 3b: Write elements that failed processing to deadletter table via {@link BigQueryIO}.
     */
    convertedCsvLines
        .get(PROCESSING_DEADLETTER_OUT)
        .apply(
            "AddTimestamps",
            WithTimestamps.of((FailsafeElement<String, String> failures) -> new Instant()))
        .apply(
            "WriteFailedElementsToBigQuery",
            WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getDeadletterTable())
                .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                .build());

    return pipeline.run();
  }
}

Passaggi successivi