Plantilla de Pub/Sub a Elasticsearch

La plantilla de Pub/Sub a Elasticsearch es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub, ejecuta una función definida por el usuario (UDF) y los escribe en Elasticsearch como documentos. La plantilla de Dataflow usa la función de flujos de datos de Elasticsearch para almacenar datos de series temporales en varios índices y, al mismo tiempo, entregarte un solo recurso con nombre para solicitudes. Los flujos de datos son adecuados para los registros, las métricas, los seguimientos y otros datos generados de forma continua que se almacenan en Pub/Sub.

La plantilla crea un flujo de datos llamado logs-gcp.DATASET-NAMESPACE, en el que:

  • DATASET es el valor del parámetro de plantilla dataset, o pubsub si no se especifica.
  • NAMESPACE es el valor del parámetro de plantilla namespace, o default si no se especifica.

Requisitos de la canalización

  • La suscripción a Pub/Sub debe existir, y los mensajes deben estar codificados en un formato JSON válido.
  • Un host de Elasticsearch accesible de forma pública en una instancia de Google Cloud o en Elastic Cloud con Elasticsearch versión 7.0 o posterior. Consulta Integración de Google Cloud para Elastic si deseas obtener más detalles.
  • Un tema de Pub/Sub para resultados de error.

Parámetros de la plantilla

Parámetros obligatorios

Parámetros opcionales

  • conjunto de datos: El tipo de registros enviados usando Pub/Sub para los que tenemos un panel listo para usar. Los valores de tipos de registros conocidos son audit, vpcflow y firewall. El valor predeterminado es “pubsub”.
  • Espacio de nombres Una agrupación arbitraria, como un entorno (dev, prod, or qa), un equipo o una unidad de negocios estratégica. El valor predeterminado es "default".
  • elasticsearchTemplateVersion : Identificador de versión de la plantilla de Dataflow, por lo general, definido por Google Cloud. La configuración predeterminada es 1.0.0.
  • javascriptTextTransformGcsPath: el patrón de ruta de acceso de Cloud Storage para el código JavaScript que contiene las funciones definidas por el usuario. (Por ejemplo: gs://your-bucket/your-function.js).
  • javascriptTextTransformFunctionName : El nombre de la función a la que se llamará desde el archivo JavaScript. Usa solo letras, dígitos y guiones bajos. (Ejemplo: “transform” o “transform_udf1”).
  • javascriptTextTransformReloadIntervalMinutes: Define el intervalo que los trabajadores pueden verificar para detectar cambios en la UDF de JavaScript a fin de volver a cargar los archivos. La configuración predeterminada es 0.
  • elasticsearchUsername: El nombre de usuario de Elasticsearch con el que se autenticará. Si se especifica, se ignora el valor de “apiKey”.
  • elasticsearchPassword: La contraseña de Elasticsearch con la que se realizará la autenticación. Si se especifica, se ignora el valor de “apiKey”.
  • batchSize: El tamaño del lote en cantidad de documentos. Valor predeterminado: "1000".
  • batchSizeBytes: Tamaño del lote en bytes que se usa para la inserción por lotes de mensajes en elasticsearch. Valor predeterminado: '5242880 (5mb)'.
  • maxRetryAttempts: La cantidad máxima de reintentos debe ser mayor que 0. Valor predeterminado: "'no retries".
  • maxRetryDuration: La duración máxima del reintento en milisegundos debe ser superior a 0. Valor predeterminado: "'no retries".
  • propertyAsIndex: Una propiedad del documento que se indexa, cuyo valor especificará los metadatos "_index" para incluir con el documento en la solicitud masiva (tiene prioridad sobre una UDF "_index"). Valor predeterminado: none.
  • javaScriptIndexFnGcsPath: La ruta de acceso de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos “_index” que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none.
  • javaScriptIndexFnName: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none.
  • propertyAsId: Una propiedad del documento que se indexa, cuyo valor especificará los metadatos "_id" para incluir con el documento en la solicitud masiva (tiene prioridad sobre una UDF "_id"). Valor predeterminado: none.
  • javaScriptIdFnGcsPath: La ruta de acceso de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos “_id” que se incluirán en el documento en la solicitud masiva. Predeterminado: ninguno.
  • javaScriptIdFnName: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none.
  • javaScriptTypeFnGcsPath: La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos a fin de incluir el documento en la solicitud masiva. Valor predeterminado: none.
  • javaScriptTypeFnName: Nombre de función de JavaScript de la UDF para la función que especificará los metadatos “_type” que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none.
  • javaScriptIsDeleteFnGcsPath: La ruta de acceso de Cloud Storage a la fuente de UDF de JavaScript para la función que determinará si el documento se debe borrar en lugar de insertar o actualizar. La función debe mostrar el valor de string "true" o "false". Valor predeterminado: none.
  • javaScriptIsDeleteFnName: El nombre de función de JavaScript de la UDF para la función que determinará si el documento se debe borrar en lugar de insertarse o actualizarse. La función debe mostrar el valor de string "true" o "false". Valor predeterminado: none.
  • usePartialUpdate: Indica si se deben usar actualizaciones parciales (actualizar en lugar de crear o indexar), que permite documentos parciales con solicitudes de Elasticsearch. Predeterminado: "false".
  • bulkInsertMethod : Indica si se debe usar “INDEX” (índice, permite inserción) o “CREATE” (creación, errores en duplicate _id) con solicitudes masivas de Elasticsearch. Configuración predeterminada: “CREATE”.
  • trustSelfSignedCerts : Indica si se debe confiar o no en el certificado autofirmado. Es posible que una instancia de Elasticsearch instalada tenga un certificado autofirmado. Habilítalo como verdadero para omitir la validación en el certificado SSL. (el valor predeterminado es False).
  • disableCertificateValidation : si es “true”, confía en el certificado SSL autofirmado. Una instancia de Elasticsearch puede tener un certificado SSL autofirmado. Para omitir la validación del certificado, establece este parámetro como “true”. Valor predeterminado: false.
  • apiKeyKMSEncryptionKey: La clave de Cloud KMS para desencriptar la clave de API. Este parámetro se debe proporcionar si apiKeySource se configura como KMS. Si se proporciona este parámetro, la cadena apiKey debe pasarse encriptada. Encripta parámetros con el extremo de encriptación de la API de KMS. La clave debe tener el formato proyectos/{proyecto_gcp}/ubicaciones/{región-de-la-clave}/llaveros-de-clave/{llavero-de-claves}/cryptoKeys/{nombre_de_la_clave_kms}. Consulta: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Ejemplo: proyectos/id-de-tu-proyecto/ubicaciones/global/llaveros-de-claves/tu-llavero-de-claves/cryptoKeys/nombre-de-tu-clave).
  • apiKeySecretId : El ID del secreto de Secret Manager para la apiKey. Este parámetro se debe proporcionar si apiKeySource está configurado como SECRET_MANAGER. Debe tener el formato proyectos/{proyecto}/secrets/{secret}/versiones/{versión-del-secret}. (Ejemplo: proyectos/id-de-tu-proyecto/secrets/tu-secret/versiones/versión-de-tu-secret).
  • apiKeySource : Fuente de la clave de API. Puede ser PLAINTEXT, KMS o SECRET_MANAGER. Este parámetro se debe proporcionar si se usa Secret Manager o KMS. Si apiKeySource se configura como KMS, se deben proporcionar apiKeyKMSEncryptionKey y la apiKey encriptada. Si apiKeySource se configura como SECRET_MANAGER, se debe proporcionar apiKeySecretId. Si apiKeySource se configura como PLAINTEXT, se debe proporcionar apiKey. La configuración predeterminada es: PLAINTEXT.

