您可以使用 RunInference
API 构建包含多个模型的流水线。多模型流水线非常适合 A/B 测试和构建集成学习等任务,可解决需要使用多个机器学习模型的业务问题。
使用多个模型
以下代码示例展示了如何使用 RunInference
转换向流水线添加多个模型。
构建包含多个模型的流水线时,您可以使用以下两种模式中的一种:
- A/B 分支模式:一部分输入数据用于一个模型,剩余数据用于另一个模型。
- 序列模式:输入数据依次遍历两个模型。
A/B 模式
以下代码展示了如何使用 RunInference
转换向流水线添加 A/B 模式。
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('a_source')
model_a_predictions = data | RunInference(MODEL_HANDLER_A)
model_b_predictions = data | RunInference(MODEL_HANDLER_B)
MODEL_HANDLER_A 和 MODEL_HANDLER_B 是模型处理程序的设置代码。
下图直观呈现了这一过程。
序列模式
以下代码展示了如何使用 RunInference
转换向流水线添加序列模式。
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('A_SOURCE')
model_a_predictions = data | RunInference(MODEL_HANDLER_A)
model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(MODEL_HANDLER_B)
MODEL_HANDLER_A 和 MODEL_HANDLER_B 是模型处理程序的设置代码。
下图直观呈现了这一过程。
将模型映射到键
您可以加载多个模型,并使用键控模型处理程序将它们映射到键。通过将模型映射到键,您可以在同一 RunInference
转换中使用不同的模型。以下示例使用一个键控模型处理程序,该处理程序使用 CONFIG_1 加载一个模型,并使用 CONFIG_2 加载第二个模型。流水线使用与 CONFIG_1 关联的模型对与 KEY_1 关联的示例运行推理。与 CONFIG_2 关联的模型会在与 KEY_2 和 KEY_3 关联的示例上运行推理。
from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2))
])
with pipeline as p:
data = p | beam.Create([
('KEY_1', torch.tensor([[1,2,3],[4,5,6],...])),
('KEY_2', torch.tensor([[1,2,3],[4,5,6],...])),
('KEY_3', torch.tensor([[1,2,3],[4,5,6],...])),
])
predictions = data | RunInference(keyed_model_handler)
如需查看更详细的示例,请参阅使用多个不同训练的模型运行机器学习推理。
管理内存
同时加载多个模型时,您可能会遇到内存不足错误 (OOM)。使用键控模型处理程序时,Apache Beam 不会自动限制加载到内存中的模型数量。当模型没有全部加载到内存中时,就会发生内存不足错误,并且流水线会失败。
为了避免此问题,请使用 max_models_per_worker_hint
参数来限制同时加载到内存中的模型数量。以下示例使用带有 max_models_per_worker_hint
参数的键控模型处理程序。由于 max_models_per_worker_hint
参数值设为 2
,因此流水线会在每个 SDK 工作器进程中同时加载最多两个模型。
mhs = [
KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2)),
KeyModelMapping(['KEY_4'], PytorchModelHandlerTensor(CONFIG_3)),
KeyModelMapping(['KEY_5', 'KEY_5', 'KEY_6'], PytorchModelHandlerTensor(CONFIG_4)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)
设计流水线时,请确保工作器有足够的内存来处理模型和流水线转换。由于模型使用的内存可能不会立即释放,因此为了避免 OOM,请添加额外的内存缓冲区。
如果您有多个模型,并且使用 max_models_per_worker_hint
参数的较低值,则可能会遇到内存过度使用问题。当使用过多的执行时间将模型换入内存以及从内存中换出模型时,就会发生内存抖动。为避免此问题,请在推理步骤之前在流水线中添加 GroupByKey
转换。GroupByKey
转换可确保具有相同键和模型的元素位于同一 worker 上。
了解详情
- 请参阅 Apache Beam 文档中的多模型流水线部分。
- 使用多个不同训练的模型运行机器学习推理。
- 在 Colab 中运行交互式笔记本。