Dataflow ML 简介

您可以将 Dataflow ML 的规模数据处理功能用于预测和推断流水线,以及为训练准备数据

Dataflow ML 工作流图。

图 1:完整的 Dataflow ML 工作流。

要求和限制

  • Dataflow ML 支持批处理和流处理流水线。
  • Apache Beam 2.40.0 及更高版本支持 RunInference API。
  • Apache Beam 2.53.0 及更高版本支持 MLTransform API。
  • 模型处理程序适用于 PyTorch、scikit-learn、TensorFlow、ONNX 和 TensorRT。对于不受支持的框架,您可以使用自定义模型处理程序。

为训练准备数据

预测和推断流水线

Dataflow ML 结合了 Dataflow 和 Apache Beam 的 RunInference API 的强大功能。使用 RunInference API,您可以定义模型的特征和属性,并将该配置传递给 RunInference 转换。此功能可让用户在其 Dataflow 流水线中运行模型,而无需了解模型的实现细节。您可以选择最适合您数据的框架,例如 TensorFlow 和 PyTorch。

在流水线中运行多个模型

使用 RunInference 转换向 Dataflow 流水线添加多个推理模型。如需了解详情(包括代码详情),请参阅 Apache Beam 文档中的多模型流水线

构建跨语言流水线

如需将 RunInference 与 Java 流水线搭配使用,请创建跨语言 Python 转换。该流水线会调用转换,后者会执行预处理、后处理和推理。

如需详细说明和示例流水线,请参阅在 Java SDK 中使用 RunInference

将 GPU 与 Dataflow 搭配使用

对于需要使用加速器的批量或流式处理流水线,您可以在 NVIDIA GPU 设备上运行 Dataflow 流水线。如需了解详情,请参阅使用 GPU 运行 Dataflow 流水线

排查 Dataflow ML 问题

本部分介绍了使用 Dataflow ML 时可能有用的排查问题策略和链接。

堆栈要求每个张量具有相同大小

如果您在使用 RunInference API 时提供大小不同的图片或长度不同的字词嵌入,则可能会发生以下错误:

File "/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py", line 232, in run_inference batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack expects each tensor to be equal size, but got [12] at entry 0 and [10] at entry 1 [while running 'PyTorchRunInference/ParDo(_RunInferenceDoFn)']

发生此错误的原因是 RunInference API 无法批量处理大小不同的张量元素。如需了解解决方法,请参阅 Apache Beam 文档中的无法批量处理张量元素

后续步骤