Compara los modelos de aprendizaje automático para predicciones en canalizaciones de Cloud Dataflow

Last reviewed 2018-09-07 UTC

En esta solución, se describen y comparan los diferentes enfoques de diseño para llamar a un modelo de aprendizaje automático durante una canalización de Dataflow y se examinan las ventajas y desventajas de cada enfoque. Presentamos los resultados de una serie de experimentos que ejecutamos para explorar diferentes enfoques e ilustrar estas ventajas y desventajas en canalizaciones de procesamiento de transmisión y por lotes. Esta solución está diseñada para personas que integran modelos entrenados en canalizaciones de procesamiento de datos, en lugar de científicos de datos que desean crear modelos de aprendizaje automático.

Introducción

Como responsable de integrar este modelo de AA en la canalización de Dataflow, puedes preguntarte cuáles son los distintos enfoques y cuál se adapta mejor a los requisitos del sistema. Debes prestar atención a las siguientes consideraciones:

  • Rendimiento
  • Latencia
  • Costo
  • Implementación
  • Mantenimiento

No siempre es fácil balancear estas consideraciones, pero esta solución puede ayudarte a transitar el proceso de toma de decisiones en función de tus prioridades. En la solución se comparan tres enfoques para realizar predicciones con un modelo de aprendizaje automático (AA) que entrena TensorFlow en canalizaciones de transmisión y por lotes:

  • Usar un modelo implementado como una API de REST o HTTP para las canalizaciones de transmisión
  • Usar trabajos de predicción por lotes de AI Platform para canalizaciones por lotes
  • Usar la predicción de modelos directos de Dataflow para canalizaciones por lotes y de transmisión

Todos los experimentos usan un modelo entrenado existente, llamado el conjunto de datos de natalidad, que predice el peso de los bebés según varias entradas. Debido a que el objetivo de esta solución no es crear un modelo, no se explica cómo se construye o entrena el modelo. Consulta la sección Pasos siguientes para obtener más detalles sobre el conjunto de datos de Natalidad.

Plataforma

Hay varias formas de ejecutar una canalización de datos y llamar a un modelo de AA entrenado. Sin embargo, los requisitos funcionales son siempre los mismos:

  1. Transferir datos de una fuente limitada (por lotes) o ilimitada (de transmisión). Algunos ejemplos de fuentes de las cuales se pueden transferir datos incluyen datos del sensor, interacciones con sitios web y transacciones financieras.
  2. Transformar y enriquecer los datos de entrada mediante llamadas a los modelos del AA para realizar predicciones. Un ejemplo es analizar un archivo JSON para extraer campos relevantes a fin de predecir una fecha de mantenimiento, recomendar un producto o detectar un fraude.
  3. Almacenar los datos transformados y las predicciones para realizar estadísticas o copias de seguridad, o pasar a un sistema de colas a fin de activar un nuevo evento o canalizaciones adicionales. Algunos ejemplos incluyen la detección de posibles fraudes en tiempo real o el almacenamiento de la información del programa de mantenimiento en un almacén al que se accede desde un panel.

Cuando transformas y enriqueces los datos con predicciones en un proceso de ETL por lotes, el objetivo es maximizar los rendimientos a fin de reducir el tiempo total necesario para todo el lote de datos. Por otro lado, cuando procesas datos de transmisión para predicciones en línea, el objetivo es minimizar la latencia a fin de recibir cada predicción en tiempo (casi) real. Por lo tanto, llamar al modelo podría convertirse en un cuello de botella.

Componentes centrales

Los experimentos por lotes y de transmisión en esta solución usan tres tecnologías principales:

  • Apache Beam, que se ejecuta en Dataflow para procesar los datos
  • TensorFlow para implementar y entrenar el modelo de AA
  • Para algunos experimentos, AI Platform como plataforma de hosting para los modelos de AA entrenados a fin de realizar predicciones en línea y por lotes

Elegimos Apache Beam en ejecución en Dataflow para ejecutar canalizaciones de datos en esta solución debido a lo siguiente:

  • Apache Beam es un modelo de programación unificado de código abierto que ejecuta trabajos de procesamiento de datos por lotes y de transmisión.
  • Dataflow es un producto de Google Cloud que puede ejecutar trabajos de Apache Beam sin un servidor.

TensorFlow es una biblioteca matemática de código abierto de Google que se usa como framework de aprendizaje automático. TensorFlow permite crear, entrenar y entregar modelos en una sola máquina o en entornos distribuidos. Los modelos se pueden usar en varios dispositivos y también pueden aprovechar los recursos de CPU, GPU o TPU disponibles para el entrenamiento y la entrega.

