配置 Dataflow Prime 适配功能

概览

Dataflow Prime 是一个无服务器平台,使用横向和纵向扩缩来分配工作器和工作器资源;您无需指定流水线中使用的工作器数量或大小以及形状。要自定义工作器资源,您可以使用 Apache Beam 资源提示来指定整个流水线或特定流水线步骤的资源要求。Dataflow Prime 适配功能使用资源提示来自定义流水线的工作器资源。

限制和要求

在 Dataflow Prime 预览版阶段,资源提示可与 Apache Beam 2.30.0 或更高版本搭配使用。

可用的资源提示

Dataflow Prime 预览版提供以下资源提示:

  1. min_ram="numberGB":分配给工作器的最小 RAM(以 GB 为单位)。在为新工作器(横向扩缩)或现有工作器(纵向扩缩)分配内存时,Dataflow Prime 使用此值作为下限。

    • 使用流水线或流水线步骤所需的工作器内存的最大值来设置它。
    • min_ram 是每工作器的总量规范,而不是每 vCPU 规范。例如,如果您设置 min_ram=15GB,Dataflow 会将工作器中所有 vCPU 的可用总内存设置为至少 15 GB。
  2. accelerator="type:type;count:number;configuration-options":要使用的 GPU 类型、GPU 数量和 GPU 配置选项(要将 NVIDIA GPU 与 Dataflow 搭配使用,您必须设置“install-nvidia-driver”配置选项)。

资源提示嵌套

资源提示在流水线转换层次结构中的应用方式如下:

  • min_ram:转换的取值为在转换本身以及转换在层次结构中的所有父级转换上设置的值中最大的 min_ram 提示值。
    • 示例:如果内层转换提示将 min_ram 设置为 16 GB,并且层次结构中的外层转换提示将 min_ram 设置为 32 GB,则整个转换中的所有步骤都将使用 32 GB 的提示。
    • 示例:如果内层转换提示将 min_ram 设置为 16 GB,并且层次结构中的外层转换提示将 min_ram 设置为 8 GB,则不在内层转换中的所有外层转换步骤将使用 8 GB 的提示,而内层转换中的所有步骤将使用 16 GB 的提示。
  • accelerator:转换层次结构中的最内层值优先。
    • 示例:如果在一个层次结构中,内层转换 accelerator 提示与外层转换 accelerator 提示不同,则内层转换将使用内层转换 accelerator 提示。

使用资源提示

您可以设置整个流水线或流水线步骤的资源提示。

流水线资源提示

通过命令行运行流水线时,您可以设置整个流水线的资源提示。

示例

    python my_pipeline.py \
        --runner=DataflowRunner \
        --resource_hints=min_ram=numberGB \
        --resource_hints=accelerator="type:type;count:number;install-nvidia-driver" \
        ...

流水线步骤资源提示

您可以通过编程方式设置流水线步骤(转换)的资源提示。

Java

您可以使用 ResourceHints 以编程方式设置流水线转换的资源提示。

示例

pcoll.apply(MyCompositeTransform.of(...)
    .setResourceHints(
        ResourceHints.create()
            .withMinRam("15GB")
            .withAccelerator(
     "type:nvidia-tesla-k80;count:1;install-nvidia-driver")))

pcoll.apply(ParDo.of(new BigMemFn())
    .setResourceHints(
        ResourceHints.create().withMinRam("30GB")))

Python

您可以使用 PTransforms.with_resource_hints 以编程方式设置流水线转换的资源提示(另请参阅 ResourceHint)。

示例

pcoll | MyPTransform().with_resource_hints(
    min_ram="4GB",
    accelerator="type:nvidia-tesla-k80;count:1;install-nvidia-driver")

pcoll | beam.ParDo(BigMemFn()).with_resource_hints(
    min_ram="30GB")