Implementa exportaciones de registros listas para la producción en Splunk con Dataflow

En este instructivo, crearás un mecanismo de exportación de registros escalable y tolerante a errores con Cloud Logging, Pub/Sub y Dataflow.

Este instructivo está dirigido a los administradores que desean transmitir sus registros y eventos de recursos en Google Cloud en Sphere Enterprise o Splunk Cloud para casos de uso de seguridad u operaciones de TI. En este instructivo, se usa la plantilla de Splunk Dataflow proporcionada por Google para transmitir registros al colector de eventos HTTP (HEC) de Splunk de manera confiable y a gran escala. También se analiza la planificación de la capacidad de canalización de Dataflow y cómo manejar las posibles fallas de entrega cuando hay problemas transitorios de las redes o de los servidores.

En el instructivo, se da por sentado que una jerarquía de recursos de organización es similar al siguiente diagrama, que muestra un receptor agregado a nivel de la organización para exportar registros a Splunk. Crea la canalización de exportación de registros en un proyecto de ejemplo llamado Splunk Export Project, en el que los registros de todos los proyectos de Google Cloud en el nodo de la organización se recopilan, procesan y entregan de forma segura.

Receptor agregado de la organización para la exportación de registros a Splunk

Arquitectura

En el siguiente diagrama de arquitectura, se muestra el proceso de exportación de registros que compilas en este instructivo:

Exportación de registros a Splunk

  • Al comienzo del proceso, un receptor de registros a nivel de la organización enruta los registros a un solo tema y suscripción de Pub/Sub.
  • En el centro del proceso, la canalización principal de Dataflow es una canalización de transmisión de Pub/Sub a Splunk que extrae registros de la suscripción a Pub/Sub y los envía a Splunk.
  • De forma paralela a la canalización principal de Dataflow, la segunda canalización de Dataflow es una canalización de transmisión de Pub/Sub a Pub/Sub para volver a reproducir mensajes si falla una entrega.
  • Al final del proceso, el destino del registro es el extremo de HEC de Splunk Enterprise o Splunk Cloud.

Objetivos

  • Crear un receptor de registros agregado en un proyecto dedicado.
  • Planificar la capacidad de canalización de Splunk Dataflow para que coincida con la tasa de registros de tu organización.
  • Implementar la canalización de Splunk Dataflow para exportar los registros a Splunk.
  • Transformar registros o eventos en tránsito con funciones definidas por el usuario (UDF) dentro de la canalización de Splunk Dataflow.
  • Manejar fallas de entrega para evitar la pérdida de datos debido a posibles problemas de red temporales o configuraciones incorrectas.

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud sean aptos para obtener una prueba gratuita.

Cuando finalices este instructivo, podrás borrar los recursos creados para evitar que se te siga facturando. Para obtener más información, consulta cómo hacer una limpieza.

Antes de comenzar

  1. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  2. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Descubre cómo confirmar que tienes habilitada la facturación en un proyecto.

  3. Habilita las API de Cloud Monitoring API, Cloud Key Management Service, Compute Engine, Pub/Sub, and Dataflow.

    Habilita las API

Obtén permisos de IAM

  1. En Cloud Console, comprueba si tienes los siguientes permisos de Identity and Access Management (IAM) en la organización y los recursos del proyecto. Para obtener más información, consulta Otorga, cambia y revoca el acceso a los recursos.
    Permisos Funciones predefinidas Recurso
    • logging.sinks.create
    • logging.sinks.get
    • logging.sinks.update
    • Escritor de configuración de registros (roles/logging.configWriter)
    organización
    • cloudkms.keyRings.create
    • cloudkms.cryptoKeys.*
    • Administrador de Cloud KMS (roles/cloudkms.admin)
    proyecto
    • compute.networks.*
    • compute.routers.*
    • compute.firewalls.*
    • networkservices.*
    • Administrador de red de Compute (roles/compute.networkAdmin)
    • Administrador de seguridad de Compute (roles/compute.securityAdmin)
    proyecto
  2. Si no tienes los permisos de IAM correctos, crea una función personalizada. Una función personalizada te dará el acceso que necesitas y te permitirá seguir el principio de menor privilegio.

Configura tu entorno

  1. En Cloud Console, activa Cloud Shell.

    Activar Cloud Shell

  2. En Cloud Shell, crea variables para los ID de tu proyecto y de la organización. Utilizarás estas variables durante todo el instructivo.

    export PROJECT_ID=project-id
    export ORG_ID=organization-id
    
    • project-id: el ID de tu proyecto
    • organization-id: El ID de tu organización.
  3. En este instructivo, crearás recursos en la región us-central1:

    export REGION_ID=us-central1
    
  4. Configura el proyecto para tu sesión activa de Cloud Shell:

    gcloud config set project $PROJECT_ID
    