AI Platform es una plataforma sin servidores que puede entrenar, ajustar (mediante la función de ajuste de hiperparámetros) y entregar modelos de TensorFlow a gran escala con una administración mínima requerida por DevOps. AI Platform admite la implementación de modelos entrenados como las API de REST para predicciones en línea, así como el envío de trabajos de predicción por lotes. AI Platform es una de las varias opciones que pueden entregar el modelo como un microservicio.

En los enfoques detallados en esta solución, se usa Dataflow para la canalización de procesamiento de datos y AI Platform a fin de alojar los modelos como extremos HTTP. Sin embargo, estos enfoques pueden ser reemplazados por otras tecnologías. Las comparaciones de rendimiento entre HTTP y un modelo directo de TensorFlow no cambian mucho.

Procesa datos por lotes y datos de transmisión

En los experimentos de esta solución, se incluyen casos prácticos de procesamiento por lotes y de transmisión. En cada experimento, se aprovechan diferentes productos de Google Cloud para la entrada y la salida porque las fuentes no delimitadas y limitadas tienen requisitos operativos diferentes.

Procesamiento por lotes de un conjunto de datos limitado

La Figura 1 muestra que, en las canalizaciones de procesamiento por lotes típicas, los datos de entrada sin procesar se almacenan en un almacenamiento de objetos, como Cloud Storage. Los formatos de almacenamiento de datos estructurados incluyen valores separados por comas (CSV), columnas de filas optimizadas (ORC), Parquet o Avro. Estos formatos suelen usarse cuando los datos provienen de bases de datos o registros.

Arquitectura de canalizaciones de procesamiento por lotes típicas
Figura 1. Arquitectura de procesamiento por lotes

Algunas plataformas analíticas como BigQuery proporcionan almacenamiento además de funciones de consulta. BigQuery usa Capacitor para el almacenamiento. Apache Beam en Dataflow puede leer y escribir en BigQuery y Cloud Storage, además de otras opciones de almacenamiento en canalizaciones de procesamiento por lotes.

Procesamiento de transmisión de un flujo de datos ilimitado

Para la transmisión, las entradas a una canalización de procesamiento de datos suelen ser un sistema de mensajería, como se muestra en la Figura 2. Las tecnologías como Pub/Sub o Kafka suelen usarse para transferir datos individuales en formato JSON, CSV o protobuf.

Arquitectura de canalizaciones de procesamiento de transmisión típicas
Figura 2. Arquitectura de procesamiento de transmisión

Los datos se pueden procesar de forma individual o como grupos en microlotes mediante funciones analíticas para realizar el procesamiento de eventos temporales. Los datos procesados pueden ir a varios destinos, incluidos los siguientes:

  1. BigQuery para estadísticas ad hoc, a través de las API de transmisión
  2. Bigtable para entregar información en tiempo real
  3. Temas de Pub/Sub para activar procesos o canalizaciones posteriores

Puedes encontrar una lista completa de conectores de origen (entrada) y conectores de recepción (salida) para los receptores de fuente de datos ilimitados y limitados en la página de E/S de Apache Beam.

Invoca un modelo de TensorFlow

Un modelo que entrena TensorFlow puede invocarse de tres maneras:

  1. A través de un extremo HTTP para la predicción en línea
  2. Directamente, mediante el archivo de modelo guardado para predicciones en línea y por lotes
  3. A través de un trabajo de predicción por lotes de AI Platform para la predicción por lotes

Extremos HTTP para la predicción en línea

Los modelos de TensorFlow se implementan como extremos HTTP que se invocan y dan predicciones en tiempo real, ya sea a través de una canalización de procesamiento de datos de transmisión o de apps cliente.

Puedes implementar un modelo de TensorFlow como un extremo HTTP para las predicciones en línea mediante TensorFlow Serving o cualquier otro servicio de hosting, como Seldon. Como se muestra en la Figura 3, puedes elegir una de las siguientes opciones:

  1. Implementa el modelo tú mismo en una o más instancias de Compute Engine.
  2. Usa una imagen de Docker en Compute Engine o Google Kubernetes Engine.
  3. Aprovecha Kubeflow para facilitar la implementación en Kubernetes o Google Kubernetes Engine.
  4. Usa App Engine con Endpoints para alojar el modelo en una app web.
  5. Usa AI Platform, el servicio de entrenamiento y entrega de AA completamente administrado en Google Cloud.
