Usar varios modelos en las canalizaciones

Puedes usar la API RunInference para crear pipelines que contengan varios modelos. Las canalizaciones multimodelo son útiles para tareas como las pruebas A/B y la creación de conjuntos para resolver problemas empresariales que requieren más de un modelo de aprendizaje automático.

Usar varios modelos

En los siguientes ejemplos de código se muestra cómo usar la transformación RunInference para añadir varios modelos a tu canalización.

Cuando creas una canalización con varios modelos, puedes usar uno de estos dos patrones:

  • Patrón de ramificación A/B: una parte de los datos de entrada se envía a un modelo y el resto, a otro.
  • Patrón de secuencia: los datos de entrada atraviesan dos modelos, uno después del otro.

Patrón A/B

En el siguiente código se muestra cómo añadir un patrón A/B a tu canal con la transformación RunInference.

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(MODEL_HANDLER_A)
   model_b_predictions = data | RunInference(MODEL_HANDLER_B)

MODEL_HANDLER_A y MODEL_HANDLER_B son el código de configuración del controlador del modelo.

En el siguiente diagrama se muestra este proceso de forma visual.

Diagrama que muestra el flujo de trabajo de varios modelos con el patrón A/B.

Patrón de secuencia

En el siguiente código se muestra cómo añadir un patrón de secuencia a tu canal con la transformación RunInference.

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('A_SOURCE')
   model_a_predictions = data | RunInference(MODEL_HANDLER_A)
   model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(MODEL_HANDLER_B)

MODEL_HANDLER_A y MODEL_HANDLER_B son el código de configuración del controlador del modelo.

En el siguiente diagrama se muestra este proceso de forma visual.

Diagrama que muestra el flujo de trabajo de varios modelos de patrones de secuencia.

Asignar modelos a claves

Puedes cargar varios modelos y asignarlos a claves mediante un controlador de modelos con claves. Al asignar modelos a claves, se pueden usar diferentes modelos en la misma transformación RunInference. En el siguiente ejemplo se usa un controlador de modelo con clave que carga un modelo mediante CONFIG_1 y un segundo modelo mediante CONFIG_2. La canalización usa el modelo asociado a CONFIG_1 para ejecutar la inferencia en ejemplos asociados a KEY_1. El modelo asociado a CONFIG_2 ejecuta inferencias en ejemplos asociados a KEY_2 y KEY_3.

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
  KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2))
])
with pipeline as p:
   data = p | beam.Create([
      ('KEY_1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('KEY_2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('KEY_3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

Para ver un ejemplo más detallado, consulta Ejecutar inferencia de AA con varios modelos entrenados de forma diferente.

Gestionar la memoria

Cuando cargas varios modelos al mismo tiempo, es posible que se produzcan errores por falta de memoria. Cuando usas un controlador de modelo con clave, Apache Beam no limita automáticamente el número de modelos cargados en la memoria. Si los modelos no caben en la memoria, se produce un error de falta de memoria y la canalización falla.

Para evitar este problema, usa el parámetro max_models_per_worker_hint para limitar el número de modelos que se cargan en la memoria al mismo tiempo. En el siguiente ejemplo se usa un controlador de modelo con clave y el parámetro max_models_per_worker_hint. Como el valor del parámetro max_models_per_worker_hint se ha definido como 2, la canalización carga un máximo de dos modelos en cada proceso de trabajador del SDK al mismo tiempo.

mhs = [
  KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
  KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2)),
  KeyModelMapping(['KEY_4'], PytorchModelHandlerTensor(CONFIG_3)),
  KeyModelMapping(['KEY_5', 'KEY_5', 'KEY_6'], PytorchModelHandlerTensor(CONFIG_4)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

Al diseñar tu flujo de trabajo, asegúrate de que los trabajadores tengan suficiente memoria para los modelos y las transformaciones del flujo. Como es posible que la memoria que usan los modelos no se libere inmediatamente, incluye un búfer de memoria adicional para evitar errores de falta de memoria.

Si tienes muchos modelos y usas un valor bajo con el parámetro max_models_per_worker_hint, es posible que se produzca un thrashing de memoria. El thrashing de memoria se produce cuando se usa un tiempo de ejecución excesivo para intercambiar modelos dentro y fuera de la memoria. Para evitar este problema, incluya una transformación GroupByKey en el flujo de trabajo antes del paso de inferencia. La transformación GroupByKey asegura que los elementos con la misma clave y modelo se encuentren en el mismo trabajador.

Más información