Configura redes seguras

En este paso, configurarás herramientas de redes seguras antes de procesar y exportar registros a Splunk Enterprise.

  1. Crea una red y subred de VPC:

    gcloud compute networks create export-network --subnet-mode=custom
    gcloud compute networks subnets create export-network-us-central \
         --network=export-network \
         --region=$REGION_ID \
         --range=192.168.1.0/24
    
  2. Crea una regla de firewall para que las máquinas virtuales (VM) de trabajador de Dataflow se comuniquen entre sí:

    gcloud compute firewall-rules create allow-internal-dataflow \
         --network=export-network \
         --action=allow \
         --direction=ingress \
         --target-tags=dataflow \
         --source-tags=dataflow \
         --priority=0 \
         --rules=tcp:12345-12346
    

    Esta regla permite el tráfico interno entre las VM de Dataflow que usan los puertos TCP 12345-12346 y tienen la etiqueta dataflow configurada por el servicio de Dataflow.

  3. Crea una puerta de enlace de Cloud NAT:

    gcloud compute routers create nat-router \
           --network=export-network \
           --region=$REGION_ID
    
    gcloud compute routers nats create nat-config \
       --router=nat-router \
       --nat-custom-subnet-ip-ranges=export-network-us-central \
       --auto-allocate-nat-external-ips \
       --region=$REGION_ID
    

    Por motivos de seguridad, implementa las VM de trabajador de canalización de Dataflow sin direcciones IP públicas. Para permitir que las VM de trabajador de Dataflow lleguen al servicio externo de HEC de Splunk, con el siguiente comando, debes configurar una Cloud NAT asignada a la subred de las VM de Dataflow o, en este caso, export-network-us-central. Esta configuración permite que las VM de trabajador de Dataflow accedan a Internet y realicen solicitudes HTTPS a Splunk sin la necesidad de direcciones IP externas en cada VM de trabajador de Dataflow.

    La puerta de enlace de Cloud NAT asigna automáticamente direcciones IP según la cantidad de VM de Dataflow que estén en uso.

    Si deseas restringir el tráfico de HEC de Splunk a un subconjunto de direcciones IP conocidas, puedes reservar direcciones IP estáticas y asignarlas manualmente a la puerta de enlace de Cloud NAT. Sin embargo, esto está fuera del alcance de este instructivo.

    Para obtener más información, consulta la documentación de Direcciones IP de Cloud NAT y Reserva de puerto de Cloud NAT.

  4. Habilite el Acceso privado a Google

     gcloud compute networks subnets update export-network-us-central \
         --enable-private-ip-google-access \
         --region=$REGION_ID
    

    El Acceso privado a Google se habilita de forma automática cuando creas una puerta de enlace de Cloud NAT. Sin embargo, para permitir que los trabajadores de Dataflow con direcciones IP privadas accedan a las direcciones IP externas que usan las API y los servicios de Google Cloud, también debes habilitar de forma manual el Acceso privado a Google para la subred.

Crea un receptor de registros

En esta sección, crearás el receptor de registros en toda la organización y su destino de Pub/Sub, junto con los permisos necesarios.

  1. En Cloud Shell, crea un tema de Pub/Sub y una suscripción asociada como el destino del receptor de registros nuevo:

    gcloud pubsub topics create org-logs-all
    gcloud pubsub subscriptions create \
        --topic org-logs-all org-logs-all-sub
    
  2. Crea el receptor de registros de la organización:

    gcloud logging sinks create org-logs-all-sink \
      pubsub.googleapis.com/projects/$PROJECT_ID/topics/org-logs-all \
      --organization=$ORG_ID \
      --include-children \
      --log-filter='NOT logName:projects/$PROJECT_ID/logs/dataflow.googleapis.com'
    

    El comando consta de las siguientes opciones:

    • La opción --organization especifica que este es un receptor de registros a nivel de la organización.
    • La opción --include-children es obligatoria para garantizar que el receptor de registros a nivel de la organización incluya todos los registros en todas las subcarpetas y proyectos.
    • La opción --log-filter especifica los registros que se deben enrutar. En este ejemplo, excluyes los registros de operaciones de Dataflow específicamente para el proyecto $PROJECT_ID, ya que la canalización de exportación de registros de Dataflow genera más registros a medida que procesa los registros. El filtro evita que la canalización exporte sus propios registros, lo que evita un ciclo exponencial potencial.

      El resultado incluye una cuenta de servicio en el formato o#####-####@gcp-sa-logging.iam.gserviceaccount.com.

  3. Guarda la cuenta de servicio en $LOG_SINK_SA como la siguiente variable:

     export LOG_SINK_SA=[MY_SA]@gcp-sa-logging.iam.gserviceaccount.com
    
  4. Otorga permisos a la cuenta de servicio del receptor de registros:

    gcloud pubsub topics add-iam-policy-binding org-logs-all \
        --member=serviceAccount:$LOG_SINK_SA \
        --role=roles/pubsub.publisher
    

    El comando otorga la función de IAM de publicador de Pub/Sub a la cuenta de servicio del receptor de registros en el tema de Pub/Sub org-logs-all, lo que permite que la cuenta de servicio del receptor de registros publique mensajes en el tema.

