Procesar registros a gran escala con Cloud Dataflow

Google Cloud Platform proporciona la infraestructura escalable que necesitas para administrar operaciones grandes y diversas de análisis de registro. En esta solución, aprenderás a usar Cloud Platform para compilar canalizaciones de análisis que procesan entradas de registro provenientes de varias fuentes. Combinarás los datos de registro de manera que te ayuden a extraer información significativa y mantener estadísticas derivadas de los datos, las cuales se pueden usar para realizar el análisis, la revisión y el informe.

Descripción general

A medida que tu aplicación se vuelve más compleja, las estadísticas obtenidas a partir de los datos capturados en tus registros se vuelven más desafiantes. Los registros provienen de una mayor cantidad de fuentes, por lo que pueden ser difíciles de compilar y consultar para obtener información útil. La compilación, la operación y el mantenimiento de tu propia infraestructura para analizar los datos de registro a gran escala pueden requerir una amplia experiencia en la ejecución de sistemas y almacenamiento distribuidos. Este tipo de infraestructura dedicada a menudo representa un gasto de capital único, lo que da como resultado una capacidad fija. Esto dificulta su escala más allá de la inversión inicial. Estas limitaciones pueden afectar tu negocio porque conducen a demoras en la generación de información significativa y estadísticas prácticas a partir de tus datos.

Esta solución te muestra cómo superar estas limitaciones mediante el uso de Cloud Platform. En esta solución, un conjunto de microservicios de muestra se ejecuta en Google Kubernetes Engine para implementar un sitio web. Stackdriver Logging recopila registros de estos servicios y los guarda en los depósitos de Google Cloud Storage. Luego, Google Cloud Dataflow procesa los registros mediante la extracción de metadatos y el cálculo de agregaciones básicas. La canalización de Cloud Dataflow está diseñada para procesar los elementos de registro diariamente a fin de generar métricas agregadas de los tiempos de respuesta del servidor, en función de los registros de cada día. Finalmente, el resultado de Cloud Dataflow se carga en las tablas de Google BigQuery, en las que se puede analizar este resultado para proporcionar inteligencia empresarial. Esta solución también explica cómo puedes cambiar la canalización a fin de que se ejecute en modo de transmisión, para el procesamiento de registros asíncronos de baja latencia.

La solución usa varios componentes de Cloud Platform

El instructivo incluido proporciona una canalización de Cloud Dataflow de muestra, una aplicación web de muestra, la información sobre la configuración y los pasos para ejecutar la muestra.

Acerca de la aplicación

La implementación de muestra presenta una aplicación de compras. En esta muestra, los usuarios pueden visitar la página principal de un sitio de venta minorista, explorar productos individuales y, luego, intentar localizar los productos en tiendas físicas cercanas. La aplicación consta de tres microservicios: HomeService, BrowseService y LocateService. Cada servicio está disponible en un extremo de API en un espacio de nombres compartidos. Los usuarios acceden a los servicios agregando /home, /browse y /locate a la URL base.

La aplicación está configurada para registrar las solicitudes HTTP entrantes a stdout.

Cómo usar Kubernetes Engine con Stackdriver Logging

En este ejemplo, los microservicios se ejecutan en un clúster de Kubernetes Engine, que es un grupo de instancias de Google Compute Engine, o nodos, que ejecutan Kubernetes. Según la configuración predeterminada, Kubernetes Engine configura cada nodo para ofrecer una variedad de servicios, incluida la supervisión, la verificación de estado y el registro centralizado. Esta solución usa esta compatibilidad integrada con Stackdriver Logging para enviar los registros de cada microservicio a Cloud Storage. Como alternativa para las aplicaciones que registran información en archivos, que no se incluyen en esta solución, puedes configurar el registro a nivel de clúster con Kubernetes.

Cada microservicio se ejecuta en un pod individual en el clúster. Cada pod se ejecuta en un nodo y está expuesto a un extremo HTTP único mediante el uso de los servicios de Kubernetes Engine.

Los microservicios se ejecutan en nodos individuales

Cada nodo del clúster ejecuta un agente de Stackdriver Logging que captura los mensajes de registro. Después de que los registros estén disponibles en Stackdriver Logging, una secuencia de comandos automáticamente exporta los registros a un depósito de Cloud Storage, con la asistencia de Stackdriver Logging disponible en el SDK de Cloud. En logging.sh, la muestra ejecuta comandos similares a los siguientes ejemplos:

# Create a Cloud Storage Bucket
gsutil -q mb gs://BUCKET_NAME

# Allow Stackdriver Logging access to the bucket
gsutil -q acl ch -g cloud-logs@google.com:O gs://BUCKET_NAME

# For each microservice, set up Stackdriver Logging exports
gcloud beta logging sinks create SINK_NAME \
  storage.googleapis.com/BUCKET_NAME \
  --log=”kubernetes.home_service…” --project=PROJECT_ID

Ten en cuenta que también puedes configurar los registros para que se exporten a Cloud Storage, mediante el uso del Visor de registros. Esta solución usa el SDK porque es obligatorio al momento de exportar varios registros.

Con el uso de Cloud Storage como un destino de las exportaciones de registros, las entradas de registro de tipo LogEntry se guardan en lotes por hora en archivos JSON individuales. Estas entradas estructuradas de Cloud Logging incluyen metadatos adicionales que especifican cuándo se creó cada mensaje de registro, qué recurso o instancia lo generó, cuál es su nivel de gravedad, etcétera. En el siguiente ejemplo de una entrada de Stackdriver Logging, en el elemento structPayload.log puedes ver el mensaje de registro original que generó el microservicio:

{
  "metadata": {
    "projectId": "...",
    "serviceName": "compute.googleapis.com",
    "zone": "us-central1-f",
    "labels": {
      "compute.googleapis.com/resource_id": "4154944251817867710",
      "compute.googleapis.com/resource_type": "instance"
    },
    "timestamp": "2015-09-22T20:01:13Z"
  },
  "insertId": "2015-09-22|13:01:17.636360-07|10.106.196.103|1124257302",
  "log": "kubernetes.browse-service-iaus6_default_sample-browse-service",
  "structPayload": {
    "stream": "stdout",
    "log": "2015/09/22 - 20:01:13 | 404 | 176ns | 10.160.0.1:34790 | GET /browse/46"
  }
}

Cómo crear la canalización de Cloud Dataflow

Cloud Dataflow es un sistema simple, pero potente que se puede usar para muchos tipos de tareas de procesamiento de datos. El SDK de Dataflow ofrece un modelo de datos unificado que puede representar un conjunto de datos de cualquier tamaño, incluido un conjunto de datos ilimitado o infinito de una fuente de datos que se actualiza de manera continua; es ideal para trabajar con los datos de registro en esta solución. El servicio administrado de Dataflow puede ejecutar trabajos en lotes y de transmisión. Esto significa que puedes usar una base de código única para el procesamiento síncrono o asíncrono de datos basado en eventos y en tiempo real.

El SDK de Dataflow ofrece representaciones simples de datos por medio de una clase de colecciones especializada denominada PCollection. El SDK proporciona transformaciones de datos integradas y personalizadas mediante la clase de PTransform. En Cloud Dataflow, las transformaciones representan la lógica de procesamiento de una canalización. Las transformaciones se pueden usar para una variedad de operaciones de procesamiento, como unir datos, calcular valores matemáticamente, filtrar la salida de datos o convertir datos de un formato a otro. Para obtener más información sobre las canalizaciones, las PCollections, las transformaciones, las fuentes de E/S y los receptores, consulta el Modelo de programación de Dataflow.

El siguiente diagrama muestra las operaciones de canalización para los datos de registro almacenados en Cloud Storage:

La canalización de Cloud Dataflow tiene varios pasos

Aunque el diagrama podría parecer complejo, Cloud Dataflow facilita la compilación y el uso de la canalización. Las siguientes secciones describen las operaciones específicas en cada etapa de la canalización.

Cómo recibir los datos

La canalización comienza con el consumo de entrada de los depósitos de Cloud Storage que contienen los registros de los tres microservicios. Cada colección de registros se convierte en una PCollection de elementos String, en la que cada elemento corresponde a un objeto de LogEntry único. En el siguiente fragmento, homeLogs, browseLogs y locateLogs son de tipo PCollection<String>:

homeLogs = p.apply(TextIO.Read.named("homeLogsTextRead").from(options.getHomeLogSource()));
browseLogs = p.apply(TextIO.Read.named("browseLogsTextRead").from(options.getBrowseLogSource()));
locateLogs = p.apply(TextIO.Read.named("locateLogsTextRead").from(options.getLocateLogSource()));

