Modello Pub/Sub a Elasticsearch

Il modello Pub/Sub a Elasticsearch è una pipeline di flusso che legge i messaggi da una sottoscrizione Pub/Sub, esegue una funzione definita dall'utente (UDF) e li scrive in Elasticsearch sotto forma di documenti. Il modello Dataflow utilizza la funzionalità degli stream di dati di Elasticsearch per memorizzare i dati delle serie temporali in più indici, fornendo al contempo una singola risorsa denominata per le richieste. I flussi di dati sono adatti per log, metriche, tracce e altri dati generati continuamente archiviati in Pub/Sub.

Il modello crea un flusso di dati denominato logs-gcp.DATASET-NAMESPACE, dove:

  • DATASET è il valore del parametro del modello dataset o pubsub se non specificato.
  • NAMESPACE è il valore del parametro del modello namespace o default se non specificato.

Requisiti della pipeline

  • Deve esistere la sottoscrizione Pub/Sub di origine e i messaggi devono essere codificati in un formato JSON valido.
  • Un host Elasticsearch raggiungibile pubblicamente su un' Google Cloud istanza o su Elastic Cloud con Elasticsearch versione 7.0 o successive. Per ulteriori dettagli, consulta Integrazione di Google Cloud per Elastic.
  • Un argomento Pub/Sub per l'output degli errori.

Parametri del modello

Parametri obbligatori

  • inputSubscription: l'abbonamento Pub/Sub da cui consumare l'input. Ad esempio, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • errorOutputTopic: l'argomento di output Pub/Sub per la pubblicazione dei record non riusciti, nel formato di projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • connectionUrl: l'URL di Elasticsearch nel formato https://hostname:[port]. Se utilizzi Elastic Cloud, specifica il CloudID. Ad esempio: https://elasticsearch-host:9200.
  • apiKey: la chiave API codificata in Base64 da utilizzare per l'autenticazione.