Configura un extremo de HEC de Splunk

En este paso, configurarás un extremo de HEC de Splunk y encriptarás el token de HEC recién creado.

Configura HEC de Splunk

  1. Si todavía no tienes un extremo de HEC de Splunk, consulta la documentación de Splunk para obtener información sobre cómo configurar el HEC de Splunk. El HEC de Splunk se puede ejecutar en el servicio de Splunk Cloud o en tu propia instancia de Splunk Enterprise.
  2. En tu sesión de Cloud Shell, después de crear un token de HEC de Splunk, copia el valor del token.
  3. Guarda el valor del token en un archivo llamado splunk-hec-token-plaintext.

Crea una clave de Cloud KMS para la encriptación

La URL y el token del HEC de Splunk son parámetros necesarios para la canalización de Splunk Dataflow que implementas. Para una seguridad adicional, encripta el token con una clave de Cloud KMS y solo aprueba el token encriptado cuando crees el trabajo de Dataflow. Esto evita que se filtre el token del HEC de Splunk desde la consola de Dataflow o los detalles del trabajo.

  1. En Cloud Shell, crea un llavero de claves de Cloud KMS con el nombre :

    # Create a key ring in same location
    gcloud kms keyrings create export-keys \
      --location=$REGION_ID
    
  2. Crea una clave de Cloud KMS en el nuevo llavero de claves:

    # Create a key on the new key ring
    gcloud kms keys create hec-token-key \
        --keyring=export-keys \
        --location=$REGION_ID \
        --purpose="encryption"
    

Agrega un permiso para encriptar y desencriptar el token HEC de Splunk

Antes de encriptar el token del HEC de Splunk, debes tener las funciones de IAM de encriptador y desencriptador para usar la clave. También necesitas las funciones de IAM de encriptador y desencriptador para la cuenta de servicio del controlador de Dataflow, ya que los trabajadores de la canalización de Dataflow deben desencriptar el parámetro de token de Splunk de forma local.

Los trabajadores de la canalización de Dataflow son instancias de Compute Engine y, de forma predeterminada, usan la cuenta de servicio de Compute Engine de tu proyecto: project-number-compute@developer.gserviceaccount.com.

La cuenta de servicio de Compute Engine se crea automáticamente cuando habilitas la API de Compute Engine para tu proyecto. La cuenta de servicio de Compute Engine actúa como la cuenta de servicio del controlador de Dataflow que usa Dataflow para acceder a los recursos y ejecutar operaciones.

  1. En Cloud Console, ve a la página Seguridad.

    Ir a la página de seguridad

  2. Selecciona la pestaña Claves criptográficas.

  3. Selecciona la casilla de verificación que se encuentra junto al llavero de claves que creaste.

  4. Si el panel para editar los permisos aún no está abierto, haz clic en Mostrar panel de información.

  5. En el panel de información, en la pestaña Permisos, haz clic en Agregar miembro.

  6. Agrega tu cuenta de proyecto y project-number-compute@developer.gserviceaccount.com como miembros.

  7. Selecciona la función Encriptador/Desencriptador de CryptoKey de Cloud KMS.

  8. Haz clic en Guardar.

Agrega la función de encriptador y desencriptador.

Encripta el token de HEC de Splunk

En Cloud Shell, encripta el token de HEC de Splunk mediante la clave hec-token-key de Cloud KMS que creaste cuando configuraste el extremo del HEC de Splunk:

    gcloud kms encrypt \
        --key=hec-token-key \
        --keyring=export-keys \
        --location=$REGION_ID \
        --plaintext-file=./splunk-hec-token-plaintext \
        --ciphertext-file=./splunk-hec-token-encrypted

Este comando crea un archivo nuevo con el token del HEC de Splunk encriptado llamado splunk-hec-token-encrypted. Ahora puedes borrar el archivo temporal splunk-hec-token-plaintext.

Planifica la capacidad de canalización de Dataflow

Antes de implementar la canalización de Dataflow, debes determinar su tamaño y capacidad de procesamiento máximos. Determinar estos valores garantiza que la canalización pueda controlar el volumen máximo de registros diarios (GB por día) y la tasa de mensajes de registro (eventos por segundo, o EPS) de la suscripción ascendente de Pub/Sub sin causar ninguna de estas opciones:

  • Demoras debido a la acumulación de mensajes o a la regulación de mensajes.
  • Costos adicionales por aprovisionamiento excesivo de una canalización (para obtener más detalles, consulta la nota al final de esta sección).

