在流水线中使用多个模型

您可以使用 RunInference API 构建包含多个模型的流水线。多模型流水线非常适合 A/B 测试和构建集成学习等任务,可解决需要使用多个机器学习模型的业务问题。

使用多个模型

以下代码示例展示了如何使用 RunInference 转换向流水线添加多个模型。

构建包含多个模型的流水线时,您可以使用以下两种模式中的一种:

  • A/B 分支模式:一部分输入数据用于一个模型,剩余数据用于另一个模型。
  • 序列模式:输入数据依次遍历两个模型。

A/B 模式

以下代码展示了如何使用 RunInference 转换向流水线添加 A/B 模式。

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(MODEL_HANDLER_A)
   model_b_predictions = data | RunInference(MODEL_HANDLER_B)

MODEL_HANDLER_AMODEL_HANDLER_B 是模型处理程序的设置代码。

下图直观呈现了这一过程。

展示 A/B 模式多模型工作流的图表。

序列模式

以下代码展示了如何使用 RunInference 转换向流水线添加序列模式。

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('A_SOURCE')
   model_a_predictions = data | RunInference(MODEL_HANDLER_A)
   model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(MODEL_HANDLER_B)

MODEL_HANDLER_AMODEL_HANDLER_B 是模型处理程序的设置代码。

下图直观呈现了这一过程。

展示序列模式多模型工作流的图表。

将模型映射到键

您可以加载多个模型,并使用键控模型处理程序将它们映射到键。通过将模型映射到键,您可以在同一 RunInference 转换中使用不同的模型。以下示例使用一个键控模型处理程序,该处理程序使用 CONFIG_1 加载一个模型,并使用 CONFIG_2 加载第二个模型。流水线使用与 CONFIG_1 关联的模型对与 KEY_1 关联的示例运行推理。与 CONFIG_2 关联的模型会在与 KEY_2KEY_3 关联的示例上运行推理。

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
  KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2))
])
with pipeline as p:
   data = p | beam.Create([
      ('KEY_1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('KEY_2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('KEY_3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

如需查看更详细的示例,请参阅使用多个不同训练的模型运行机器学习推理

管理内存

同时加载多个模型时,您可能会遇到内存不足错误 (OOM)。使用键控模型处理程序时,Apache Beam 不会自动限制加载到内存中的模型数量。当模型没有全部加载到内存中时,就会发生内存不足错误,并且流水线会失败。

为了避免此问题,请使用 max_models_per_worker_hint 参数来限制同时加载到内存中的模型数量。以下示例使用带有 max_models_per_worker_hint 参数的键控模型处理程序。由于 max_models_per_worker_hint 参数值设为 2,因此流水线会在每个 SDK 工作器进程中同时加载最多两个模型。

mhs = [
  KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
  KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2)),
  KeyModelMapping(['KEY_4'], PytorchModelHandlerTensor(CONFIG_3)),
  KeyModelMapping(['KEY_5', 'KEY_5', 'KEY_6'], PytorchModelHandlerTensor(CONFIG_4)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

设计流水线时,请确保工作器有足够的内存来处理模型和流水线转换。由于模型使用的内存可能不会立即释放,因此为了避免 OOM,请添加额外的内存缓冲区。

如果您有多个模型,并且使用 max_models_per_worker_hint 参数的较低值,则可能会遇到内存过度使用问题。当使用过多的执行时间将模型换入内存以及从内存中换出模型时,就会发生内存抖动。为避免此问题,请在推理步骤之前在流水线中添加 GroupByKey 转换。GroupByKey 转换可确保具有相同键和模型的元素位于同一 worker 上。

了解详情