Funciones definidas por el usuario

Esta plantilla admite funciones definidas por el usuario (UDF) en varios puntos de la canalización, que se describen a continuación. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Función de transformación de texto

Transforma el mensaje de Pub/Sub en un documento de Elasticsearch.

Parámetros de la plantilla:

  • javascriptTextTransformGcsPath: Es el URI de Cloud Storage del archivo JavaScript.
  • javascriptTextTransformFunctionName: Es el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una string JSON
  • Resultado: Un documento JSON en string para insertar en Elasticsearch.

Función de índice

Muestra el índice al que pertenece el documento.

Parámetros de la plantilla:

  • javaScriptIndexFnGcsPath: Es el URI de Cloud Storage del archivo JavaScript.
  • javaScriptIndexFnName: Es el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: el documento de Elasticsearch, serializado como una string JSON.
  • Resultado: El valor del campo de metadatos _index del documento.

Función de ID de documento

Muestra el ID del documento.

Parámetros de la plantilla:

  • javaScriptIdFnGcsPath: Es el URI de Cloud Storage del archivo JavaScript.
  • javaScriptIdFnName: Es el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
  • Resultado: El valor del campo de metadatos _id del documento.

Función de eliminación de documentos

Especifica si se debe borrar un documento. Para usar esta función, establece el modo de inserción masiva en INDEX y proporciona una función de ID de documento.

Parámetros de la plantilla:

  • javaScriptIsDeleteFnGcsPath: Es el URI de Cloud Storage del archivo JavaScript.
  • javaScriptIsDeleteFnName: Es el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
  • Resultado: Muestra la cadena "true" para borrar el documento o "false" a fin de actualizar el documento.

Función de tipo de asignación

Muestra el tipo de asignación del documento.

Parámetros de la plantilla:

  • javaScriptTypeFnGcsPath: Es el URI de Cloud Storage del archivo JavaScript.
  • javaScriptTypeFnName: Es el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
  • Resultado: El valor del campo de metadatos _type del documento.

Ejecuta la plantilla

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Elasticsearch template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • ERROR_OUTPUT_TOPIC: Es tu tema de Pub/Sub para resultados de error
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • CONNECTION_URL: Es tu URL de Elasticsearch
  • DATASET: Es tu tipo de registro
  • NAMESPACE: Es el espacio de nombres para el conjunto de datos
  • APIKEY: Es tu clave de API codificada en Base64 para la autenticación

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • ERROR_OUTPUT_TOPIC: Es tu tema de Pub/Sub para resultados de error
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • CONNECTION_URL: Es tu URL de Elasticsearch
  • DATASET: Es tu tipo de registro
  • NAMESPACE: Es el espacio de nombres para el conjunto de datos
  • APIKEY: Es tu clave de API codificada en Base64 para la autenticación

¿Qué sigue?