Los valores de ejemplo de este instructivo se basan en una organización con las siguientes características:

  • Genera 1 TB de registros por día.
  • Tiene un tamaño promedio de mensajes de 1 KB.
  • Tiene una tasa de mensajes máxima y constante dos veces la tasa promedio.

Puedes reemplazar los valores de ejemplo con valores de tu organización a medida que avanzas con los pasos en Establece el tamaño máximo de canalización y Establece parámetros de control de velocidad.

Establece el tamaño máximo de canalización

  1. Determina el EPS promedio con la siguiente fórmula:

    \( {AverageEventsPerSecond}\simeq\frac{TotalDailyLogsInTB}{AverageMessageSizeInKB}\times\frac{10^9}{24\times3600} \)

    En este ejemplo, la tasa promedio de registros generados es de 11.5k EPS.

  2. Determina el EPS continuo con la siguiente fórmula, en la que el multiplicador N representa la naturaleza inestable del registro. En este ejemplo, N=2, por lo que la tasa máxima de registros generados es de 23,000 EPS.

    \( {PeakEventsPerSecond = N \times\ AverageEventsPerSecond} \)

  3. Después de calcular el EPS máximo, puedes usar los siguientes lineamientos de tamaño para determinar la cantidad máxima de CPU virtuales requeridas. También puedes usar este número para calcular la cantidad máxima de trabajadores de Dataflow, o maxNumWorkers, suponiendo que el tipo de máquina es n1-standard-4.

    \( {maxCPUs = ⌈PeakEventsPerSecond / 3k ⌉\\ maxNumWorkers = ⌈maxCPUs / 4 ⌉} \)

    En este ejemplo, necesitas un máximo de ⌈23 / 3⌉ = 8 núcleos de CPU virtual, que es un máximo de 2 trabajadores de VM del tipo de máquina predeterminado n1-standard-4.

  4. En Cloud Shell, configura el tamaño de la canalización con las siguientes variables de entorno:

    export DATAFLOW_MACHINE_TYPE="n1-standard-4"
    export DATAFLOW_MACHINE_COUNT=2
    

Establece parámetros de control de velocidad

La canalización de Splunk Dataflow tiene parámetros de control de velocidad. Estos parámetros ajustan su tasa de EPS de salida y evitan que el extremo descendente del HEC de Splunk se sobrecargue.

  1. Para maximizar la tasa de EPS, determina la cantidad total de conexiones paralelas al HEC de Splunk en todos los trabajadores de VM mediante el siguiente lineamiento:

    \( {parallelism = maxCPUs * 2} \)

    Anula la configuración parallelism para dar cuenta de las conexiones paralelas de 2 a 4 por CPU virtual, con la cantidad máxima de trabajadores implementados. El valor predeterminado parallelism de 1 inhabilita el paralelismo, lo que limita de forma artificial la tasa de salida.

    En este ejemplo, la cantidad de conexiones paralelas se calcula para que sea 2 x 8 = 16.

  2. Para aumentar los EPS y reducir la carga en el HEC de Splunk, usa la agrupación de eventos:

    \( {batchCount >= 10} \)

    Con un mensaje de registro promedio de alrededor de 1 KB, te recomendamos que agrupes al menos 10 eventos por solicitud. Establecer esta cantidad mínima de eventos ayuda a evitar una carga excesiva en el HEC de Splunk y, al mismo tiempo, aumentar la tasa efectiva de EPS.

  3. En Cloud Shell, configura las siguientes variables de entorno para los controles de frecuencia mediante los valores calculados de parallelism y batchCount:

    export DATAFLOW_PARALLELISM=16
    export DATAFLOW_BATCH_COUNT=10
    

Resumen de los parámetros de capacidad de canalización

En la siguiente tabla, se muestran los valores de capacidad de la canalización usados para los siguientes pasos de este instructivo, junto con las prácticas recomendadas para configurar estos parámetros de canalización.

Parámetro Valor del instructivo Prácticas recomendadas generales
DATAFLOW_MACHINE_TYPE n1-standard-4 Configurarlo como tamaño de máquina de referencia n1-standard-4 para obtener el mejor rendimiento en relación con el costo
DATAFLOW_MACHINE_COUNT

2

Configurado como la cantidad de trabajadores que maxNumWorkers necesita para manejar el EPS estimado previsto como se calcula más arriba
DATAFLOW_PARALLELISM

16

Configúralo en 2 × CPU virtuales/trabajador × maxNumWorkers para maximizar la cantidad de conexiones de HEC paralelas
DATAFLOW_BATCH_COUNT

10

Configurarlo como 10 a 50 eventos o solicitudes para registros, siempre que la demora máxima del almacenamiento en búfer (dos segundos) sea aceptable

