Procesa datos de series temporales: instructivo

Aprende a usar la solución Timeseries Streaming para realizar la detección de anomalías en los datos de series temporales.

Descripción general

La aplicación de referencia de Timeseries Streaming proporciona bibliotecas de Java para procesar datos de series temporales de transmisión a fin de rellenar vacíos y calcular métricas de los datos dentro y los períodos y en varios de ellos. Timeseries Streaming también proporciona bibliotecas de Python que ilustran un caso de uso común para los datos procesados, que se usan para obtener predicciones de un modelo de aprendizaje automático (AA).

En este instructivo, se explica cómo procesar datos de muestra generados con las bibliotecas de Java y mostrar los resultados como archivos TF.Record y como mensajes en un tema de Pub/Sub. Luego, se muestra cómo usar el código en las bibliotecas de Python para obtener predicciones de detección de anomalías de un modelo de memoria a corto plazo (LSTM). Obtienes predicciones por lotes sobre los datos en formato TF.Record, y obtienes predicciones en tiempo real sobre los datos en el tema de Pub/Sub.

Este documento forma parte de una serie:

Este documento está dirigido a desarrolladores e ingenieros de datos, y se da por sentado que tienes los siguientes conocimientos:

  • Conocimientos básicos de programación en Java y Python
  • Conocimientos básicos sobre el desarrollo y el uso del modelo de AA

Objetivos

  • Procesar los datos de la serie temporal de muestra y generarlos como archivos TF.Record
  • Procesar los datos de la serie temporal de muestra y generarlos como mensajes en un tema de Pub/Sub
  • Obtener predicciones por lotes a partir del modelo LSTM sobre los datos de TF.Records
  • Obtener predicciones en tiempo real a partir del modelo LSTM sobre los datos publicados en Pub/Sub.

Costos

En este instructivo, se usan componentes facturables de Google Cloud, que incluyen los siguientes:

  • Compute Engine
  • Dataflow
  • Pub/Sub

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Google Cloud pueden ser elegibles para obtener una prueba gratuita.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  4. Habilita las API de Compute Engine, Dataflow, and Pub/Sub.

    Habilita las API

  5. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  6. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  7. Habilita las API de Compute Engine, Dataflow, and Pub/Sub.

    Habilita las API

Crea recursos de Pub/Sub

Crea un tema y una suscripción de Pub/Sub. Más adelante en el instructivo, escribirás datos de series temporales procesados en el tema y, luego, harás referencia a la suscripción para obtener predicciones en tiempo real sobre esos datos.

Cree un tema

  1. Ir a la página de Cloud Pub/Sub
  2. Haz clic en Crear tema.
  3. En ID del tema, escribe time-series.
  4. Haz clic en Guardar.

Cree una suscripción

  1. Ir a la página de suscripciones de Pub/Sub.
  2. Haz clic en Crear suscripción.
  3. En ID de la suscripción, escribe time-series.
  4. Para seleccionar un tema de Cloud Pub/Sub, selecciona projects/<myProject>/topics/time-series.
  5. Desplázate hasta el final de la página y haz clic en Crear.

Cree un bucket de Cloud Storage

Crea un bucket de Cloud Storage Más adelante en el instructivo, usarás este bucket como la ubicación de archivo temporal para una canalización de Dataflow.

  1. Abre el navegador de Cloud Storage.
  2. Haga clic en Crear bucket.
  3. En el nombre del bucket, escribe time-series-myProject. Reemplaza myProject por el ID del proyecto que estás usando para completar este instructivo.
  4. Haga clic en Crear.

Crea la instancia de máquina virtual

Crea una instancia de máquina virtual de Compute Engine en la que puedas instalar la solución Timeseries Streaming.

  1. Ir a la página Instancias de VM
  2. Haga clic en Crear instancia.
  3. En Nombre (Name), escribe time-series.
  4. En Disco de arranque, haz clic en Cambiar.
  5. En Tamaño (GB), escribe 20 y, luego, haz clic en Seleccionar.
  6. En Permiso de acceso en la sección Identidad y acceso a la API, elige Permitir acceso completo a todas las API de Cloud.
  7. En Firewall, selecciona Permitir tráfico HTTP. Si seleccionas esta opción, se agregará una etiqueta de red a la VM, que asociará la regla de firewall con la VM. Luego, Compute Engine crea la regla de firewall de entrada correspondiente que permite todo el tráfico entrante en tcp:80 (HTTP).
  8. Haz clic en el botón Crear para crear e iniciar la instancia.