Opciones en Dataflow para entregar un modelo como un extremo HTTP
Figura 3. Diferentes opciones en Dataflow para entregar un modelo como un extremo HTTP

AI Platform es un servicio completamente administrado, por lo que es más fácil de implementar que las otras opciones. Por lo tanto, en nuestros experimentos lo usamos como la opción para entregar el modelo como un extremo HTTP. Luego, podemos enfocarnos en el rendimiento de un modelo directo en comparación con un extremo HTTP en AI Platform, en lugar de comparar las diferentes opciones de entrega de modelos de HTTP.

Entrega predicciones en línea mediante AI Platform Prediction

Se requieren dos tareas para entregar predicciones en línea:

  1. Implementar un modelo
  2. Interactuar con el modelo implementado para la inferencia (es decir, realizar predicciones)

Para implementar un modelo como un extremo HTTP mediante AI Platform Prediction, se requieren los siguientes pasos:

  1. Asegúrate de que los archivos del modelo entrenado estén disponibles en Cloud Storage.
  2. Crea un modelo con el comando gcloud ml-engine models create.
  3. Implementa una versión del modelo mediante el comando gcloud ml-engine versions create, con los archivos del modelo en Cloud Storage.

Puedes implementar un modelo con comandos como los siguientes:


PROJECT="[PROJECT_ID]" # change to your project name
REGION="[REGION]"
BUCKET="[BUCKET]" # change to your bucket name
MODEL_NAME="babyweight_estimator" # change to your estimator name
MODEL_VERSION="v1" # change to your model version
MODEL_BINARIES=gs://${BUCKET}/models/${MODEL_NAME}