Una canalización de ajuste de escala automático implementa un disco persistente de datos (de forma predeterminada, 400 GB) para cada trabajador de transmisión potencial, suponiendo que el número máximo de trabajadores es maxNumWorkers. Estos discos se activan entre los trabajadores en ejecución en cualquier momento, incluido el inicio.

Debido a que cada instancia de trabajador tiene un límite de 15 discos persistentes, la cantidad mínima de trabajadores iniciales es de ⌈maxNumWorkers/15⌉. Por lo tanto, si el valor predeterminado es maxNumWorkers=20, el uso (y el costo) de la canalización es el siguiente:

  • Almacenamiento: estático con 20 discos persistentes.
  • Procesamiento: dinámico con un mínimo de dos instancias de trabajador (⌈20/15⌉ = 2) y un máximo de 20.

Este valor es equivalente a 8 TB de disco persistente, lo que puede generar costos innecesarios si los discos no se usan por completo, en especial si uno o dos trabajadores se ejecutan la mayoría del tiempo.

Exporta registros mediante la canalización de Dataflow

En esta sección, implementarás la canalización de Dataflow que entrega mensajes de registro de Google Cloud al HEC de Splunk. También implementarás recursos dependientes, como temas no procesados (también conocidos como temas de mensajes no entregados) y suscripciones para contener cualquier mensaje no entregado.

Implementa la canalización de Dataflow

  1. En Cloud Shell, crea un tema y una suscripción de Pub/Sub para usarlos como suscripción no procesada:

     gcloud pubsub topics create org-logs-all-dl
     gcloud pubsub subscriptions create --topic org-logs-all-dl org-logs-all-dl-sub
    
  2. Establece las siguientes variables de entorno para configurar los parámetros de la plantilla:

    # Splunk HEC endpoint values
    export SPLUNK_HEC_URL=YOUR_SPLUNK_HEC_URL
    export SPLUNK_HEC_TOKEN=`cat ./splunk-hec-token-encrypted | base64`
    # Dataflow pipeline input subscription and dead-letter topic
    export DATAFLOW_INPUT_SUB="org-logs-all-sub"
    export DATAFLOW_DEADLETTER_TOPIC="org-logs-all-dl"
    

    Reemplaza YOUR_SPLUNK_HEC_URL por la URL de HEC de Splunk con el formulario protocol://host[:port], en el que se muestra lo siguiente:

    • protocol es http o https.
    • host es el nombre de dominio completamente calificado (FQDN) o la dirección IP de tu instancia de HEC de Splunk o, si tienes varias instancias de HEC, el balanceador de cargas HTTP(S) asociado (o basado en DNS).
    • port es el número de puerto HEC. Es opcional y depende de la configuración del extremo del HEC de Splunk.

    YOUR_SPLUNK_HEC_URL no debe incluir la ruta de acceso del extremo HEC, por ejemplo, /services/collector. Por el momento, la plantilla de Dataflow de Splunk solo admite el extremo /services/collector para eventos con formato JSON y agrega de forma automática esa ruta a la entrada de URL de HEC de Splunk. Para obtener más información sobre ese extremo del HEC, consulta la documentación de Splunk para services/collector endpoint.

    Un ejemplo de una entrada de URL de HEC válida de Splunk es https://splunk-hec.example.com:8088. Si envías datos a HEC en Splunk Cloud, consulta Enviar datos a HEC en Splunk Cloud para determinar las partes anteriores host y port de tu URL específica de HEC de Splunk.

  3. Implementa la canalización de Dataflow:

    # Set Dataflow pipeline job name
    JOB_NAME=pubsub-to-splunk-`date +"%Y%m%d-%H%M%S"`
    # Run Dataflow pipeline job
    gcloud beta dataflow jobs run ${JOB_NAME} \
       --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
       --worker-machine-type=$DATAFLOW_MACHINE_TYPE \
       --max-workers=$DATAFLOW_MACHINE_COUNT \
       --region=$REGION_ID \
       --network=export-network \
       --subnetwork=regions/$REGION_ID/subnetworks/export-network-us-central \
       --disable-public-ips \
       --parameters \
    inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_INPUT_SUB},\
    outputDeadletterTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_DEADLETTER_TOPIC},\
    tokenKMSEncryptionKey=projects/${PROJECT_ID}/locations/${REGION_ID}/keyRings/export-keys/cryptoKeys/hec-token-key,\
    url=${SPLUNK_HEC_URL},\
    token=${SPLUNK_HEC_TOKEN},\
    batchCount=${DATAFLOW_BATCH_COUNT},\
    parallelism=${DATAFLOW_PARALLELISM},\
    javascriptTextTransformGcsPath=gs://splk-public/js/dataflow_udf_messages_replay.js,\
    javascriptTextTransformFunctionName=process
    

    Copia el ID de trabajo nuevo que se muestra en el resultado.

    De forma predeterminada, la canalización de Splunk Dataflow valida el certificado SSL para el extremo del HEC de Splunk. Si quieres usar certificados autofirmados para el desarrollo y las pruebas, debes inhabilitar la validación de SSL. Para obtener más información, consulta los parámetros de la plantilla de Pub/Sub a Splunk Dataflow (disableCertificateValidation).

  4. Guarda el ID de trabajo nuevo en la variable de entorno DATAFLOW_JOB_ID. Usarás esta variable en un paso posterior.

    export DATAFLOW_JOB_ID="YOUR_DATAFLOW_JOB_ID"
    