Instala los paquetes obligatorios

Instala los paquetes que necesita Timeseries Streaming y, luego, clona el repositorio de Timeseries Streaming.

  1. Haz clic en SSH en la columna Conectar para la instancia time-series.
  2. En la ventana de la terminal que se abre para la instancia de VM, actualiza la herramienta de apt-get con el siguiente comando:

    sudo apt-get update
    
  3. Instala los paquetes que necesita Timeseries Streaming:

    sudo apt-get install -y default-jdk gradle git python3-dev python3-pip python3-venv
    
  4. Instala la herramienta de pip:

    python3 -m pip install --upgrade pip
    
  5. Instala el módulo virtualenv:

    pip3 install virtualenv
    
  6. Actualiza la variable $PATH para incluir la ubicación de instalación virtualenv. Reemplaza virtualenv-path con la ruta de instalación para virtualenv, que es /home/<your_home_directory>/.local/bin de forma predeterminada.

    export PATH="virtualenv-path":$PATH
    

Clona el repositorio de GitHub

Clona el repositorio de Timeseries Streaming:

sudo git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git

Copia el modelo de ejemplo en Cloud Storage

Copia los artefactos del modelo de la solución en un bucket de Cloud Storage. Reemplaza myBucket por el bucket de Cloud Storage que creaste.

Usa este modelo para procesar datos de series temporales en la canalización de Dataflow.

  1. Copia la carpeta del modelo:

    gsutil cp -r dataflow-sample-applications/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/saved_model_example/serving_model_dir gs://myBucket/serving_model_dir
    
  2. Copia la carpeta del gráfico de transformación:

    gsutil cp -r dataflow-sample-applications/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/tf_transform_graph_dir gs://myBucket/tf_transform_graph_dir
    

Compila las bibliotecas de Java

Compila las bibliotecas de Java de Timeseries Streaming a fin de que puedas usarlas para procesar datos.

  1. Cambia al directorio de bibliotecas de Java de Timeseries Streaming mediante el siguiente comando:

    cd dataflow-sample-applications/timeseries-streaming/timeseries-java-applications
    
  2. Compila las bibliotecas:

    sudo ./gradlew build
    

    El proceso de compilación tarda aproximadamente 5 minutos en completarse.

Procesa datos de series temporales con las bibliotecas de Java

Sigue los pasos de esta sección para procesar datos de muestra generados automáticamente.

Para usar datos reales en lugar de datos generados, deberás crear un adaptador de datos para convertirlos en un objeto TSDataPoint como se ilustra en la biblioteca SimpleDataStreamGenerator.

Genera el resultado TF.Record

  1. Crea un directorio que contenga los archivos TF.Record generados mediante la ejecución del siguiente comando:

    sudo mkdir /tmp/simple-data
    
  2. Ejecuta los siguientes archivos para generar archivos TF.Record. Deja que el comando se ejecute durante unos minutos y, luego, escribe Ctrl+C para detener el proceso.

    sudo ./gradlew run_example --args='--enablePrintMetricsToLogs --interchangeLocation=/tmp/simple-data --withOutliers=true'
    
  3. Revisa el resultado del comando:

    ls /tmp/simple-data/data
    

    Deberías ver resultados similares a los siguientes:

    'seconds: 1602707513'$'\n''-seconds: 1602707518'$'\n''-1602707513000-1602707518000-
    2020-10-14T20:31:53.000Z-2020-10-14T20:31:58.000Z-00001-of-00005.tfrecord'
    'seconds: 1602707514'$'\n''-seconds: 1602707519'$'\n''-1602707514000-1602707519000-
    2020-10-14T20:31:54.000Z-2020-10-14T20:31:59.000Z-00001-of-00005.tfrecord'
    'seconds: 1602707515'$'\n''-seconds: 1602707520'$'\n''-1602707515000-1602707520000-
    2020-10-14T20:31:55.000Z-2020-10-14T20:32:00.000Z-00002-of-00005.tfrecord'
    'seconds: 1602707516'$'\n''-seconds: 1602707521'$'\n''-1602707516000-1602707521000-
    2020-10-14T20:31:56.000Z-2020-10-14T20:32:01.000Z-00003-of-00005.tfrecord'
    