# upload the local SavedModel to GCS
gsutil -m cp -r model/trained/v1/* gs://${BUCKET}/models/${MODEL_NAME}

# set the current project
gcloud config set project ${PROJECT}

# list model files on GCS
gsutil ls ${MODEL_BINARIES}

# deploy model to GCP
gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

# deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

Mediante el código, se crea un modelo de AI Platform Prediction llamado babyweight_estimator en el proyecto de Google Cloud, con la versión del modelo v1.

Una vez implementado el modelo, puedes invocarlo. Con el siguiente código de Python, se muestra una forma de invocar una versión del modelo en AI Platform Prediction como una API de REST:

cmle_api = None

def init_api():

    global cmle_api

    if cmle_api is None:
        cmle_api = discovery.build('ml', 'v1',
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',
                              cache_discovery=True)

def estimate_cmle(instances):
    """
    Calls the babyweight estimator API on CMLE to get predictions

    Args:
       instances: list of json objects
    Returns:
        int - estimated baby weight
    """
    init_api()

    request_data = {'instances': instances}

    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, CMLE_MODEL_NAME, CMLE_MODEL_VERSION)
    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()
    values = [item["predictions"][0] for item in response['predictions']]
    return values

Si tienes un gran conjunto de datos disponible en BigQuery, Cloud Storage o similares, y deseas maximizar el rendimiento del proceso en general, no se recomienda entregar tu modelo de AA como un extremo HTTP para la predicción por lotes. Hacer esto genera una solicitud HTTP para cada uno de los datos, lo que da como resultado un gran volumen de solicitudes HTTP. En la siguiente sección, se presentan mejores opciones para la predicción por lotes.

Modelo directo para predicciones por lotes y en línea

La técnica de predicción de modelos directos aprovecha un SavedModel de TensorFlow local en las instancias de Dataflow. El modelo guardado es una copia de los archivos de salida creados después de terminar la creación y el entrenamiento del modelo de TensorFlow. En cuanto al SavedModel de TensorFlow, ten en cuenta esto:

  • Puede ser parte del código fuente de la canalización que se envía como un trabajo de Dataflow.
  • Se puede descargar de Cloud Storage, como se muestra en la Figura 4.
Predicción de modelo directo en Dataflow
Figura 4. Predicción de modelo directo en Dataflow

En esta solución, usamos un SavedModel que forma parte del código fuente en GitHub. Para cargar un modelo en las instancias, haz lo siguiente:

  1. Cuando crees el trabajo de Dataflow, especifica las dependencias de la canalización que se cargarán, incluido el archivo del modelo. Mediante el siguiente código de Python, se muestra el archivo setup.py que incluye los archivos del modelo que se enviarán con el trabajo de Dataflow.

    import setuptools
    
    requirements = []
    
    setuptools.setup(
        name='TF-DATAFLOW-DEMO',
        version='v1',
        install_requires=requirements,
        packages=setuptools.find_packages(),
        package_data={'model': ['trained/*',
                                'trained/v1/*',
                                'trained/v1/variables/*']
                      },
    )
  2. Llama a los archivos de modelo locales durante la canalización. Esto produce la predicción para las instancias dadas. El siguiente código de Python muestra cómo hacer esto.

    predictor_fn = None
    
    def init_predictor():
        """ Loads the TensorFlow saved model to the predictor object
    
        Returns:
            predictor_fn
        """
    
        global predictor_fn
    
        if predictor_fn is None:
    
            logging.info("Initialising predictor...")
            dir_path = os.path.dirname(os.path.realpath(__file__))
            export_dir = os.path.join(dir_path, SAVED_MODEL_DIR)
    
            if os.path.exists(export_dir):
                predictor_fn = tf.contrib.predictor.from_saved_model(
                    export_dir=export_dir,
                    signature_def_key="predict"
                )
            else:
                logging.error("Model not found! - Invalid model path: {}".format(export_dir))
    
    def estimate_local(instances):
        """
        Calls the local babyweight estimator to get predictions
    
        Args:
           instances: list of json objects
        Returns:
            int - estimated baby weight
        """
    
        init_predictor()
    
        inputs = dict((k, [v]) for k, v in instances[0].items())
        for i in range(1,len(instances)):
            instance = instances[i]
    
            for k, v in instance.items():
                inputs[k] += [v]
    
        values = predictor_fn(inputs)['predictions']
        return [value.item() for value in values.reshape(-1)]

Consulta la página sobre dependencias de varios archivos de Apache Beam para obtener más detalles.

Trabajo de predicción por lotes de AI Platform

Además de implementar el modelo como un extremo HTTP, AI Platform te permite ejecutar un trabajo de predicción por lotes mediante una versión del modelo implementado o un SavedModel de TensorFlow en Cloud Storage.

Un trabajo de predicción por lotes de AI Platform toma la ubicación de Cloud Storage de los archivos de datos de entrada como parámetro. El trabajo usa el modelo para obtener predicciones sobre esos datos y, luego, almacena los resultados de la predicción en otra ubicación de salida de Cloud Storage que también se proporciona como parámetro. En el siguiente ejemplo, se muestran los comandos de gcloud que envían un trabajo de predicción por lotes de AI Platform.

BUCKET='<BUCKET>'
DATA_FORMAT="TEXT"
INPUT_PATHS=gs://${BUCKET}/data/babyweight/experiments/outputs/data-prep-*
OUTPUT_PATH=gs://${BUCKET}/data/babyweight/experiments/outputs/cmle-estimates
MODEL_NAME='babyweight_estimator'
VERSION_NAME='v1'
REGION='<REGION>'
now=$(date +"%Y%m%d_%H%M%S")
JOB_NAME="batch_predict_$MODEL_NAME$now"
MAX_WORKER_COUNT="20"

gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model=$MODEL_NAME \
    --input-paths=$INPUT_PATHS \
    --output-path=$OUTPUT_PATH \
    --region=$REGION \
    --data-format=$DATA_FORMAT \
    --max-worker-count=$MAX_WORKER_COUNT

Comparación entre las predicciones en línea dato por dato y por microlotes

En las canalizaciones de predicción en tiempo real, ya sea que el modelo se entregue como un extremo HTTP o se use de forma directa desde los trabajadores, tienes dos opciones para obtener predicciones sobre los datos entrantes:

  • Dato individual. La opción obvia es enviar cada dato al modelo de forma individual y obtener una predicción.
  • Microlotes. Una opción más optimizada es usar una función analítica para crear microlotes que agrupen datos dentro de un período específico, por ejemplo, cada 5 segundos. El microlote se envía al modelo a fin de obtener predicciones para todas las instancias a la vez.

El siguiente código de Python muestra cómo crear microlotes basados en el tiempo mediante una función analítica en una canalización de Apache Beam.

def run_pipeline_with_micro_batches(inference_type, project,
                                    pubsub_topic, pubsub_subscription,
                                    bq_dataset, bq_table,
                                    window_size, runner, args=None):

    prepare_steaming_source(project, pubsub_topic, pubsub_subscription)
    prepare_steaming_sink(project, bq_dataset, bq_table)
    pubsub_subscription_url = "projects/{}/subscriptions/{}".format(project, pubsub_subscription)
    options = beam.pipeline.PipelineOptions(flags=[], **args)

    pipeline = beam.Pipeline(runner, options=options)
    (
            pipeline
            | 'Read from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription_url, id_label="source_id")
            | 'Micro-batch - Window Size: {} Seconds'.format(window_size) >> beam.WindowInto(FixedWindows(size=window_size))
            | 'Estimate Targets - {}'.format(inference_type) >> beam.FlatMap(lambda messages: estimate(messages, inference_type))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project,
                                                             dataset=bq_dataset,
                                                             table=bq_table
                                                             )
    )

    pipeline.run()

El enfoque de microlotes usa modelos implementados como extremos HTTP, lo que reduce mucho el número de solicitudes HTTP y la latencia. Incluso cuando la técnica de microlotes se usa con el modelo directo, enviar al modelo un tensor con N instancias para la predicción es más eficiente que enviar un tensor con una longitud de 1 debido a las operaciones vectorizadas.

Experimentos por lotes

En los experimentos por lotes, queremos estimar los pesos de los bebés en el conjunto de datos de natalidad en BigQuery mediante un modelo de regresión de TensorFlow. Luego, queremos guardar los resultados de la predicción en Cloud Storage como archivos CSV mediante una canalización por lotes de Dataflow. En la siguiente sección, se describen diferentes experimentos que probamos para realizar esta tarea.

Enfoque 1: Dataflow con predicción de modelo directo

En este enfoque, los trabajadores de Dataflow alojan el SavedModel de TensorFlow, al que se llama directamente para la predicción durante la canalización de procesamiento por lotes para cada registro. En la Figura 5, se muestra la arquitectura de alto nivel de este enfoque.

Enfoque de predicción por lotes 1: Dataflow con predicción de modelo directo
Figura 5. Enfoque de predicción por lotes 1: Dataflow con predicción de modelo directo

La canalización de Dataflow realiza los siguientes pasos:

  1. Lee datos desde BigQuery.
  2. Prepara el registro de BigQuery para la predicción.
  3. Llama al SavedModel de TensorFlow local a fin de obtener una predicción para cada registro.
  4. Convierte el resultado (registro de entrada y peso estimado del bebé) en un archivo CSV.
  5. Escribe el archivo CSV en Cloud Storage.

En este enfoque, no hay llamadas a servicios remotos, por ejemplo, un modelo implementado en AI Platform como un extremo HTTP. La predicción se realiza de forma local dentro de cada trabajador de Dataflow mediante el SavedModel de TensorFlow.

Enfoque 2: Dataflow con la predicción por lotes de AI Platform

En este enfoque, el SavedModel de TensorFlow se almacena en Cloud Storage, y AI Platform lo usa para la predicción. Sin embargo, en lugar de hacer una llamada a la API dirigida al modelo implementado para cada registro, como en el enfoque anterior, los datos se preparan a fin de realizar la predicción y se envían como un lote.

Este enfoque tiene dos fases:

  1. Dataflow prepara los datos de BigQuery para la predicción y, luego, los almacena en Cloud Storage.
  2. El trabajo de predicción por lotes de AI Platform se envía con los datos preparados y los resultados de la predicción se almacenan en Cloud Storage.

En la Figura 6, se muestra la arquitectura general de este enfoque de dos fases.

Enfoque de predicción por lotes 2: Dataflow con la predicción por lotes de AI Platform
Figura 6. Enfoque de predicción por lotes 2: Dataflow con la predicción por lotes de AI Platform

Los pasos del flujo de trabajo, incluida la canalización de Dataflow, son los siguientes:

  1. Lee datos desde BigQuery.
  2. Preparar el registro de BigQuery para la predicción
  3. Escribe datos JSON en Cloud Storage. La función serving_fn en el modelo espera instancias JSON como entrada.
  4. Envía un trabajo de predicción por lotes de AI Platform con los datos preparados en Cloud Storage. El trabajo también escribe los resultados de la predicción en Cloud Storage.

El trabajo de Dataflow prepara los datos para la predicción en lugar de enviar el trabajo de predicción de AI Platform. En otras palabras, las tareas de preparación de datos y de predicción por lotes no están estrechamente vinculadas. Cloud Functions, Airflow o cualquier programador pueden organizar el flujo de trabajo si ejecutan el trabajo de Dataflow y, luego, envían el trabajo de AI Platform para la predicción por lotes.

Se recomienda la predicción por lotes de AI Platform para el rendimiento y la facilidad de uso si tus datos cumplen con los siguientes criterios:

  • Los datos están disponibles en Cloud Storage, en el formato esperado para la predicción, a partir de un proceso de transferencia de datos anterior.
  • No tienes el control de la primera fase del flujo de trabajo, como la canalización de Dataflow que prepara los datos en Cloud Storage para la predicción.

Configuración de los experimentos

Usamos los siguientes parámetros de configuración en tres experimentos:

  • Tamaños de datos: filas de 10K100K, 1M y 10M
  • Clase de Cloud Storage: Regional Storage
  • Ubicación de Cloud Storage: europe-west1-b
  • Región de Dataflow: europe-west1-b
  • Tipo de máquina de trabajador de Dataflow: n1-standard-1
  • Ajuste de escala automático de Dataflow para datos por lotes de hasta 1 millón de registros
  • Dataflow num_worker: 20 para datos por lotes de hasta 10 millones de registros
  • Configuración max-worker-count de predicción por lotes de AI Platform: 20

La ubicación de Cloud Storage y la región de Dataflow deben ser las mismas. En esta solución, se usa la región europe-west1-b como un valor arbitrario.

Resultados

En la siguiente tabla, se resumen los resultados (tiempos) de realizar las predicciones por lotes y las predicciones de modelo directo con conjuntos de datos de diferentes tamaños.

Tamaño de datos por lotes Métrica Dataflow y, luego, predicción por lotes de AI Platform Dataflow con predicción de modelo directo
Filas de 10,000 Duración de la ejecución 15 min 30 s

(Dataflow: 7 min 47 s + 
AI Platform: 7 min 43 s)
8 min 24 s
Tiempo total de CPU virtuales 0.301 h

(Dataflow: 0.151 h + 
AI Platform: 0.15 h)
0.173 h
Filas de 100,000 Duración de la ejecución 16 min 37 s

(Dataflow: 8 min 39 s + 
AI Platform: 7 min 58 s)
10 min 31 s
Tiempo total de CPU virtuales 0.334 h 

(Dataflow: 0.184 h + 
AI Platform: 0.15 h)
0.243 h
Filas de 1,000,000 Duración de la ejecución 21 min 11 s
(Dataflow: 11 min 07 s + 
AI Platform: 10 min 04 s)
17 min 12 s
Tiempo total de CPU virtuales 0.446 h

(Dataflow: 0.256 h + 
AI Platform: 0.19 h)
1.115 h
Filas de 10,000,000 Duración de la ejecución 33 min 8 s
(Dataflow: 12 min 15 s + 
AI Platform: 20 min 53 s)
25 min 2 s
Tiempo total de CPU virtuales 5.251 h

(Dataflow: 3.581 h +
AI Platform: 1.67 h)
7.878 h

En la Figura 7, se muestra un gráfico de estos resultados.

Gráfico en el que se muestran los tiempos de 3 enfoques para 4 tamaños de conjuntos de datos diferentes
Figura 7. Gráfico en el que se muestran los tiempos de 3 enfoques para 4 tamaños de conjuntos de datos diferentes

Como muestran los resultados, el trabajo de predicción por lotes de AI Platform tarda menos en generar predicciones para los datos de entrada, ya que los datos ya están en Cloud Storage en el formato usado para la predicción. Sin embargo, cuando el trabajo de predicción por lotes se combina con un paso de procesamiento previo (extracción y preparación de los datos de BigQuery a Cloud Storage para la predicción) y un paso de procesamiento posterior (almacenamiento de los datos de nuevo en BigQuery), el enfoque de modelo directo produce mejores tiempos de ejecución de extremo a extremo. Además, el rendimiento del enfoque de predicción de modelo directo puede optimizarse aún más mediante microlotes (que veremos más adelante en los experimentos de transmisión).

Experimentos de transmisión

En los experimentos de transmisión, la canalización de Dataflow lee los datos de un tema de Pub/Sub y los escribe en BigQuery mediante las API de transmisión. La canalización de transmisión de Dataflow procesa los datos y obtiene predicciones mediante el modelo de estimación de peso de bebés de TensorFlow.

El tema recibe datos de un simulador de transmisión que genera datos (cada uno de los cuales es una instancia de estimación del peso del bebé) a una velocidad predefinida de eventos por segundo. Esto simula un ejemplo del mundo real de una fuente de datos ilimitada. El siguiente código de Python simula la transmisión de datos enviada a un tema de Pub/Sub.

client = pubsub.Client(project=PARAMS.project_id)
topic = client.topic(PARAMS.pubsub_topic)
if not topic.exists():
    print 'Topic does not exist. Please run a stream pipeline first to create the topic.'
    print 'Simulation aborted.'

    return

for index in range(PARAMS.stream_sample_size):

    message = send_message(topic, index)

    # for debugging
    if PARAMS.show_message:
        print "Message {} was sent: {}".format(index+1, message)
        print ""

    time.sleep(sleep_time_per_msg)

Enfoque 1: Dataflow con la predicción en línea de AI Platform

En este enfoque, el modelo de TensorFlow se implementa y se aloja en AI Platform como una API de REST. La canalización de transmisión de Dataflow llama a la API por cada mensaje consumido desde Pub/Sub para obtener predicciones. La arquitectura de alto nivel de este enfoque se muestra en la Figura 8.

Enfoque de transmisión 1: Dataflow con la predicción en línea de AI Platform
Figura 8. Enfoque de transmisión 1: Dataflow con la predicción en línea de AI Platform La solicitud HTTP puede incluir un único dato o un grupo de datos en un microlote.

En este enfoque, la canalización de Dataflow realiza los siguientes pasos:

  1. Lee mensajes de un tema de Pub/Sub.
  2. Envía una solicitud HTTP a la API del modelo de AI Platform para obtener predicciones de cada mensaje.
  3. Escribe los resultados a BigQuery mediante las API de transmisión.

Los microlotes son un enfoque mejor. Es decir, en lugar de enviar una solicitud HTTP a la API de REST del modelo por cada mensaje leído desde Pub/Sub, Dataflow agrupa los mensajes recibidos durante un período de 1 segundo. A continuación, envía este grupo de mensajes como un microlote en una sola solicitud HTTP a la API del modelo. En este enfoque, la canalización de Dataflow realiza los siguientes pasos:

  1. Lee mensajes del tema de Pub/Sub.
  2. Aplica una operación de 1 segundo para crear un microlote de mensajes.
  3. Envía una solicitud HTTP con el microlote a la API del modelo de AI Platform para obtener predicciones de los mensajes.
  4. Escribe los resultados a BigQuery mediante las API de transmisión.

La lógica tras este enfoque es la siguiente:

  1. Reduce la cantidad de llamadas al servicio remoto, como el modelo de AI Platform.
  2. Se reduce la latencia promedio de entrega de cada mensaje.
  3. Se reduce el tiempo total de procesamiento de la canalización.

En este experimento, el período se estableció en 1 segundo. Sin embargo, el tamaño de los microlotes, que es la cantidad de mensajes enviados como un lote al modo de AI Platform, varía. El tamaño de los microlotes depende de la frecuencia de generación de mensajes, es decir, la cantidad de mensajes por segundo.

En la siguiente sección, se describen experimentos con tres frecuencias diferentes: 50, 100 y 500 mensajes por segundo. Es decir, el tamaño de microlotes es 50, 100 y 500.

Enfoque 2: Dataflow con la predicción de modelo directo

Este enfoque es similar al que se usó en los experimentos por lotes. El SavedModel de TensorFlow se aloja en los trabajadores de Dataflow y se lo llama para la predicción durante la canalización de procesamiento de transmisión para cada registro. En la Figura 9, se muestra la arquitectura de alto nivel de este enfoque.

Enfoque de transmisión 2: Dataflow con la predicción de modelo directo
Figura 9. Enfoque de transmisión 2: Dataflow con la predicción de modelo directo

En este enfoque, la canalización de Dataflow realiza los siguientes pasos:

  1. Lee mensajes del tema de Pub/Sub.
  2. Llama al SavedModel de TensorFlow local para obtener predicciones de cada registro.
  3. Escribe los resultados a BigQuery mediante las API de transmisión.

La técnica de microlotes también se puede usar en la canalización de transmisión con el enfoque de predicción de modelo directo. En lugar de enviar un tensor de una instancia de datos al modelo, podemos enviar un tensor de N instancias, donde N es igual a los mensajes recibidos dentro del período de Dataflow al modelo. Esta técnica usa las operaciones vectorizadas del modelo de TensorFlow y obtiene varias predicciones en paralelo.

Configuración de los experimentos

Usamos los siguientes parámetros de configuración para estos experimentos:

  • Tamaño de datos de transmisión: 10K records (messages)
  • Mensajes simulados por segundo (MPS): 50, 100 y 500
  • Duración del período (en experimentos por lotes): 1 second
  • Región de Dataflow: europe-west1-b
  • Tipo de máquina de trabajador de Dataflow: n1-standard-1
  • num_worker de Dataflow: 5 (sin ajuste de escala automático)
  • Nodos de la API del modelo de AI Platform: 3 (manualScale)

Resultados

En la siguiente tabla, se resumen los resultados de los experimentos de transmisión con diferentes volúmenes de datos (mensajes por segundo). La frecuencia de mensajes se refiere al número de mensajes enviados por segundo y el tiempo de simulación se refiere a la duración total del envío de todos los mensajes.

Frecuencia de transmisión de mensajes Métrica Dataflow con la predicción en línea de AI Platform   Dataflow con la predicción de modelo directo  
    Mensaje individual Microlotes Mensaje individual Microlotes
50 mensajes por segundo

(Tiempo de simulación: 3 min 20 s)
Tiempo total 9 min 34 s 7 min 44 s 3 min 43 s 3 min 22 s
100 mensajes por segundo

(Tiempo de simulación : 1 min 40 s)
Tiempo total 6 min 03 s 4 min 34 s 1 min 51 s 1 min 41 s
500 mensajes por segundo

(Tiempo de simulación : 20 s)
Tiempo total NA: Cuota de predicción en línea predeterminada de AI Platform 2 min 47 s 1 min 23 s 48 s

En la Figura 10, se muestra un gráfico de estos resultados.

Gráfico en el que se muestran los tiempos para diferentes enfoques y frecuencias
Figura 10. Gráfico en el que se muestran los tiempos para diferentes enfoques y frecuencias

Como se muestra en los resultados, la técnica de microlotes mejora el rendimiento de ejecución en la predicción en línea de AI Platform y en la predicción de modelo directo. Además, el uso del modelo directo con la canalización de transmisión muestra un rendimiento de 2 a 4 veces mejor en comparación con la llamada a una API de REST o HTTP externa para la predicción en línea.

Conclusiones

Según los enfoques descritos y los resultados de los experimentos, recomendamos los siguientes enfoques.

Procesamiento por lotes

  • Si construyes tu canalización de procesamiento de datos por lotes y deseas que la predicción sea parte de la canalización, usa el enfoque de modelo directo para obtener el mejor rendimiento.
  • Mejora el rendimiento del enfoque de modelo directo mediante la creación de microlotes de datos antes de llamar al modelo local para realizar una predicción, a fin de usar la paralelización de las operaciones vectorizadas.
  • Si los datos se propagan a Cloud Storage en el formato esperado para la predicción, usa la predicción por lotes de AI Platform a fin de obtener el mejor rendimiento.
  • Usa AI Platform si deseas usar la potencia de las GPU para la predicción por lotes.
  • No uses la predicción en línea de AI Platform para la predicción por lotes.

Procesamiento de transmisión

  • Usa el modelo directo en la canalización de transmisión para obtener el mejor rendimiento y reducir la latencia promedio. Las predicciones se realizan de forma local, sin llamadas HTTP a servicios remotos.
  • Desacopla tu modelo de las canalizaciones de procesamiento de datos para lograr una mejor capacidad de mantenimiento de los modelos que se usan en las predicciones en línea. El mejor enfoque es entregar el modelo como un microservicio independiente mediante el uso de AI Platform o cualquier otro servicio de hosting web.
  • Implementa el modelo como un servicio web independiente para permitir que múltiples canalizaciones de procesamiento de datos y apps en línea consuman el servicio del modelo como un extremo. Además, los cambios en el modelo son transparentes para las apps y las canalizaciones que lo consumen.
  • Implementa varias instancias del servicio con balanceo de cargas para mejorar la escalabilidad y la disponibilidad del servicio web del modelo. Con AI Platform, solo debes especificar la cantidad de nodos (manualScaling) o minNodes (autoScaling) en el archivo de configuración yaml cuando implementas una versión del modelo.
  • Si implementas tu modelo en un microservicio separado, hay costos adicionales, según la infraestructura de entrega subyacente. Consulta las Preguntas frecuentes sobre los precios de la predicción en línea de AI Platform.
  • Usa microlotes en la canalización de procesamiento de datos de transmisión para obtener un mejor rendimiento con el modelo directo y el servicio de modelo de HTTP. Los microlotes reducen la cantidad de solicitudes HTTP al servicio del modelo y usan las operaciones vectorizadas del modelo de TensorFlow para obtener predicciones.

Pasos siguientes