Modelo do Pub/Sub para Elasticsearch

O modelo do Pub/Sub para Elasticsearch é um pipeline de streaming que lê mensagens de uma assinatura do Pub/Sub, executa uma função definida pelo usuário (UDF, na sigla em inglês) e as grava no Elasticsearch como documentos. O modelo do Dataflow usa o recurso de fluxos de dados do Elasticsearch para armazenar dados de série temporal em vários índices, oferecendo um único recurso nomeado para solicitações. Esses fluxos são adequados para registros, métricas, rastros e outros dados gerados continuamente armazenados no Pub/Sub.

O modelo cria um fluxo de dados chamado logs-gcp.DATASET-NAMESPACE, em que:

  • DATASET é o valor do parâmetro de modelo dataset, ou pubsub se não for especificado.
  • NAMESPACE é o valor do parâmetro de modelo namespace, ou default se não for especificado.

Requisitos de pipeline

  • A assinatura do Pub/Sub de origem precisa existir e as mensagens precisam ser codificadas em um formato JSON válido.
  • Um host Elasticsearch acessível publicamente em uma instância do Google Cloud ou no Elastic Cloud com o Elasticsearch versão 7.0 ou mais recente. Consulte Integração do Google Cloud para Elastic para ver mais detalhes.
  • Um tópico do Pub/Sub para saída de erros.

Parâmetros do modelo

Parâmetros obrigatórios

Parâmetros opcionais

  • dataset: o tipo de registros enviados via Pub/Sub, para os quais temos um painel pronto para uso. Os valores de tipos de registro conhecidos são audit, vpcflow e firewall. "pubsub" padrão.
  • namespace: um agrupamento arbitrário, como um ambiente (dev, prod ou qa), uma equipe ou uma unidade de negócios estratégica. Padrão: 'default'.
  • elasticsearchTemplateVersion: identificador da versão do modelo do Dataflow, geralmente definido pelo Google Cloud. Padrão: 1.0.0.
  • javascriptTextTransformGcsPath: o padrão de caminho do Cloud Storage para o código JavaScript que contém as funções definidas pelo usuário. Exemplo: gs://your-bucket/your-function.js.
  • javascriptTextTransformFunctionName: o nome da função a ser chamada no arquivo JavaScript. Use apenas letras, dígitos e sublinhados. Exemplo: "transform" ou "transform_udf1".
  • javascriptTextTransformReloadIntervalMinutes: defina o intervalo que os workers podem verificar se há alterações na UDF em JavaScript para recarregar os arquivos. Padrão: 0.
  • elasticsearchUsername: o nome de usuário do Elasticsearch usado para autenticação. Se especificado, o valor de "apiKey" é ignorado.
  • elasticsearchPassword: a senha do Elasticsearch para autenticação. Se especificado, o valor de "apiKey" é ignorado.
  • batchSize: tamanho do lote em número de documentos. Padrão: '1000'.
  • batchSizeBytes: tamanho do lote em bytes usado para inserção em lote de mensagens no elasticsearch. Padrão: '5242880 (5mb)'.
  • maxRetryAttempts: máximo de tentativas de repetição. Precisa ser > 0. Padrão: 'no retries'.
  • maxRetryDuration: a duração máxima da nova tentativa em milissegundos precisa ser maior que 0. Padrão: 'no retries'.
  • propertyAsIndex: uma propriedade no documento que está sendo indexada com um valor que especificará os metadados "_index" a serem incluídos na solicitação de documento em massa (tem precedência sobre uma UDF "_index"). Padrão: none.
  • javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript para uma função que especificará os metadados "_index" a serem incluídos no documento em massa. Padrão: none.
  • javaScriptIndexFnName: nome da função UDF em JavaScript para a função que especificará os metadados de _index a serem incluídos com o documento na solicitação em massa.metadata to be included with document in bulk request. Padrão: none.
  • propertyAsId: uma propriedade no documento que está sendo indexada com um valor que especificará os metadados "_id" a serem incluídos na solicitação de documento em massa (tem precedência sobre uma UDF "_id"). Padrão: none.
  • javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript de uma função que especificará os metadados "_id" a serem incluídos em massa no documento request.Default: none.
  • javaScriptIdFnName: nome da função UDF em JavaScript para a função que especificará os metadados de _id a serem incluídos com o documento na solicitação em massa. Padrão: none.
  • javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript da função que vai especificar os metadados "_type" a serem incluídos no documento em uma solicitação em massa. Padrão: none.
  • javaScriptTypeFnName: nome da função JavaScript da UDF para a função que vai especificar os metadados "_type" a serem incluídos no documento em uma solicitação em massa. Padrão: none.
  • javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem UDF em JavaScript de uma função que determina se o documento deve ser excluído em vez de inserido ou atualizado. A função precisa retornar o valor de string "true" ou "false". Padrão: none.
  • javaScriptIsDeleteFnName: nome da função UDF em JavaScript de uma função que vai determinar se o documento deve ser excluído em vez de inserido ou atualizado. A função precisa retornar o valor de string "true" ou "false". Padrão: none.
  • usePartialUpdate: indica se as atualizações parciais vão ser usadas (atualizar em vez de criar ou indexar, permitindo documentos parciais) com solicitações Elasticsearch. Padrão: 'false'.
  • bulkInsertMethod: usar "INDEX" (índice, permite upsert) ou "CREATE" (criar, erros em _id duplicado) com solicitações em massa do Elasticsearch. Padrão: "CREATE".
  • trustSelfSignedCerts: se é possível ou não confiar em um certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative essa opção como "True" para ignorar a validação no certificado SSL. O padrão é False.
  • disableCertificateValidation: se for "true", confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina esse parâmetro como "true". (Padrão: falso).
  • apiKeyKMSEncryptionKey: a chave do Cloud KMS para descriptografar a chave de API. Esse parâmetro precisará ser fornecido se o apiKeySource estiver definido como KMS. Se esse parâmetro for fornecido, a string apiKey deverá ser transmitida de maneira criptografada. Criptografe parâmetros usando o endpoint de criptografia da API KMS. A chave precisa estar no formato projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Exemplo: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
  • apiKeySecretId: o ID do secret do Secret Manager para a apiKey. Esse parâmetro deve ser fornecido se apiKeySource está definido como SECRET_MANAGER. Precisa estar no formato projects/{project}/secrets/{secret}/versions/{secret_version}. Exemplo: projects/your-project-id/secrets/your-secret/versions/your-secret-version.
  • apiKeySource: fonte da chave de API. Um de PLAINTEXT, KMS ou SECRET_MANAGER. Esse parâmetro precisará ser fornecido se o Secret Manager ou o KMS forem usados. Se apiKeySource estiver definido como KMS, será necessário fornecer apiKeyKMSEncryptionKey e apiKey criptografada. Se apiKeySource estiver definido como SECRET_MANAGER, será necessário fornecer o apiKeySecretId. Se apiKeySource estiver definido como PLAINTEXT, será necessário fornecer apiKey. O padrão é: PLAINTEXT.

