Usa più modelli nelle pipeline

Puoi utilizzare l'API RunInference per creare pipeline contenenti più modelli. Multi-modello le pipeline sono utili per attività come test A/B e creazione di insiemi risolvere problemi aziendali che richiedono più di un modello ML.

Utilizza più modelli

I seguenti esempi di codice mostrano come utilizzare la trasformazione RunInference per aggiungere più modelli alla pipeline.

Quando crei pipeline con più modelli, puoi utilizzare uno di due pattern:

  • Pattern ramo A/B: una parte dell'input i dati vengono inviati a un modello e il resto a un secondo modello.
  • Pattern di sequenza: i dati di input attraversano due modelli, uno dopo l'altro.

motivo A/B

Il codice seguente mostra come aggiungere un pattern A/B alla pipeline con Trasformazione RunInference.

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(MODEL_HANDLER_A)
   model_b_predictions = data | RunInference(MODEL_HANDLER_B)

MODEL_HANDLER_A e MODEL_HANDLER_B sono il codice di configurazione del gestore del modello.

Il seguente diagramma fornisce una presentazione visiva di questo processo.

Un diagramma che mostra il flusso di lavoro multimodello del pattern A/B.

Pattern sequenza

Il seguente codice mostra come aggiungere un pattern di sequenza alla pipeline con la trasformazione RunInference.

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('A_SOURCE')
   model_a_predictions = data | RunInference(MODEL_HANDLER_A)
   model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(MODEL_HANDLER_B)

MODEL_HANDLER_A e MODEL_HANDLER_B sono il modello di configurazione del gestore.

Il seguente diagramma fornisce una presentazione visiva di questo processo.

Un diagramma che mostra il flusso di lavoro multimodello del pattern di sequenza.

Mappa i modelli alle chiavi

Puoi caricare più modelli e mapparli alle chiavi utilizzando una gestore del modello con chiave. La mappatura dei modelli alle chiavi consente di utilizzare diversi modelli nella stessa trasformazione RunInference. L'esempio seguente utilizza un gestore di modelli con chiave carica un modello utilizzando CONFIG_1 e un secondo modello utilizzando CONFIG_2. La pipeline utilizza il modello associato a CONFIG_1 per eseguire l'inferenza sugli esempi associati a KEY_1. Il modello associato a CONFIG_2 esegue l'inferenza sugli esempi associati a KEY_2 e KEY_3.

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
  KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2))
])
with pipeline as p:
   data = p | beam.Create([
      ('KEY_1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('KEY_2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('KEY_3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

Per un esempio più dettagliato, consulta Esegui l'inferenza ML con più modelli con addestramento diverso.

Gestisci la memoria

Quando carichi più modelli contemporaneamente, potresti riscontrare mancanza di memoria errori (OOM). Quando utilizzi un gestore del modello con chiave, Apache Beam non limita automaticamente il numero di modelli caricati in memoria. Quando non tutti i modelli rientrano in memoria, si verifica un errore di memoria insufficiente e la pipeline presenta errori.

Per evitare questo problema, utilizza il parametro max_models_per_worker_hint per limitare il numero di modelli caricati contemporaneamente in memoria. L'esempio seguente utilizza un gestore del modello con chiave con il parametro max_models_per_worker_hint. Poiché il valore del parametro max_models_per_worker_hint è impostato su 2, la pipeline carica contemporaneamente un massimo di due modelli su ogni processo di lavoro dell'SDK.

mhs = [
  KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
  KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2)),
  KeyModelMapping(['KEY_4'], PytorchModelHandlerTensor(CONFIG_3)),
  KeyModelMapping(['KEY_5', 'KEY_5', 'KEY_6'], PytorchModelHandlerTensor(CONFIG_4)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

Quando progetti la pipeline, assicurati che i worker abbiano memoria sufficiente sia per e le trasformazioni della pipeline. Poiché la memoria utilizzata dai modelli non venga rilasciato immediatamente, per evitare OOM, includi un buffer di memoria aggiuntivo.

Se hai molti modelli e utilizzi un valore basso con la max_models_per_worker_hint della memoria, potresti riscontrare il thrashing della memoria. Il thrashing della memoria si verifica quando viene utilizzato un tempo di esecuzione eccessivo per scambiare i modelli all'interno e all'esterno della memoria. Da evitare questo problema, includi un GroupByKey nella pipeline prima del passaggio di inferenza. La trasformazione GroupByKey assicura che gli elementi con la stessa chiave e lo stesso modello si trovino sullo stesso worker.

Scopri di più