La plantilla de Pub/Sub a Elasticsearch es un flujo de procesamiento en streaming que lee mensajes de una suscripción de Pub/Sub, ejecuta una función definida por el usuario (UDF) y los escribe en Elasticsearch como documentos. La plantilla de Dataflow usa la función flujos de datos de Elasticsearch para almacenar datos de series temporales en varios índices y, al mismo tiempo, te ofrece un único recurso con nombre para las solicitudes. Los flujos de datos son adecuados para registros, métricas, trazas y otros datos generados continuamente que se almacenan en Pub/Sub.
La plantilla crea un flujo de datos llamado logs-gcp.DATASET-NAMESPACE
, donde:
- DATASET es el valor del parámetro de plantilla
dataset
opubsub
si no se especifica. - NAMESPACE es el valor del parámetro de plantilla
namespace
odefault
si no se especifica.
Requisitos del flujo de procesamiento
- La suscripción de Pub/Sub de origen debe existir y los mensajes deben estar codificados en un formato JSON válido.
- Un host de Elasticsearch accesible públicamente en una instancia de Google Cloud Platform o en Elastic Cloud con Elasticsearch 7.0 o una versión posterior. Para obtener más información, consulta Integración de Google Cloud con Elastic.
- Un tema de Pub/Sub para la salida de errores.
Parámetros de plantilla
Parámetros obligatorios
- inputSubscription suscripción de Pub/Sub para consumir la entrada. Por ejemplo,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
. - errorOutputTopic tema de salida de Pub/Sub para publicar los registros fallidos, con el formato
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
. - connectionUrl la URL de Elasticsearch en el formato
https://hostname:[port]
. Si usas Elastic Cloud, especifica el CloudID. Por ejemplo,https://elasticsearch-host:9200
. - apiKey la clave de API codificada en Base64 que se usará para la autenticación.
Parámetros opcionales
- dataset: el tipo de registros enviados mediante Pub/Sub para los que tenemos un panel de control preconfigurado. Los valores de los tipos de registro conocidos son
audit
,vpcflow
yfirewall
. El valor predeterminado espubsub
. - Espacio de nombres: agrupación arbitraria, como un entorno (desarrollo, producción o control de calidad), un equipo o una unidad de negocio estratégica. El valor predeterminado es
default
. - elasticsearchTemplateVersion identificador de la versión de la plantilla de Dataflow, normalmente definido por Google Cloud. El valor predeterminado es 1.0.0.
- javascriptTextTransformGcsPath el URI de Cloud Storage del archivo .js que define la función de JavaScript definida por el usuario (UDF) que se va a usar. Por ejemplo,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName nombre de la función definida por el usuario (UDF) de JavaScript que se va a usar. Por ejemplo, si el código de la función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDFs de JavaScript, consulta Ejemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes especifica la frecuencia con la que se vuelve a cargar la FDU, en minutos. Si el valor es superior a 0, Dataflow comprueba periódicamente el archivo de la función definida por el usuario en Cloud Storage y vuelve a cargar la función si se modifica el archivo. Este parámetro le permite actualizar la función definida por el usuario mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es
0
, se inhabilita la recarga de las funciones definidas por el usuario. El valor predeterminado es0
. - elasticsearchUsername: nombre de usuario de Elasticsearch para autenticarte. Si se especifica, se ignora el valor de
apiKey
. - elasticsearchPassword la contraseña de Elasticsearch para autenticarse. Si se especifica, se ignora el valor de
apiKey
. - batchSize el tamaño del lote en número de documentos. El valor predeterminado es
1000
. - batchSizeBytes tamaño del lote en número de bytes. El valor predeterminado es
5242880
(5 MB). - maxRetryAttempts número máximo de reintentos. Debe ser mayor que cero. El valor predeterminado es
no retries
. - maxRetryDuration la duración máxima de los reintentos en milisegundos. Debe ser mayor que cero. El valor predeterminado es
no retries
. - propertyAsIndex la propiedad del documento que se está indexando cuyo valor especifica los metadatos de
_index
que se incluirán con el documento en las solicitudes masivas. Tiene prioridad sobre una función definida por el usuario_index
. El valor predeterminado esnone
. - javaScriptIndexFnGcsPath la ruta de Cloud Storage al origen de la función UDF de JavaScript que especifica los metadatos de
_index
que se incluirán con el documento en las solicitudes masivas. El valor predeterminado esnone
. - javaScriptIndexFnName nombre de la función JavaScript de UDF que especifica los metadatos de
_index
que se incluirán con el documento en las solicitudes masivas. El valor predeterminado esnone
. - propertyAsId una propiedad del documento que se está indexando cuyo valor especifica los metadatos de
_id
que se incluirán con el documento en las solicitudes masivas. Tiene prioridad sobre una función definida por el usuario_id
. El valor predeterminado esnone
. - javaScriptIdFnGcsPath la ruta de Cloud Storage a la fuente de la función UDF de JavaScript que especifica los metadatos de
_id
que se incluirán con el documento en las solicitudes masivas. El valor predeterminado esnone
. - javaScriptIdFnName nombre de la función JavaScript de UDF que especifica los metadatos
_id
que se incluirán con el documento en las solicitudes masivas. El valor predeterminado esnone
. - javaScriptTypeFnGcsPath la ruta de Cloud Storage al origen de la función UDF de JavaScript de una función que especifica los metadatos de
_type
que se deben incluir con los documentos en las solicitudes masivas. El valor predeterminado esnone
. - javaScriptTypeFnName nombre de la función JavaScript de la UDF que especifica los metadatos de
_type
que se incluirán con el documento en las solicitudes masivas. El valor predeterminado esnone
. - javaScriptIsDeleteFnGcsPath la ruta de Cloud Storage al origen de la función UDF de JavaScript que determina si se debe eliminar el documento en lugar de insertarlo o actualizarlo. La función devuelve un valor de cadena de
true
ofalse
. El valor predeterminado esnone
. - javaScriptIsDeleteFnName el nombre de la función JavaScript de la FDU que determina si se debe eliminar el documento en lugar de insertarlo o actualizarlo. La función devuelve un valor de cadena de
true
ofalse
. El valor predeterminado esnone
. - usePartialUpdate indica si se deben usar actualizaciones parciales (actualizar en lugar de crear o indexar, lo que permite documentos parciales) con las solicitudes de Elasticsearch. El valor predeterminado es
false
. - bulkInsertMethod indica si se debe usar
INDEX
(índice, permite upserts) oCREATE
(crear, errores en _id duplicados) con solicitudes masivas de Elasticsearch. El valor predeterminado esCREATE
. - trustSelfSignedCerts indica si se debe confiar en los certificados autofirmados. Una instancia de Elasticsearch instalada puede tener un certificado autofirmado. Habilita esta opción para omitir la validación del certificado SSL. El valor predeterminado es
false
. - disableCertificateValidation si es
true
, confía en el certificado SSL autofirmado. Una instancia de Elasticsearch puede tener un certificado autofirmado. Para omitir la validación del certificado, asigna el valortrue
a este parámetro. El valor predeterminado esfalse
. - apiKeyKMSEncryptionKey la clave de Cloud KMS para descifrar la clave de API. Este parámetro es obligatorio si
apiKeySource
tiene el valorKMS
. Si se proporciona este parámetro, debe incluir una cadenaapiKey
cifrada. Encripta los parámetros con el endpoint de encriptación de la API KMS. Para la clave, usa el formatoprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
. Consulta https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt. Por ejemplo,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
. - apiKeySecretId el ID de secreto de Secret Manager de la apiKey. Si
apiKeySource
tiene el valorSECRET_MANAGER
, proporcione este parámetro. Usa el formatoprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource la fuente de la clave de API. Los valores permitidos son
PLAINTEXT
,KMS
ySECRET_MANAGER
. Este parámetro es obligatorio cuando se usa Secret Manager o KMS. SiapiKeySource
se define comoKMS
, se deben proporcionarapiKeyKMSEncryptionKey
y la clave de API cifrada. SiapiKeySource
se define comoSECRET_MANAGER
, se debe proporcionarapiKeySecretId
. SiapiKeySource
se define comoPLAINTEXT
, se debe proporcionarapiKey
. El valor predeterminado es PLAINTEXT. - socketTimeout si se define, sobrescribe el tiempo de espera máximo de reintento predeterminado y el tiempo de espera de socket predeterminado (30.000 ms) en Elastic RestClient.
Funciones definidas por el usuario
Esta plantilla admite funciones definidas por el usuario (UDF) en varios puntos del flujo de procesamiento, que se describen a continuación. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.
Función de transformación de texto
Transforma el mensaje de Pub/Sub en un documento de Elasticsearch.
Parámetros de plantilla:
javascriptTextTransformGcsPath
: el URI de Cloud Storage del archivo JavaScript.javascriptTextTransformFunctionName
: el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una cadena JSON.
- Salida: un documento JSON convertido en cadena para insertarlo en Elasticsearch.
Función de índice
Devuelve el índice al que pertenece el documento.
Parámetros de plantilla:
javaScriptIndexFnGcsPath
: el URI de Cloud Storage del archivo JavaScript.javaScriptIndexFnName
: el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Salida: el valor del campo de metadatos
_index
del documento.
Función de ID de documento
Devuelve el ID del documento.
Parámetros de plantilla:
javaScriptIdFnGcsPath
: el URI de Cloud Storage del archivo JavaScript.javaScriptIdFnName
: el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Salida: el valor del campo de metadatos
_id
del documento.
Función de eliminación de documentos
Especifica si se debe eliminar un documento. Para usar esta función, define el modo de inserción masiva en INDEX
y proporciona una función de ID de documento.
Parámetros de plantilla:
javaScriptIsDeleteFnGcsPath
: el URI de Cloud Storage del archivo JavaScript.javaScriptIsDeleteFnName
: el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Salida: devuelve la cadena
"true"
para eliminar el documento o"false"
para insertarlo o actualizarlo.
Función de tipo de asignación
Devuelve el tipo de asignación del documento.
Parámetros de plantilla:
javaScriptTypeFnGcsPath
: el URI de Cloud Storage del archivo JavaScript.javaScriptTypeFnName
: el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Salida: el valor del campo de metadatos
_type
del documento.
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Pub/Sub to Elasticsearch template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Haz clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasREGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC
: el tema de Pub/Sub para la salida de erroresSUBSCRIPTION_NAME
: nombre de tu suscripción de Pub/SubCONNECTION_URL
: la URL de ElasticsearchDATASET
: tu tipo de registroNAMESPACE
: el espacio de nombres del conjunto de datos.APIKEY
: tu clave de API codificada en Base64 para la autenticación
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex", } }
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasLOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC
: el tema de Pub/Sub para la salida de erroresSUBSCRIPTION_NAME
: nombre de tu suscripción de Pub/SubCONNECTION_URL
: la URL de ElasticsearchDATASET
: tu tipo de registroNAMESPACE
: el espacio de nombres del conjunto de datos.APIKEY
: tu clave de API codificada en Base64 para la autenticación
Siguientes pasos
- Consulta información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas proporcionadas por Google.