Modelo do Pub/Sub para Elasticsearch

O modelo do Pub/Sub para Elasticsearch é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub, executa uma função definida pelo usuário (UDF, na sigla em inglês) e as grava no Elasticsearch como documentos. O modelo do Dataflow usa o recurso de fluxos de dados do Elasticsearch para armazenar dados de série temporal em vários índices, oferecendo um único recurso nomeado para solicitações. Esses fluxos são adequados para registros, métricas, rastros e outros dados gerados continuamente armazenados no Pub/Sub.

O modelo cria um fluxo de dados chamado logs-gcp.DATASET-NAMESPACE, em que:

  • DATASET é o valor do parâmetro de modelo dataset, ou pubsub se não for especificado.
  • NAMESPACE é o valor do parâmetro de modelo namespace, ou default se não for especificado.

Requisitos de pipeline

  • A assinatura do Pub/Sub de origem precisa existir e as mensagens precisam ser codificadas em um formato JSON válido.
  • Um host Elasticsearch acessível publicamente em uma instância do Google Cloud ou no Elastic Cloud com o Elasticsearch versão 7.0 ou mais recente. Consulte Integração do Google Cloud para Elastic para mais detalhes.
  • Um tópico do Pub/Sub para saída de erros.

Parâmetros do modelo

Parâmetros obrigatórios

  • inputSubscription: assinatura do Pub/Sub da qual consumir a entrada. Por exemplo, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • errorOutputTopic: o tópico de saída do Pub/Sub para publicar registros com falha no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • connectionUrl: o URL do Elasticsearch no formato https://hostname:[port]. Se estiver usando o Elastic Cloud, especifique o CloudID. Por exemplo, https://elasticsearch-host:9200.
  • apiKey: a chave de API codificada em Base64 que será usada para autenticação.