Parametri facoltativi

  • set di dati: il tipo di log inviati utilizzando Pub/Sub, per i quali è disponibile una dashboard pronta all'uso. I valori dei tipi di log noti sono audit, vpcflow e firewall. Il valore predefinito è pubsub.
  • spazio dei nomi: un raggruppamento arbitrario, ad esempio un ambiente (dev, prod o qa), un team o un'unità aziendale strategica. Il valore predefinito è default.
  • elasticsearchTemplateVersion: identificatore della versione del modello Dataflow, in genere definito da Google Cloud. Valore predefinito: 1.0.0.
  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: specifica la frequenza con cui ricaricare la UDF, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e lo ricarica se il file viene modificato. Questo parametro ti consente di aggiornare la UDF durante l'esecuzione della pipeline, senza dover riavviare il job. Se il valore è 0, il ricaricamento delle funzioni definite dall'utente è disattivato. Il valore predefinito è 0.
  • elasticsearchUsername: il nome utente di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di apiKey viene ignorato.
  • elasticsearchPassword: la password di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di apiKey viene ignorato.
  • batchSize: le dimensioni del batch in numero di documenti. Il valore predefinito è 1000.
  • batchSizeBytes: le dimensioni del batch in numero di byte. Il valore predefinito è 5242880 (5 MB).
  • maxRetryAttempts: il numero massimo di nuovi tentativi. Deve essere maggiore di zero. Il valore predefinito è no retries.
  • maxRetryDuration: la durata massima dei nuovi tentativi in millisecondi. Deve essere maggiore di zero. Il valore predefinito è no retries.
  • propertyAsIndex: la proprietà del documento sottoposto a indicizzazione il cui valore specifica i metadati _index da includere con il documento nelle richieste collettive. Ha la precedenza su una UDF _index. Il valore predefinito è none.
  • javaScriptIndexFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente per una funzione che specifica i metadati _index da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptIndexFnName: il nome della funzione JavaScript UDF che specifica i metadati _index da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • propertyAsId: una proprietà del documento sottoposto a indicizzazione il cui valore specifica i metadati _id da includere con il documento nelle richieste collettive. Ha la precedenza su una UDF _id. Il valore predefinito è none.
  • javaScriptIdFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript UDF per la funzione che specifica i metadati _id da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptIdFnName: il nome della funzione JavaScript UDF che specifica i metadati _id da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptTypeFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente per una funzione che specifica i metadati _type da includere con i documenti nelle richieste collettive. Il valore predefinito è none.
  • javaScriptTypeFnName: il nome della funzione JavaScript UDF che specifica i metadati _type da includere con il documento nelle richieste collettive. Il valore predefinito è none.
  • javaScriptIsDeleteFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore di stringa true o false. Il valore predefinito è none.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript UDF che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore di stringa true o false. Il valore predefinito è none.
  • usePartialUpdate: indica se utilizzare aggiornamenti parziali (aggiornamento anziché creazione o indicizzazione, consentendo documenti parziali) con le richieste Elasticsearch. Il valore predefinito è false.
  • bulkInsertMethod: indica se utilizzare INDEX (indice, consente gli upsert) o CREATE (crea, errori su _id duplicati) con le richieste collettive di Elasticsearch. Il valore predefinito è CREATE.
  • trustSelfSignedCerts: indica se il certificato autofirmato deve essere considerato attendibile o meno. Un'istanza Elasticsearch installata potrebbe avere un certificato autofirmato. Imposta questa opzione su true per bypassare la convalida del certificato SSL. (il valore predefinito è false).
  • disableCertificateValidation: se true, considera attendibile il certificato SSL autofirmato. Un'istanza Elasticsearch potrebbe avere un certificato autofirmato. Per ignorare la convalida del certificato, imposta questo parametro su true. Il valore predefinito è false.
  • apiKeyKMSEncryptionKey: la chiave Cloud KMS per decriptare la chiave API. Questo parametro è obbligatorio se apiKeySource è impostato su KMS. Se viene fornito questo parametro, passa una stringa apiKey criptata. Crittografa i parametri utilizzando l'endpoint di crittografia dell'API KMS. Per la chiave, utilizza il formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulta: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Ad esempio, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: l'ID secret di Secret Manager per l'apiKey. Se apiKeySource è impostato su SECRET_MANAGER, fornisci questo parametro. Utilizza il formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: l'origine della chiave API. I valori consentiti sono PLAINTEXT, KMS o SECRET_MANAGER. Questo parametro è obbligatorio quando utilizzi Secret Manager o KMS. Se apiKeySource è impostato su KMS, devono essere forniti apiKeyKMSEncryptionKey e l'apiKey criptato. Se apiKeySource è impostato su SECRET_MANAGER, deve essere fornito apiKeySecretId. Se apiKeySource è impostato su PLAINTEXT, deve essere fornito apiKey. Valore predefinito: PLAINTEXT.
  • socketTimeout: se impostato, sovrascrive il timeout massimo per i tentativi e il timeout della socket predefiniti (30000 ms) in Elastic RestClient.

Funzioni definite dall'utente

Questo modello supporta le funzioni definite dall'utente (UDF) in diversi punti della pipeline, descritti di seguito. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.

Funzione di trasformazione del testo

Trasforma il messaggio Pub/Sub in un documento Elasticsearch.

Parametri del modello:

  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file JavaScript.
  • javascriptTextTransformFunctionName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il campo dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: un documento JSON con stringa da inserire in Elasticsearch.

Funzione di indice

Restituisce l'indice a cui appartiene il documento.

Parametri del modello:

  • javaScriptIndexFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIndexFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _index del documento.

Funzione ID documento

Restituisce l'ID documento.

Parametri del modello:

  • javaScriptIdFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIdFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _id del documento.

Funzione di eliminazione dei documenti

Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento collettivo su INDEX e fornisci una funzione ID documento.

Parametri del modello:

  • javaScriptIsDeleteFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: restituisce la stringa "true" per eliminare il documento o "false" per eseguire l'upsert del documento.

Funzione di tipo di mappatura

Restituisce il tipo di mappatura del documento.

