Puoi utilizzare l'API RunInference
per creare pipeline contenenti più modelli. Multi-modello
le pipeline sono utili per attività come test A/B e creazione di insiemi
risolvere problemi aziendali che richiedono più di un modello ML.
Utilizza più modelli
I seguenti esempi di codice mostrano come utilizzare la trasformazione RunInference
per aggiungere più modelli alla pipeline.
Quando crei pipeline con più modelli, puoi utilizzare uno di due pattern:
- Pattern ramo A/B: una parte dell'input i dati vengono inviati a un modello e il resto a un secondo modello.
- Pattern di sequenza: i dati di input attraversano due modelli, uno dopo l'altro.
motivo A/B
Il codice seguente mostra come aggiungere un pattern A/B alla pipeline con
Trasformazione RunInference
.
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('a_source')
model_a_predictions = data | RunInference(MODEL_HANDLER_A)
model_b_predictions = data | RunInference(MODEL_HANDLER_B)
MODEL_HANDLER_A e MODEL_HANDLER_B sono il codice di configurazione del gestore del modello.
Il seguente diagramma fornisce una presentazione visiva di questo processo.
Pattern sequenza
Il seguente codice mostra come aggiungere un pattern di sequenza alla pipeline con la trasformazione RunInference
.
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('A_SOURCE')
model_a_predictions = data | RunInference(MODEL_HANDLER_A)
model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(MODEL_HANDLER_B)
MODEL_HANDLER_A e MODEL_HANDLER_B sono il modello di configurazione del gestore.
Il seguente diagramma fornisce una presentazione visiva di questo processo.
Mappa i modelli alle chiavi
Puoi caricare più modelli e mapparli alle chiavi utilizzando una
gestore del modello con chiave.
La mappatura dei modelli alle chiavi consente di utilizzare diversi modelli nella stessa trasformazione RunInference
.
L'esempio seguente utilizza un gestore di modelli con chiave
carica un modello utilizzando CONFIG_1 e un secondo modello utilizzando CONFIG_2.
La pipeline utilizza il modello associato a CONFIG_1 per eseguire l'inferenza sugli esempi associati a KEY_1.
Il modello associato a CONFIG_2 esegue l'inferenza sugli esempi associati a KEY_2 e KEY_3.
from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2))
])
with pipeline as p:
data = p | beam.Create([
('KEY_1', torch.tensor([[1,2,3],[4,5,6],...])),
('KEY_2', torch.tensor([[1,2,3],[4,5,6],...])),
('KEY_3', torch.tensor([[1,2,3],[4,5,6],...])),
])
predictions = data | RunInference(keyed_model_handler)
Per un esempio più dettagliato, consulta Esegui l'inferenza ML con più modelli con addestramento diverso.
Gestisci la memoria
Quando carichi più modelli contemporaneamente, potresti riscontrare mancanza di memoria errori (OOM). Quando utilizzi un gestore del modello con chiave, Apache Beam non limita automaticamente il numero di modelli caricati in memoria. Quando non tutti i modelli rientrano in memoria, si verifica un errore di memoria insufficiente e la pipeline presenta errori.
Per evitare questo problema, utilizza il parametro max_models_per_worker_hint
per limitare il numero di modelli caricati contemporaneamente in memoria. L'esempio seguente utilizza un gestore del modello con chiave con il parametro max_models_per_worker_hint
.
Poiché il valore del parametro max_models_per_worker_hint
è impostato su 2
,
la pipeline carica contemporaneamente un massimo di due modelli su ogni processo di lavoro dell'SDK.
mhs = [
KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2)),
KeyModelMapping(['KEY_4'], PytorchModelHandlerTensor(CONFIG_3)),
KeyModelMapping(['KEY_5', 'KEY_5', 'KEY_6'], PytorchModelHandlerTensor(CONFIG_4)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)
Quando progetti la pipeline, assicurati che i worker abbiano memoria sufficiente sia per e le trasformazioni della pipeline. Poiché la memoria utilizzata dai modelli non venga rilasciato immediatamente, per evitare OOM, includi un buffer di memoria aggiuntivo.
Se hai molti modelli e utilizzi un valore basso con la max_models_per_worker_hint
della memoria, potresti riscontrare il thrashing della memoria. Il thrashing della memoria si verifica quando viene utilizzato un tempo di esecuzione eccessivo per scambiare i modelli all'interno e all'esterno della memoria. Da evitare
questo problema, includi un
GroupByKey
nella pipeline prima del passaggio di inferenza. La trasformazione GroupByKey
assicura che gli elementi con la stessa chiave e lo stesso modello si trovino sullo stesso worker.
Scopri di più
- Informazioni Pipeline multi-modello nella documentazione di Apache Beam
- Esegui l'inferenza ML con più modelli addestrati in modo diverso.
- Esegui un blocco note interattivo in Colab.