Parâmetros opcionais

  • dataset: o tipo de registros enviados via Pub/Sub, para os quais temos um painel pronto para uso. Os valores de tipos de registro conhecidos são audit, vpcflow e firewall. O padrão é pubsub.
  • namespace: um agrupamento arbitrário, como um ambiente (dev, prod ou qa), uma equipe ou uma unidade de negócios estratégica. O padrão é default.
  • elasticsearchTemplateVersion: identificador da versão do modelo do Dataflow, geralmente definido pelo Google Cloud. Padrão: 1.0.0.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo usuário (UDF) do JavaScript a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: especifica a frequência de recarregamento da UDF em minutos. Se o valor for maior que 0, o Dataflow vai verificar periodicamente o arquivo da UDF no Cloud Storage e vai atualizar a UDF se o arquivo for modificado. Com esse parâmetro, é possível atualizar a UDF enquanto o pipeline está em execução, sem precisar reiniciar o job. Se o valor for 0, o recarregamento da UDF será desativado. O valor padrão é 0.
  • elasticsearchUsername: o nome de usuário do Elasticsearch usado para autenticação. Se especificado, o valor de apiKey será ignorado.
  • elasticsearchPassword: a senha do Elasticsearch para autenticação. Se especificado, o valor de apiKey será ignorado.
  • batchSize: o tamanho do lote em número de documentos. O padrão é 1000.
  • batchSizeBytes: o tamanho do lote em número de bytes. O padrão é 5242880 (5 MB).
  • maxRetryAttempts: o número máximo de novas tentativas. Precisa ser maior que zero. O padrão é no retries.
  • maxRetryDuration: a duração máxima da nova tentativa em milissegundos. Precisa ser maior que zero. O padrão é no retries.
  • propertyAsIndex: uma propriedade no documento que está sendo indexada com um valor que especifica os metadados _index a serem incluídos com o documento em solicitações em massa. Tem precedência sobre uma UDF _index. O padrão é none.
  • javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript de uma função que especifica os metadados _index a serem incluídos no documento em solicitações em massa. O padrão é none.
  • javaScriptIndexFnName: o nome da função JavaScript da UDF que especifica os metadados _index a serem incluídos no documento em solicitações em massa. O padrão é none.
  • propertyAsId: uma propriedade no documento que está sendo indexada com um valor que especifica metadados _id a serem incluídos com o documento em solicitações em massa. Tem precedência sobre uma UDF _id. O padrão é none.
  • javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript da função que especifica os metadados _id a serem incluídos no documento em solicitações em massa. O padrão é none.
  • javaScriptIdFnName: o nome da função JavaScript da UDF que especifica os metadados _id a serem incluídos com o documento em solicitações em massa. O padrão é none.
  • javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript de uma função que especifica os metadados _type a serem incluídos com documentos em solicitações em massa. O padrão é none.
  • javaScriptTypeFnName: o nome da função JavaScript da UDF que especifica os metadados _type a serem incluídos com o documento em solicitações em massa. O padrão é none.
  • javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript da função que determina se o documento precisa ser excluído em vez de inserido ou atualizado. A função retorna um valor de string de true ou false. O padrão é none.
  • javaScriptIsDeleteFnName: o nome da função JavaScript da UDF que determina se é necessário excluir o documento em vez de inserir ou atualizar. A função retorna um valor de string de true ou false. O padrão é none.
  • usePartialUpdate: indica se as atualizações parciais vão ser usadas (atualizar em vez de criar ou indexar, permitindo documentos parciais) com solicitações Elasticsearch. O padrão é false.
  • bulkInsertMethod: indica se é necessário usar INDEX (índice, permite ajustes) ou CREATE (criar, erros em _id duplicados) com solicitações em massa do Elasticsearch. O padrão é CREATE.
  • trustSelfSignedCerts: se é possível ou não confiar em um certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative essa opção como "True" para ignorar a validação no certificado SSL. O padrão é false.
  • disableCertificateValidation: se for true, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina esse parâmetro como true. O padrão é false.
  • apiKeyKMSEncryptionKey: a chave do Cloud KMS para descriptografar a chave de API. Este parâmetro será obrigatório se apiKeySource for definido como KMS. Se esse parâmetro for fornecido, transmita uma string apiKey criptografada. Criptografe parâmetros usando o endpoint de criptografia da API KMS. Para a chave, use o formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Por exemplo, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: o ID do secret do Secret Manager para a chave de API. Se apiKeySource estiver definido como SECRET_MANAGER, forneça esse parâmetro. Use o formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: a origem da chave de API. Os valores permitidos são PLAINTEXT, KMS e SECRET_MANAGER. Esse parâmetro é obrigatório quando você usa o Secret Manager ou o KMS. Se apiKeySource estiver definido como KMS, apiKeyKMSEncryptionKey e a chave apiKey criptografada precisar ser fornecida. Se apiKeySource estiver definido como SECRET_MANAGER, apiKeySecretId precisará ser fornecido. Se apiKeySource estiver definido como PLAINTEXT, apiKey precisará ser fornecido. O padrão é: PLAINTEXT.
  • socketTimeout: se definido, substitui o tempo limite máximo de novas tentativas e o tempo limite de soquete padrão (30000ms) no RestClient do Elastic.

Funções definidas pelo usuário

Esse modelo é compatível com funções definidas pelo usuário (UDFs) em vários pontos do pipeline, descritas abaixo. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Função de transformação de texto

Transforma a mensagem do Pub/Sub em um documento do Elasticsearch.

Parâmetros do modelo:

  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javascriptTextTransformFunctionName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: um documento JSON em formato de string para inserir no Elasticsearch.

Função de índice

Retorna o índice ao qual o documento pertence.

Parâmetros do modelo:

  • javaScriptIndexFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIndexFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _index do documento.

Função ID do documento

Retorna o ID do documento.

Parâmetros do modelo:

  • javaScriptIdFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIdFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _id do documento.

Função de exclusão de documentos

Especifica se um documento deve ser excluído. Para usar essa função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.

Parâmetros do modelo:

  • javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIsDeleteFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: retorna a string "true" para excluir o documento ou "false" para manter o documento.