Visualiza registros en Splunk

Los trabajadores de la canalización de Dataflow tardan unos minutos en aprovisionarse y estar listos para entregar los registros al HEC de Splunk. Puedes confirmar que los registros se reciben y se indexan correctamente en la interfaz de búsqueda de Splunk Enterprise o Splunk Cloud.

Los trabajadores de la canalización de Dataflow deberían tardar solo unos minutos en aprovisionarse y estar listos para entregar los registros al HEC de Splunk. Puedes confirmar que los registros se reciben y se indexan correctamente en la interfaz de búsqueda de Splunk Enterprise o Splunk Cloud. Para ver la cantidad de registros por tipo de recurso supervisado, sigue estos pasos:

  1. En Splunk, abre Informes y búsqueda de Splunk.
  2. Ejecuta el index=[MY_INDEX] | stats count by resource.type de búsqueda donde se configura el índice MY_INDEX para tu token del HEC de Splunk.

    Visualiza registros en Splunk.

  3. Si no ves ningún evento, consulta Controla los errores en la entrega.

Transforma eventos en tránsito con UDF

La plantilla de Splunk Dataflow es compatible con UDF para la transformación de eventos personalizados. La canalización que implementaste usa una UDF de muestra, especificada por los parámetros opcionales javascriptTextTransformGcsPath y javascriptTextTransformFunctionName. La UDF de muestra incluye ejemplos de código para el enriquecimiento de eventos, que incluye agregar campos nuevos o configurar metadatos de HEC de Splunk según el evento. La UDF de muestra también incluye lógica de decodificación para volver a reproducir entregas con errores, que aprendes a hacer en Modifica la UDF de muestra.

En esta sección, editarás la función UDF de muestra para agregar un nuevo campo de evento. En este campo nuevo, se especifica el valor de la suscripción de Pub/Sub de origen como información contextual adicional.

Modifica la UDF de muestra

  1. En Cloud Shell, descarga el archivo JavaScript que contiene la función UDF de muestra:

    wget https://storage.googleapis.com/splk-public/js/dataflow_udf_messages_replay.js
    
  2. Abre el archivo JavaScript en el editor que prefieras. Quita los comentarios de la línea que agrega un campo nuevo inputSubscription a la carga útil del evento:

    // event.inputSubscription = "splunk-dataflow-pipeline";
    
  3. Configura el campo de evento nuevo inputSubscription como "org-logs-all-sub" para realizar un seguimiento de la suscripción de Pub/Sub de entrada de donde proviene el evento:

    event.inputSubscription = "org-logs-all-sub";
    
  4. Guarda el archivo.

  5. En Cloud Shell, crea un bucket nuevo de Cloud Storage:

    # Create a new Cloud Storage bucket
    gsutil mb -b on gs://${PROJECT_ID}-dataflow/
    
  6. Sube el archivo al bucket de Cloud Storage.

    # Upload JavaScript file
    gsutil cp ./dataflow_udf_messages_replay.js gs://${PROJECT_ID}-dataflow/js/
    

Actualiza la canalización de Dataflow con la UDF nueva

  1. En Cloud Shell, detén la canalización con la Opción de desvío para asegurarte de que no se pierdan los registros que ya se extrajeron de Pub/Sub:

    gcloud dataflow jobs drain $DATAFLOW_JOB_ID --region=$REGION_ID
    
  2. Implementa una canalización nueva con la UDF actualizada:

    # Set Dataflow pipeline job name
    JOB_NAME=pubsub-to-splunk-`date +"%Y%m%d-%H%M%S"`
    # Run Dataflow pipeline job
    gcloud beta dataflow jobs run ${JOB_NAME} \
       --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
       --worker-machine-type=$DATAFLOW_MACHINE_TYPE \
       --max-workers=$DATAFLOW_MACHINE_COUNT \
       --region=$REGION_ID \
       --network=export-network \
       --subnetwork=regions/$REGION_ID/subnetworks/export-network-us-central \
       --disable-public-ips \
       --parameters \
    inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_INPUT_SUB},\
    outputDeadletterTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_DEADLETTER_TOPIC},\
    tokenKMSEncryptionKey=projects/${PROJECT_ID}/locations/${REGION_ID}/keyRings/export-keys/cryptoKeys/hec-token-key,\
    url=${SPLUNK_HEC_URL},\
    token=${SPLUNK_HEC_TOKEN},\
    batchCount=${DATAFLOW_BATCH_COUNT},\
    parallelism=${DATAFLOW_PARALLELISM},\
    javascriptTextTransformGcsPath=gs://${PROJECT_ID}-dataflow/js/dataflow_udf_messages_replay.js,\
    javascriptTextTransformFunctionName=process
    

    Copia el ID de trabajo nuevo que se muestra en el resultado.

  3. Guarda el ID de trabajo en la variable de entorno DATAFLOW_JOB_ID.export DATAFLOW_JOB_ID="YOUR_DATAFLOW_JOB_ID"