Genera resultados de Pub/Sub

Genera mensajes de Pub/Sub y publícalos en el tema time-series con la ejecución del siguiente comando: Deja que el comando se ejecute durante unos minutos y, luego, escribe Ctrl+C para detener el proceso. Reemplaza myProject por el ID del proyecto que estás usando para completar este instructivo.

sudo ./gradlew run_example --args='--withOutliers=true --pubSubTopicForTSAccumOutputLocation=projects/myProject/topics/time-series'

Configura el entorno de Python

Crea un entorno virtual de Python y, luego, instala las bibliotecas de Python de Timeseries Streaming que contienen el código de muestra a fin de solicitar predicciones a partir del modelo LSTM.

  1. Cambia al directorio principal mediante la ejecución del siguiente comando:

    cd ~/
    
  2. Configura el entorno virtual de Python:

    virtualenv -p python3.7 streaming-tf-consumer
    
  3. Activa el entorno virtual:

    source streaming-tf-consumer/bin/activate
    
  4. Otorga los permisos necesarios al directorio dataflow-sample-applications:

    sudo chmod -R 777 dataflow-sample-applications
    
  5. Cambia al directorio de bibliotecas de Python de Timeseries Streaming:

    cd dataflow-sample-applications/timeseries-streaming/timeseries-python-applications
    
  6. Instala el código de muestra de Python:

    pip install -e .
    

Obtén predicciones

Usa los procedimientos de esta sección para obtener predicciones por lotes sobre los datos de los archivos TF.Record que generaste y para obtener predicciones en tiempo real sobre los datos que publicaste en el tema de Pub/Sub series temporales.

Obtén predicciones por lotes

Obtén predicciones por lotes sobre los datos de series temporales que generes como archivos TF.Record. Para ello, ejecuta el siguiente comando. Reemplaza myBucket por el bucket de Cloud Storage que creaste.

python ml_pipeline_examples/sin_wave_example/inference/batch_inference.py --saved_model_location=gs://myBucket/serving_model_dir --tf_transform_graph_dir=gs://myBucket/tf_transform_graph_dir --tfrecord_folder=/tmp/simple-data/data/*

Deberías ver resultados similares a los siguientes:

{'span_start_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 26), 'span_end_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 31), 'timeseries_x-value-LAST': {'input_value': 1.2381953, 'output_value': 1.2465595, 'raw_data_array': '[79.86 81.92 83.87 85.72 87.46]', 'diff': 0.008364201, 'anomaly': False}, 'timeseries_x-value-LAST-COS-MINUTE-TIMESTAMP': {'input_value': -1.407084, 'output_value': -1.4521656, 'diff': 0.045081615}, 'timeseries_x-value-LAST-SIN-MINUTE-TIMESTAMP': {'input_value': -0.077018194, 'output_value': -0.036538757, 'diff': 0.040479437}}
{'span_start_timestamp': datetime.datetime(2020, 11, 25, 0, 20, 56), 'span_end_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 1), 'timeseries_x-value-LAST': {'input_value': 0.027333755, 'output_value': -0.02873822, 'raw_data_array': '[-12.19  -8.72  -5.23  -1.75   1.75]', 'diff': 0.056071974, 'anomaly': False}, 'timeseries_x-value-LAST-COS-MINUTE-TIMESTAMP': {'input_value': 1.4148479, 'output_value': 1.4649682, 'diff': 0.050120354}, 'timeseries_x-value-LAST-SIN-MINUTE-TIMESTAMP': {'input_value': 0.0711496, 'output_value': 0.00040806632, 'diff': 0.070741534}}
{'span_start_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 38), 'span_end_timestamp': datetime.datetime(2020, 11, 25, 0, 21, 43), 'timeseries_x-value-LAST': {'input_value': 1.4099848, 'output_value': 1.5344787, 'raw_data_array': '[97.44 98.16 98.77 99.25 99.62]', 'diff': 0.12449384, 'anomaly': False}, 'timeseries_x-value-LAST-COS-MINUTE-TIMESTAMP': {'input_value': -0.36180413, 'output_value': -0.32122365, 'diff': 0.04058048}, 'timeseries_x-value-LAST-SIN-MINUTE-TIMESTAMP': {'input_value': -1.370246, 'output_value': -1.4247555, 'diff': 0.0545094}}

Obtén predicciones en tiempo real

Obtén predicciones en tiempo real sobre los datos de series temporales en el tema de time-series de Pub/Sub. Para ello, ejecuta el siguiente comando. Reemplaza myProject por el ID del proyecto que usas a fin de completar este instructivo y reemplaza myBucket por el bucket de Cloud Storage que creaste.

python ml_pipeline_examples/sin_wave_example/inference/stream_inference.py --saved_model_location=gs://myBucket/serving_model_dir --tf_transform_graph_dir=gs://myBucket/tf_transform_graph_dir --pubsub_subscription=projects/myProject/subscriptions/time-series --runner=DataflowRunner --project=myProject --temp_location=gs://myBucket --region=us-central1 --setup_file=./setup.py

Para confirmar que los mensajes se hayan procesado, sigue estos pasos:

  1. Abre la página Trabajos de Dataflow.
  2. Haz clic en el trabajo en ejecución para ver el gráfico del trabajo.
  3. En el gráfico del trabajo, haz clic en el último paso del trabajo, ParDo(CallableWrapperDoFn).
  4. Revisa las métricas Colecciones de entrada y Actualidad de los datos de salida. Deberías ver resultados similares a los siguientes:

    Métricas de entrada y salida para la canalización de Dataflow que procesan los datos de series temporales.

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto, pero borra solo esos recursos.

De cualquier manera, debes quitar esos recursos para que no se te cobre por ellos en el futuro. En las siguientes secciones, se describe cómo borrar estos recursos.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra los componentes

Si no deseas borrar el proyecto, usa las siguientes secciones para borrar los componentes facturables de este instructivo.

Detenga el trabajo de Dataflow

  1. Abre la página Trabajos de Dataflow.
  2. En la lista de trabajos, haz clic en time-series.
  3. En la página de detalles del trabajo, haz clic en Detener.
  4. Selecciona Cancelar.
  5. Haz clic en Detener trabajo.

Borra el tema y la suscripción de Pub/Sub

  1. Abre la página Suscripciones de Pub/Sub.
  2. Selecciona la casilla de verificación de la suscripción a time-series.
  3. Haz clic en Borrar.
  4. En la ventana de superposición que aparece, haz clic en Borrar para confirmar que deseas borrar la suscripción y su contenido.
  5. Haz clic en Temas.
  6. Selecciona la casilla de verificación del tema time-series.
  7. Haz clic en Borrar.
  8. En la ventana de superposición que aparece, escribe delete y, luego, haz clic en Borrar.

Borra la instancia de VM de Compute Engine

  1. Abre la página Instancias de VM de Compute Engine
  2. Selecciona la casilla de verificación de la instancia de VM de time-series.
  3. Haz clic en Borrar.
  4. En la ventana de superposición que aparece, haz clic en Borrar para confirmar que deseas borrar la suscripción y su contenido.

Borra el bucket de Cloud Storage

  1. Abre el navegador de Cloud Storage.
  2. Selecciona la casilla de verificación del bucket time-series-<myProject>.
  3. Haz clic en Borrar.
  4. En la ventana de superposición que aparece, escribe DELETE y, luego, haz clic en Confirmar.

¿Qué sigue?