Dataflow ML 简介

您可以将 Dataflow ML 的规模数据处理功能用于预测和推断流水线,以及为训练准备数据

Dataflow ML 工作流图。

图 1:完整的 Dataflow ML 工作流。

要求和限制

  • Dataflow ML 支持批处理和流处理流水线。
  • Apache Beam 2.40.0 及更高版本支持 RunInference API。
  • Apache Beam 2.53.0 及更高版本支持 MLTransform API。
  • 模型处理程序适用于 PyTorch、scikit-learn、TensorFlow、ONNX 和 TensorRT。对于不受支持的框架,您可以使用自定义模型处理程序。

为训练准备数据

预测和推断流水线

Dataflow ML 结合了 Dataflow 和 Apache Beam 的 RunInference API 的强大功能。借助 RunInference API,您可以定义模型的特征和属性,并将该配置传递给 RunInference 转换。此功能可让用户在其 Dataflow 流水线中运行模型,而无需了解模型的实现细节。您可以选择最适合您数据的框架,例如 TensorFlow 和 PyTorch。

在流水线中运行多个模型

使用 RunInference 转换向 Dataflow 流水线添加多个推理模型。如需了解详情(包括代码详情),请参阅 Apache Beam 文档中的多模型流水线

构建跨语言流水线

如需将 RunInference 与 Java 流水线搭配使用,请创建跨语言 Python 转换。该流水线会调用转换,后者会执行预处理、后处理和推理。

如需了解详细说明和示例流水线,请参阅从 Java SDK 使用 RunInference

将 GPU 与 Dataflow 搭配使用

对于需要使用加速器的批处理或流式流水线,您可以在 NVIDIA GPU 设备上运行 Dataflow 流水线。如需了解详情,请参阅使用 GPU 运行 Dataflow 流水线

排查 Dataflow ML 问题

本部分提供了在使用 Dataflow ML 时可能有帮助的问题排查策略和链接。

堆栈要求每个张量的大小都相同

如果您在使用 RunInference API 时提供大小不同的图片或长度不同的字词嵌入,则可能会发生以下错误:

File "/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py", line 232, in run_inference batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack expects each tensor to be equal size, but got [12] at entry 0 and [10] at entry 1 [while running 'PyTorchRunInference/ParDo(_RunInferenceDoFn)']

发生此错误的原因是 RunInference API 无法批量处理大小不同的张量元素。如需了解解决方法,请参阅 Apache Beam 文档中的无法批量处理张量元素

避免使用大型模型时出现内存不足错误

加载中型或大型机器学习模型时,您的机器可能会出现内存不足的问题。Dataflow 提供了一些工具,可帮助您在加载机器学习模型时避免内存不足 (OOM) 错误。请使用下表确定适合您情况的方法。

场景 解决方案
模型足够小,可以放入内存中。 使用 RunInference 转换,无需进行任何其他配置。RunInference 转换会跨线程共享模型。如果您的机器上每个 CPU 核心可以放置一个模型,则您的流水线可以使用默认配置。
多个训练方式不同的模型执行相同的任务。 使用按模型键。如需了解详情,请参阅使用多个不同训练的模型运行机器学习推理
一个模型会加载到内存中,所有进程共享此模型。

使用 large_model 参数。如需了解详情,请参阅使用多个不同训练的模型运行机器学习推理

如果您要构建自定义模型处理程序,请替换 share_model_across_processes 参数,而不是使用 large_model 参数。

您需要配置要加载到机器上的模型的确切数量。

如需精确控制要加载的模型数量,请使用 model_copies 参数。

如果您要构建自定义模型处理程序,请替换 model_copies 参数。

如需详细了解如何使用 Dataflow 进行内存管理,请参阅排查 Dataflow 内存不足错误

后续步骤