Para enfrentar los desafíos de un conjunto de datos que se actualiza de manera continua, el SDK de Dataflow utiliza una técnica llamada sistema de ventanas. Este sistema funciona subdividiendo de manera lógica los datos en una PCollection, según las marcas de tiempo de sus elementos individuales. Debido a que el tipo de fuente es TextIO en este caso, todos los objetos se leen inicialmente en una ventana global única, que es el comportamiento predeterminado.

Cómo recopilar los datos en objetos

El próximo paso combina las PCollections de microservicios individuales con una PCollection única mediante el uso de la operación Flatten.

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

Esta operación es útil porque cada PCollection fuente contiene el mismo tipo de datos y usa la misma estrategia global del sistema de ventanas. Aunque las fuentes y la estructura de cada registro son las mismas en esta solución, podrías extender este enfoque a uno en el que la fuente y la estructura sean diferentes.

Con una PCollection única creada, ahora puedes procesar los elementos individuales de String mediante el uso de una transformación personalizada que realiza varios pasos en la entrada de registros. A continuación, se indican los pasos que debes seguir:

Una transformación procesa mensajes de string para crear mensajes de registro.

  • Deserializa la string JSON en un objeto Java LogEntry para Stackdriver Logging.
  • Extrae la marca de tiempo de los metadatos de LogEntry.
  • Extrae los siguientes campos individuales del mensaje de registro mediante el uso de expresiones normales: timestamp, responseTime, httpStatusCode, httpMethod, dirección IP source y extremo destination. Usa estos campos para crear un objeto personalizado LogMessage con marca de tiempo.
  • Envía los objetos LogMessage en una PCollection nueva.

El siguiente código realiza los pasos:

PCollection<LogMessage> allLogMessages = allLogs
  .apply(ParDo.named("allLogsToLogMessage").of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

Cómo agregar los datos por días

Recuerda que el objetivo es procesar los elementos diariamente para generar métricas agregadas basadas en los registros de cada día. Para lograr esta agregación, se requiere una función del sistema de ventanas que subdivida los datos por día, lo cual es posible porque cada LogMessage en la PCollection tiene una marca de tiempo. Después de que Cloud Dataflow divida la PCollection junto con los límites diarios, las operaciones que admiten las PCollections con ventanas respetarán el esquema del sistema de ventanas.

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply(Window.named("allLogMessageToDaily").<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

Con una PCollection con ventanas única, ahora puedes calcular métricas diarias en las tres fuentes de registro de varios días, mediante la ejecución de un solo trabajo de Cloud Dataflow.

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Combine.<String,Double,Double>perKey(new Max.MaxDoubleFn()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

Primero, una transformación toma objetos de LogMessage como entrada y luego produce una PCollection de pares clave-valor que asignan los extremos de destino como claves para los valores de los tiempos de respuesta. Mediante el uso de esa PCollection, puedes calcular dos métricas agregadas: el tiempo de respuesta máximo por destino y el tiempo de respuesta promedio por destino. Debido a que la PCollection aún se divide por día, el resultado de cada cálculo representará los datos de registro de un solo día. Esto significa que tendrás dos PCollections finales: una que contiene el tiempo de respuesta máximo por destino y otra que contiene el tiempo de respuesta promedio por destino por día.

Cálculo de las métricas agregadas diarias

Cómo cargar datos en BigQuery

El último paso de la canalización produce las PCollections resultantes para BigQuery a fin de realizar el análisis posterior y el almacenamiento de datos.

Primero, la canalización transforma la PCollection que contiene los objetos de LogMessage para todas las fuentes de registro en una PCollection de objetos de TableRow de BigQuery. Este paso es necesario a fin de aprovechar la compatibilidad integrada en Cloud Dataflow para usar BigQuery como un receptor de una canalización.

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply(ParDo.named("logMessageToTableRow").of(new LogMessageTableRowFn()));

Las tablas de BigQuery requieren esquemas definidos. Para esta solución, los esquemas se definen en LogAnalyticsPipelineOptions.java mediante el uso de una anotación de valor predeterminado. Por ejemplo, el esquema para la tabla del tiempo de respuesta máximo se define de la siguiente manera:

@Default.String("destination:STRING,aggResponseTime:FLOAT")

Una operación en las PCollections que contienen los valores de tiempo de respuesta agregados las convierte en PCollections de objetos de TableRow, mediante la aplicación de esquemas adecuados y la creación de tablas, si faltan.

logsAsTableRows.apply(BigQueryIO.Write
  .named("allLogsToBigQuery")
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Esta solución siempre agrega datos nuevos a los datos existentes. Esta es una opción adecuada, ya que la canalización se ejecuta periódicamente para analizar nuevos datos de registro. No obstante, es posible truncar los datos de la tabla existente o solo escribir en la tabla si está vacía, si una de estas opciones tiene más sentido en una situación diferente.

Cómo consultar los datos

La consola de BigQuery te permite ejecutar consultas en los datos de salida y conectarte a herramientas de inteligencia empresarial de terceros, como Tableau y QlikView para un análisis adicional.

La consola de BigQuery ejecuta una consulta en los datos de registro

Cómo usar una canalización de transmisión

La muestra incluye compatibilidad para ejecutar la canalización en el lote o el modo de transmisión. Solo se requieren unos pocos pasos para cambiar la canalización por lote a la de transmisión. En primer lugar, la configuración de Stackdriver Logging exporta la información de registro a Cloud Pub/Sub en lugar de Cloud Storage. El próximo paso es cambiar las fuentes de entradas en la canalización de Cloud Dataflow desde Cloud Storage hacia las suscripciones al tema de Cloud Pub/Sub. Necesitas una suscripción por cada fuente de entrada.

La canalización de Cloud Pub/Sub usa suscripciones

Puedes ver los comandos del SDK en uso en logging.sh.

Las PCollections creadas a partir de los datos de entradas de Cloud Pub/Sub usan una ventana global no delimitada. Sin embargo, las entradas individuales ya incluyen marcas de tiempo. Esto significa que no es necesario extraer los datos de las marcas de tiempo del objeto de LogEntry de Stackdriver Logging, solo extraer las marcas de tiempo de registro para crear objetos de LogMessage personalizados.

Con el uso de la canalización de Cloud Pub/Sub, puedes extraer las marcas de tiempo de los registros

El resto de la canalización se mantiene como está, incluidas las operaciones posteriores de transformación flatten, agregación y salida.

Cómo ejecutar la canalización

En el instructivo, se pueden encontrar los pasos para configurar, compilar y también implementar esta canalización de Cloud Dataflow, junto con los pasos necesarios para implementar los microservicios y configurar las exportaciones de Stackdriver Logging. Cuando ejecutas el trabajo de Cloud Dataflow, puedes usar la Google Cloud Platform Console para supervisar el progreso y ver la información sobre cada etapa de la canalización.

La siguiente imagen muestra la interfaz del usuario de la consola mientras ejecuta una canalización de ejemplo:

GCP Console muestra un trabajo de Cloud Dataflow en ejecución

Cómo extender la solución

La canalización y el conjunto de operaciones descritos en esta solución se pueden extender de diversas maneras. La extensión más evidente sería realizar agregaciones adicionales a través de los datos de LogMessage. Por ejemplo, si la sesión o la información del usuario anonimizada se incluyeran en el resultado del registro, podrías crear agregaciones sobre la actividad del usuario. También podrías usar la transformación de ApproximateQuantiles para generar una distribución de los tiempos de respuesta.

Recursos y costo

Este instructivo utiliza varios componentes facturables de Google Cloud Platform, que incluyen los siguientes:

  • Kubernetes Engine para implementar microservicios
  • Stackdriver Logging para recibir y exportar registros
  • Cloud Storage para almacenar registros exportados en modo por lotes
  • Cloud Pub/Sub para trasmitir los registros exportados en modo de transmisión
  • Cloud Dataflow para procesar los datos de registro
  • BigQuery para almacenar la salida de procesamiento y admitir consultas enriquecidas en esa salida

El costo para ejecutar este instructivo variará según el tiempo de ejecución. Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Cloud Platform pueden ser aptos para una prueba gratuita.

Instructivo

Puedes obtener el contenido completo del instructivo, incluidas las instrucciones de configuración y el código fuente, en GitHub en https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow.

Próximos pasos

  • Prueba otras funciones de Google Cloud Platform. Revisa nuestros instructivos.
  • Aprende a usar los productos de Google Cloud Platform para crear soluciones de extremo a extremo.
¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...