Função do tipo de mapeamento

Retorna o tipo de mapeamento do documento.

Parâmetros do modelo:

  • javaScriptTypeFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptTypeFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _type do documento.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Elasticsearch template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • ERROR_OUTPUT_TOPIC: o tópico do Pub/Sub para saída de erros
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • CONNECTION_URL: seu URL do Elasticsearch
  • DATASET: seu tipo de registro
  • NAMESPACE: seu namespace para conjunto de dados
  • APIKEY: sua chave de API codificada em base64 para autenticação

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • ERROR_OUTPUT_TOPIC: o tópico do Pub/Sub para saída de erros
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • CONNECTION_URL: seu URL do Elasticsearch
  • DATASET: seu tipo de registro
  • NAMESPACE: seu namespace para conjunto de dados
  • APIKEY: sua chave de API codificada em base64 para autenticação
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.elasticsearch.templates;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.PubSubToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.FailedPubsubMessageToPubsubTopicFn;
import com.google.cloud.teleport.v2.elasticsearch.transforms.ProcessEventMetadata;
import com.google.cloud.teleport.v2.elasticsearch.transforms.PubSubMessageToJsonDocument;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIndex;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToElasticsearch} pipeline is a streaming pipeline which ingests data in JSON
 * format from PubSub, applies a Javascript UDF if provided and writes the resulting records to
 * Elasticsearch. If the element fails to be processed then it is written to an error output table
 * in BigQuery.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/README_PubSub_to_Elasticsearch.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "PubSub_to_Elasticsearch_Flex",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to Elasticsearch",
      description = {
        "The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a user-defined function (UDF), and writes them to Elasticsearch as documents. "
            + "The Dataflow template uses Elasticsearch's <a href=\"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html\">data streams</a> feature to store time series data across multiple indices while giving you a single named resource for requests. "
            + "Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.\n",
        "The template creates a datastream named <code>logs-gcp.DATASET-NAMESPACE</code>, where:\n"
            + "- <code>DATASET</code> is the value of the <code>dataset</code> template parameter, or <code>pubsub</code> if not specified.\n"
            + "- <code>NAMESPACE</code> is the value of the <code>namespace</code> template parameter, or <code>default</code> if not specified."
      },
      optionsClass = PubSubToElasticsearchOptions.class,
      skipOptions = {
        "index",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName",
      }, // Template just ignores what is sent as "index"
      flexContainerName = "pubsub-to-elasticsearch",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The source Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.",
        "A publicly reachable Elasticsearch host on a Google Cloud instance or on Elastic Cloud with Elasticsearch version 7.0 or above. See <a href=\"https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/docs/PubSubToElasticsearch/README.md#google-cloud-integration-for-elastic\">Google Cloud Integration for Elastic</a> for more details.",
        "A Pub/Sub topic for error output.",
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "PubSub_to_Elasticsearch_Xlang",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to Elasticsearch With Python UDFs",
      type = Template.TemplateType.XLANG,
      description = {
        "The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a Python user-defined function (UDF), and writes them to Elasticsearch as documents. "
            + "The Dataflow template uses Elasticsearch's <a href=\"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html\">data streams</a> feature to store time series data across multiple indices while giving you a single named resource for requests. "
            + "Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.\n",
        "The template creates a datastream named <code>logs-gcp.DATASET-NAMESPACE</code>, where:\n"
            + "- <code>DATASET</code> is the value of the <code>dataset</code> template parameter, or <code>pubsub</code> if not specified.\n"
            + "- <code>NAMESPACE</code> is the value of the <code>namespace</code> template parameter, or <code>default</code> if not specified."
      },
      optionsClass = PubSubToElasticsearchOptions.class,
      skipOptions = {
        "index",
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      }, // Template just ignores what is sent as "index" and javascript udf as this is for python
      // udf only.
      flexContainerName = "pubsub-to-elasticsearch-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The source Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.",
        "A publicly reachable Elasticsearch host on a Google Cloud instance or on Elastic Cloud with Elasticsearch version 7.0 or above. See <a href=\"https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/docs/PubSubToElasticsearch/README.md#google-cloud-integration-for-elastic\">Google Cloud Integration for Elastic</a> for more details.",
        "A Pub/Sub topic for error output.",
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public class PubSubToElasticsearch {

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** The tag for the error output table of the json to table row transform. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_ERROR_OUTPUT_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** Pubsub message/string coder for pipeline. */
  public static final FailsafeElementCoder<PubsubMessage, String> CODER =
      FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /** The log to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToElasticsearch.class);

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line.
    PubSubToElasticsearchOptions pubSubToElasticsearchOptions =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(PubSubToElasticsearchOptions.class);

    pubSubToElasticsearchOptions.setIndex(
        new ElasticsearchIndex(
                pubSubToElasticsearchOptions.getDataset(),
                pubSubToElasticsearchOptions.getNamespace())
            .getIndex());

    validateOptions(pubSubToElasticsearchOptions);
    run(pubSubToElasticsearchOptions);
  }

  public static void validateOptions(PubSubToElasticsearchOptions options) {
    switch (options.getApiKeySource()) {
      case "PLAINTEXT":
        return;
      case "KMS":
        // validate that the encryption key is provided.
        if (StringUtils.isEmpty(options.getApiKeyKMSEncryptionKey())) {
          throw new IllegalArgumentException(
              "If apiKeySource is set to KMS, apiKeyKMSEncryptionKey should be provided.");
        }
        return;
      case "SECRET_MANAGER":
        // validate that secretId is provided.
        if (StringUtils.isEmpty(options.getApiKeySecretId())) {
          throw new IllegalArgumentException(
              "If apiKeySource is set to SECRET_MANAGER, apiKeySecretId should be provided.");
        }
    }
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(PubSubToElasticsearchOptions options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();

    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);

    /*
     * Steps: 1) Read PubSubMessage with attributes from input PubSub subscription.
     *        2) Apply Javascript UDF if provided.
     *        3) Index Json string to output ES index.
     *
     */
    LOG.info("Reading from subscription: " + options.getInputSubscription());

    PCollectionTuple convertedPubsubMessages =
        pipeline
            /*
             * Step #1: Read from a PubSub subscription.
             */
            .apply(
                "ReadPubSubSubscription",
                PubsubIO.readMessagesWithAttributes()
                    .fromSubscription(options.getInputSubscription()))
            /*
             * Step #2: Transform the PubsubMessages into Json documents.
             */
            .apply(
                "ConvertMessageToJsonDocument",
                PubSubMessageToJsonDocument.newBuilder()
                    .setJavascriptTextTransformFunctionName(
                        options.getJavascriptTextTransformFunctionName())
                    .setJavascriptTextTransformGcsPath(options.getJavascriptTextTransformGcsPath())
                    .setPythonExternalTextTransformGcsPath(
                        options.getPythonExternalTextTransformGcsPath())
                    .setPythonExternalTextTransformFunctionName(
                        options.getPythonExternalTextTransformFunctionName())
                    .build());

    /*
     * Step #3a: Write Json documents into Elasticsearch using {@link ElasticsearchTransforms.WriteToElasticsearch}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_OUT)
        .apply(
            "GetJsonDocuments",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply("Insert metadata", new ProcessEventMetadata())
        .apply(
            "WriteToElasticsearch",
            WriteToElasticsearch.newBuilder()
                .setUserAgent("dataflow-pubsub-to-elasticsearch-template/v2")
                .setOptions(options.as(PubSubToElasticsearchOptions.class))
                .build());

    /*
     * Step 3b: Write elements that failed processing to error output PubSub topic via {@link PubSubIO}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_ERROR_OUTPUT_OUT)
        .apply(ParDo.of(new FailedPubsubMessageToPubsubTopicFn()))
        .apply("writeFailureMessages", PubsubIO.writeMessages().to(options.getErrorOutputTopic()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}

A seguir