Funções definidas pelo usuário

Esse modelo é compatível com funções definidas pelo usuário (UDFs) em vários pontos do pipeline, descritas abaixo. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Função de transformação de texto

Transforma a mensagem do Pub/Sub em um documento do Elasticsearch.

Parâmetros do modelo:

  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javascriptTextTransformFunctionName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: um documento JSON em formato de string para inserir no Elasticsearch.

Função de índice

Retorna o índice ao qual o documento pertence.

Parâmetros do modelo:

  • javaScriptIndexFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIndexFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _index do documento.

Função ID do documento

Retorna o ID do documento.

Parâmetros do modelo:

  • javaScriptIdFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIdFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _id do documento.

Função de exclusão de documentos

Especifica se um documento deve ser excluído. Para usar essa função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.

Parâmetros do modelo:

  • javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIsDeleteFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: retorna a string "true" para excluir o documento ou "false" para manter o documento.

Função do tipo de mapeamento

Retorna o tipo de mapeamento do documento.

Parâmetros do modelo:

  • javaScriptTypeFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptTypeFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _type do documento.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Elasticsearch template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • ERROR_OUTPUT_TOPIC: o tópico do Pub/Sub para saída de erros
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • CONNECTION_URL: seu URL do Elasticsearch
  • DATASET: seu tipo de registro
  • NAMESPACE: seu namespace para conjunto de dados
  • APIKEY: sua chave de API codificada em base64 para autenticação

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • ERROR_OUTPUT_TOPIC: o tópico do Pub/Sub para saída de erros
  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • CONNECTION_URL: seu URL do Elasticsearch
  • DATASET: seu tipo de registro
  • NAMESPACE: seu namespace para conjunto de dados
  • APIKEY: sua chave de API codificada em base64 para autenticação

A seguir