Controla los errores en la entrega

Los errores en la entrega pueden ocurrir debido a errores en el procesamiento de eventos o la conexión con el HEC de Splunk. En esta sección, ingresarás una falla de entrega para demostrar el flujo de trabajo de manejo de errores. También aprenderás a ver y activar la entrega de los mensajes con errores a Splunk.

Descripción general del control de errores

En el siguiente diagrama, se muestra el flujo de trabajo de manejo de errores en la canalización de Splunk Dataflow:

Exportación de registros a Splunk

  1. La canalización de Pub/Sub a Splunk Dataflow (la canalización principal) reenvía automáticamente mensajes que no se pueden entregar al tema no procesado para la investigación del usuario.
  2. El operador investiga los mensajes con errores en la suscripción no procesada, soluciona los problemas y corrige la causa raíz de la falla de entrega, por ejemplo, corrige la configuración incorrecta del token del HEC.
  3. El operador activa una canalización de Pub/Sub a Pub/Sub Dataflow (la canalización secundaria). Esta canalización (destacada en la sección punteada del diagrama anterior) es una canalización temporal que mueve los mensajes con errores de la suscripción no procesada al tema del receptor de registros original.
  4. La canalización principal vuelve a procesar los mensajes con errores anteriores. Este paso requiere que la canalización use la UDF de muestra para la detección y decodificación correctas de las cargas útiles de mensajes con errores. En la siguiente parte de la función, se implementa esta lógica de decodificación condicional, incluido un recuento de intentos de entrega con fines de seguimiento:

    // If message has already been converted to Splunk HEC object  with stringified
     // obj.event JSON payload, then it's a replay of a previously failed delivery:
     // Unnest and parse obj.event. Drop previously injected obj.attributes
     // such as errorMessage and timestamp
     if (obj.event) {
       try {
         event = JSON.parse(obj.event);
         redelivery = true;
       } catch(e) {
         event = obj;
       }
     } else {
       event = obj;
     }
    
     // Keep a tally of delivery attempts
     event.delivery_attempt = event.delivery_attempt || 1;
     if (redelivery) {
       event.delivery_attempt += 1;
     }
    

Activa errores en la entrega

En esta sección, activarás fallas de entrega. Puedes ingresar una falla de entrega de forma manual con cualquiera de los siguientes métodos:

  • Detén el servidor de Splunk (si es una sola instancia) que provoca errores de conexión.
  • Inhabilita el token de HEC correspondiente de tu configuración de entrada de Splunk.

Solución de problemas de los mensajes con errores

Para investigar un mensaje con errores, puedes usar Cloud Console:

  1. En Cloud Console, abre la página Suscripciones a Pub/Sub.

    Ir a Suscripciones de Pub/Sub

  2. Haz clic en la suscripción no procesada que creaste. Si usaste el ejemplo anterior, el nombre de la suscripción es: projects/${PROJECT_ID}/subscriptions/org-logs-all-dl-sub.

  3. Para abrir el visualizador de mensajes, haz clic en Ver mensajes.

  4. Para ver los mensajes, haz clic en Extraer y asegúrate de dejar en blanco la opción Habilitar mensajes de confirmación.

  5. Ahora puedes inspeccionar los mensajes con errores, en particular:

    • La carga útil del evento de Splunk en la columna Message body.
    • El mensaje de error en la columna attribute.errorMessage.
    • La marca de tiempo del error en la columna attribute.timestamp.

La siguiente captura de pantalla es un ejemplo de un mensaje de error que encuentras si el extremo del HEC de Splunk está fuera de servicio temporalmente o no se puede acceder a él. Observa el atributo errorMessage: The target server failed to respond.

Atributos de mensajes con errores.

Tipos de errores de entrega

En la siguiente tabla, se enumeran algunos posibles errores de entrega de Splunk, junto con el atributo errorMessage que la canalización registra con cada mensaje antes de reenviar estos mensajes al tema no procesado:

Tipo de error de entrega ¿La canalización se reintenta de forma automática? Atributo de ejemplo errorMessage
Error de red transitorio Read timed out

o

Connection reset

Error 5xx del servidor de Splunk Splunk write status code: 503
Error 4xx del servidor de Splunk No Splunk write status code: 403
El servidor de Splunk está fuera de servicio No The target server failed to respond
El certificado SSL de Splunk no es válido No Host name X does not match the certificate
Error de sintaxis de JavaScript en UDF No ReferenceError: foo is not defined

En algunos casos, la canalización intenta automáticamente realizar reintentos con una retirada exponencial. Algunos ejemplos incluyen errores 5xx de servidor de Splunk, que ocurren si el extremo del HEC de Splunk está sobrecargado. De manera alternativa, podría haber un problema persistente que impida que se envíe un mensaje al HEC. En este caso, la canalización no intenta realizar un reintento. A continuación, se muestran ejemplos de problemas persistentes:

  • Un error de sintaxis en la función UDF.
  • Un token de HEC no válido que provoca la respuesta 4xx "Prohibido" en el servidor de Splunk.

Vuelve a reproducir mensajes con errores

En esta sección, volverás a reproducir los mensajes no procesados, suponiendo que la causa raíz de la falla de entrega ya se corrigió. Si inhabilitaste el extremo del HEC de Splunk en la sección Activa errores en la entrega, verifica que el extremo del HEC de Splunk ahora funcione.

  1. En Cloud Shell, antes de volver a procesar los mensajes de la suscripción no procesada, te recomendamos que tomes una instantánea de la suscripción no procesada. Esto evita la pérdida de mensajes si hay un error de configuración inesperado.

     gcloud pubsub snapshots create dlt-snapshot-`date +"%Y%m%d-%H%M%S"` \
         --subscription=org-logs-all-dl-sub
    
  2. Usa la plantilla de Pub/Sub a Pub/Sub Dataflow para transferir los mensajes de la suscripción no procesada al tema de entrada con otro trabajo de Dataflow:

      DATAFLOW_INPUT_TOPIC="org-logs-all"
      DATAFLOW_DEADLETTER_SUB="org-logs-all-dl-sub"
    
      JOB_NAME=splunk-dataflow-replay-`date +"%Y%m%d-%H%M%S"`
      gcloud dataflow jobs run $JOB_NAME \
           --gcs-location= gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
           --worker-machine-type=n1-standard-2 \
           --max-workers=1 \
           --region=$REGION_ID \
           --parameters \
      inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_DEADLETTER_SUB},\
      outputTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_INPUT_TOPIC}
    

    Copia el ID de trabajo de Dataflow que muestra este comando.

  3. Guarda el ID de trabajo de Dataflow en la variable de entorno DATAFLOW_JOB_ID.

  4. En Cloud Console, ve a la página Temas de Pub/Sub.

    Ve a la página de suscripciones de Pub/Sub.

  5. Selecciona la suscripción no procesada. Confirma que el recuento de mensajes sin confirmación sea 0.

    Mensajes con errores.

  6. En Cloud Shell, desvía el trabajo de Dataflow que creaste:

    gcloud dataflow jobs drain $DATAFLOW_JOB_ID --region=$REGION_ID
    

    Cuando los mensajes se transfieren de vuelta al tema de entrada original, la canalización principal de Dataflow recoge de forma automática los mensajes con errores y los vuelve a entregar a Splunk.

Confirma mensajes en Splunk

  1. Para confirmar que los mensajes se volvieron a entregar, en Splunk abre Informes y búsqueda de Splunk

  2. Realiza una búsqueda de delivery_attempts > 1. Este es un campo especial que la UDF de muestra agrega a cada evento para realizar un seguimiento de la cantidad de intentos de entrega. Asegúrate de expandir el intervalo de tiempo de búsqueda para incluir eventos que pueden haber ocurrido antes, ya que la marca de tiempo del evento es la hora original de creación, no la hora de la indexación.

En la imagen de ejemplo siguiente, los dos mensajes que originalmente fallaron se entregaron y se indexaron de forma correcta en Splunk con la marca de tiempo adecuada de unos días atrás. Ten en cuenta que el valor del campo insertId es el mismo que el valor encontrado cuando se inspeccionan los mensajes con errores mediante la extracción manual de la suscripción no procesada. insertId es un identificador único para la entrada de registro original que asigna Cloud Logging.

Mensajes con errores en Splunk

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el receptor a nivel de organización

gcloud logging sinks delete org-logs-all-sink --organization=$ORG_ID

Borra el proyecto

Con el receptor de registros borrado, puedes continuar con la eliminación de recursos creados para recibir y exportar registros. La manera más fácil es borrar el proyecto que creaste para el instructivo.

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

¿Qué sigue?