Parametri del modello:

  • javaScriptTypeFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptTypeFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _type del documento.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Pub/Sub to Elasticsearch template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • ERROR_OUTPUT_TOPIC: l'argomento Pub/Sub per l'output degli errori
  • SUBSCRIPTION_NAME: il nome della sottoscrizione Pub/Sub
  • CONNECTION_URL: il tuo URL Elasticsearch
  • DATASET: il tipo di log
  • NAMESPACE: il tuo spazio dei nomi per il set di dati
  • APIKEY: la tua chiave API codificata in base64 per l'autenticazione

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • ERROR_OUTPUT_TOPIC: l'argomento Pub/Sub per l'output degli errori
  • SUBSCRIPTION_NAME: il nome della sottoscrizione Pub/Sub
  • CONNECTION_URL: il tuo URL Elasticsearch
  • DATASET: il tipo di log
  • NAMESPACE: il tuo spazio dei nomi per il set di dati
  • APIKEY: la tua chiave API codificata in base64 per l'autenticazione
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.elasticsearch.templates;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.PubSubToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.FailedPubsubMessageToPubsubTopicFn;
import com.google.cloud.teleport.v2.elasticsearch.transforms.ProcessEventMetadata;
import com.google.cloud.teleport.v2.elasticsearch.transforms.PubSubMessageToJsonDocument;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIndex;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToElasticsearch} pipeline is a streaming pipeline which ingests data in JSON
 * format from PubSub, applies a Javascript UDF if provided and writes the resulting records to
 * Elasticsearch. If the element fails to be processed then it is written to an error output table
 * in BigQuery.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/README_PubSub_to_Elasticsearch.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "PubSub_to_Elasticsearch_Flex",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to Elasticsearch",
      description = {
        "The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a user-defined function (UDF), and writes them to Elasticsearch as documents. "
            + "The Dataflow template uses Elasticsearch's <a href=\"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html\">data streams</a> feature to store time series data across multiple indices while giving you a single named resource for requests. "
            + "Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.\n",
        "The template creates a datastream named <code>logs-gcp.DATASET-NAMESPACE</code>, where:\n"
            + "- <code>DATASET</code> is the value of the <code>dataset</code> template parameter, or <code>pubsub</code> if not specified.\n"
            + "- <code>NAMESPACE</code> is the value of the <code>namespace</code> template parameter, or <code>default</code> if not specified."
      },
      optionsClass = PubSubToElasticsearchOptions.class,
      skipOptions = {
        "index",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName",
      }, // Template just ignores what is sent as "index"
      flexContainerName = "pubsub-to-elasticsearch",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The source Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.",
        "A publicly reachable Elasticsearch host on a Google Cloud instance or on Elastic Cloud with Elasticsearch version 7.0 or above. See <a href=\"https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/docs/PubSubToElasticsearch/README.md#google-cloud-integration-for-elastic\">Google Cloud Integration for Elastic</a> for more details.",
        "A Pub/Sub topic for error output.",
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "PubSub_to_Elasticsearch_Xlang",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to Elasticsearch With Python UDFs",
      type = Template.TemplateType.XLANG,
      description = {
        "The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a Python user-defined function (UDF), and writes them to Elasticsearch as documents. "
            + "The Dataflow template uses Elasticsearch's <a href=\"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html\">data streams</a> feature to store time series data across multiple indices while giving you a single named resource for requests. "
            + "Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.\n",
        "The template creates a datastream named <code>logs-gcp.DATASET-NAMESPACE</code>, where:\n"
            + "- <code>DATASET</code> is the value of the <code>dataset</code> template parameter, or <code>pubsub</code> if not specified.\n"
            + "- <code>NAMESPACE</code> is the value of the <code>namespace</code> template parameter, or <code>default</code> if not specified."
      },
      optionsClass = PubSubToElasticsearchOptions.class,
      skipOptions = {
        "index",
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      }, // Template just ignores what is sent as "index" and javascript udf as this is for python
      // udf only.
      flexContainerName = "pubsub-to-elasticsearch-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The source Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.",
        "A publicly reachable Elasticsearch host on a Google Cloud instance or on Elastic Cloud with Elasticsearch version 7.0 or above. See <a href=\"https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/docs/PubSubToElasticsearch/README.md#google-cloud-integration-for-elastic\">Google Cloud Integration for Elastic</a> for more details.",
        "A Pub/Sub topic for error output.",
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public class PubSubToElasticsearch {

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** The tag for the error output table of the json to table row transform. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_ERROR_OUTPUT_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** Pubsub message/string coder for pipeline. */
  public static final FailsafeElementCoder<PubsubMessage, String> CODER =
      FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /** The log to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToElasticsearch.class);

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line.
    PubSubToElasticsearchOptions pubSubToElasticsearchOptions =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(PubSubToElasticsearchOptions.class);

    pubSubToElasticsearchOptions.setIndex(
        new ElasticsearchIndex(
                pubSubToElasticsearchOptions.getDataset(),
                pubSubToElasticsearchOptions.getNamespace())
            .getIndex());

    validateOptions(pubSubToElasticsearchOptions);
    run(pubSubToElasticsearchOptions);
  }

  public static void validateOptions(PubSubToElasticsearchOptions options) {
    switch (options.getApiKeySource()) {
      case "PLAINTEXT":
        return;
      case "KMS":
        // validate that the encryption key is provided.
        if (StringUtils.isEmpty(options.getApiKeyKMSEncryptionKey())) {
          throw new IllegalArgumentException(
              "If apiKeySource is set to KMS, apiKeyKMSEncryptionKey should be provided.");
        }
        return;
      case "SECRET_MANAGER":
        // validate that secretId is provided.
        if (StringUtils.isEmpty(options.getApiKeySecretId())) {
          throw new IllegalArgumentException(
              "If apiKeySource is set to SECRET_MANAGER, apiKeySecretId should be provided.");
        }
    }
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(PubSubToElasticsearchOptions options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();

    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);

    /*
     * Steps: 1) Read PubSubMessage with attributes from input PubSub subscription.
     *        2) Apply Javascript UDF if provided.
     *        3) Index Json string to output ES index.
     *
     */
    LOG.info("Reading from subscription: " + options.getInputSubscription());

    PCollectionTuple convertedPubsubMessages =
        pipeline
            /*
             * Step #1: Read from a PubSub subscription.
             */
            .apply(
                "ReadPubSubSubscription",
                PubsubIO.readMessagesWithAttributes()
                    .fromSubscription(options.getInputSubscription()))
            /*
             * Step #2: Transform the PubsubMessages into Json documents.
             */
            .apply(
                "ConvertMessageToJsonDocument",
                PubSubMessageToJsonDocument.newBuilder()
                    .setJavascriptTextTransformFunctionName(
                        options.getJavascriptTextTransformFunctionName())
                    .setJavascriptTextTransformGcsPath(options.getJavascriptTextTransformGcsPath())
                    .setPythonExternalTextTransformGcsPath(
                        options.getPythonExternalTextTransformGcsPath())
                    .setPythonExternalTextTransformFunctionName(
                        options.getPythonExternalTextTransformFunctionName())
                    .build());

    /*
     * Step #3a: Write Json documents into Elasticsearch using {@link ElasticsearchTransforms.WriteToElasticsearch}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_OUT)
        .apply(
            "GetJsonDocuments",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply("Insert metadata", new ProcessEventMetadata())
        .apply(
            "WriteToElasticsearch",
            WriteToElasticsearch.newBuilder()
                .setUserAgent("dataflow-pubsub-to-elasticsearch-template/v2")
                .setOptions(options.as(PubSubToElasticsearchOptions.class))
                .build());

    /*
     * Step 3b: Write elements that failed processing to error output PubSub topic via {@link PubSubIO}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_ERROR_OUTPUT_OUT)
        .apply(ParDo.of(new FailedPubsubMessageToPubsubTopicFn()))
        .apply("writeFailureMessages", PubsubIO.writeMessages().to(options.getErrorOutputTopic()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}

Passaggi successivi