将 GPU 与 Dataproc Serverless 搭配使用

您可以将 GPU 加速器挂接到 Dataproc 无服务器批处理工作负载,以实现以下结果:

  • 加快大规模数据分析工作负载的处理速度。

  • 使用 GPU 机器学习库加快基于大型数据集的模型训练。

  • 执行高级数据分析,例如视频或自然语言处理。

所有受支持的 Dataproc Serverless Spark 运行时都将 Spark RAPIDS 库添加到每个工作负载节点。Dataproc Serverless Spark 运行时版本 1.1 还向工作负载节点添加了 XGBoost 库。这些库提供了强大的数据转换和机器学习工具,供您用于加速 GPU 的工作负载。

GPU 的优势

以下是将 GPU 与 Dataproc Serverless Spark 工作负载搭配使用时可获享的一些优势:

  • 性能改进:GPU 加速可显著提升 Spark 工作负载的性能,尤其是对于机器学习和深度学习、图形处理以及复杂分析等计算密集型任务。

  • 更快的模型训练:对于机器学习任务,挂接 GPU 可以大幅缩短训练模型所需的时间,使数据科学家和工程师能够快速进行迭代和实验。

  • 可扩缩性:客户可以为节点添加更多 GPU 节点或更强大的 GPU,以处理日益复杂的处理需求。

  • 成本效益:虽然 GPU 需要初始投资,但随着处理时间的缩短和资源利用率的提高,您可以逐步节省成本。

  • 增强型数据分析:借助 GPU 加速功能,您可以对大型数据集执行高级分析,例如图片和视频分析以及自然语言处理。

  • 改进的产品:更快的处理速度可以加快决策速度,并使应用响应更快。

限制和注意事项

价格

如需了解加速器价格信息,请参阅 Dataproc 无服务器价格

准备工作

创建挂接了 GPU 加速器的无服务器批量工作负载之前,请执行以下操作:

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Dataproc, Compute Engine, and Cloud Storage API。

    启用 API

  5. 安装 Google Cloud CLI。
  6. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  7. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  8. 确保您的 Google Cloud 项目已启用结算功能

  9. 启用 Dataproc, Compute Engine, and Cloud Storage API。

    启用 API

  10. 安装 Google Cloud CLI。
  11. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  12. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

    进入“存储桶”页面

  13. 点击创建存储分区
  14. 创建存储分区页面上,输入您的存储分区信息。要转到下一步,请点击继续
    • 指定存储分区的名称中,输入符合存储分区命名要求的名称。
    • 对于选择数据存储位置,执行以下操作:
      • 选择位置类型选项。
      • 选择位置选项。
    • 对于为数据选择一个默认存储类别,请选择一个存储类别
    • 对于选择如何控制对象的访问权限,请选择访问权限控制选项。
    • 对于高级设置(可选),请指定加密方法保留政策存储分区标签
  15. 点击创建

使用 GPU 加速器创建无服务器批量工作负载

提交使用 NVIDIA L4 GPU 运行并行化 PySpark 任务的 Dataproc Serverless 批量工作负载。使用 gcloud CLI 执行以下步骤:

  1. 点击展开,然后使用文本或代码编辑器创建列出的 PySpark 代码并将其保存到本地机器上的 test-py-spark-gpu.py 文件中。

    #!/usr/bin/env python
    
    """S8s Accelerators Example."""
    
    import subprocess
    from typing import Any
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import IntegerType
    from pyspark.sql.types import StructField
    from pyspark.sql.types import StructType
    
    spark = SparkSession.builder.appName("joindemo").getOrCreate()
    
    def get_num_gpus(_: Any) -> int:
      """Returns the number of GPUs."""
      p_nvidia_smi = subprocess.Popen(
          ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE
      )
      p_wc = subprocess.Popen(
          ["wc", "-l"],
          stdin=p_nvidia_smi.stdout,
          stdout=subprocess.PIPE,
          stderr=subprocess.PIPE,
          universal_newlines=True,
      )
      [out, _] = p_wc.communicate()
      return int(out)
    
    num_workers = 5
    result = (
        spark.sparkContext.range(0, num_workers, 1, num_workers)
        .map(get_num_gpus)
        .collect()
    )
    num_gpus = sum(result)
    print(f"Total accelerators: {num_gpus}")
    
    # Run the join example
    schema = StructType([StructField("value", IntegerType(), True)])
    df = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    df2 = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    joined_df = (
        df.select(col("value").alias("a"))
        .join(df2.select(col("value").alias("b")), col("a") == col("b"))
        .explain()
    )
    
    
  2. 使用本地机器上的 gcloud CLI 提交包含 5 个工作器的 Dataproc 无服务器批处理批量作业,每个工作器使用 L4 GPU 进行加速:

    gcloud dataproc batches submit pyspark test-py-spark-gpu.py \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_NAME \
        --version=1.1 \
        --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
    

备注:

  • PROJECT_ID:您的 Google Cloud 项目 ID。
  • REGION:用于运行工作负载的可用 Compute Engine 区域
  • BUCKET_NAME:Cloud Storage 存储桶的名称。在运行批量工作负载之前,Spark 会将工作负载依赖项上传到此存储桶中的 /dependencies 文件夹。
  • --version::所有受支持的 Dataproc 无服务器运行时都会将 RAPIDS 库添加到由 GPU 加速的工作负载的每个节点中。目前,只有运行时版本 1.1 才能将 XGBoost 库添加到由 GPU 加速的工作负载的每个节点中。
  • --properties(请参阅 Spark 资源分配属性):

    • spark.dataproc.driverEnv.LANG=C.UTF-8spark.executorEnv.LANG=C.UTF-8(对于 2.2 之前的运行时版本是必需的):这些属性将默认字符集设置为 C.UTF-8。
    • spark.dataproc.executor.compute.tier=premium(必需):对 GPU 加速的工作负载使用付费数据计算单元 (DCU) 进行计费。请参阅 Dataproc Serverless Accelerator 价格

    • spark.dataproc.executor.disk.tier=premium(必需):具有 A100-40、A100-80 或 L4 加速器的节点必须使用高级磁盘层级。

    • spark.dataproc.executor.resource.accelerator.type=l4(必需):只能指定一种 GPU 类型。示例作业选择 L4 GPU。可以使用以下参数名称指定以下加速器类型:

      GPU 类型 参数名称
      A100 40GB a100-40
      A100 80GB a100-80

    • spark.executor.instances=5(必需):必须至少包含 2 个。在本示例中,该值设置为 5。

    • spark.executor.cores(可选):您可以设置此属性来指定核心 vCPU 的数量。L4 GPU 的有效值包括 4、默认值、81216。A100 GPU 唯一有效的默认值为 12

    • spark.dataproc.executor.disk.size(对于 A100-40A100-80 GPU 是可选的):Dataproc Serverless 将默认 GPU SSD 磁盘大小设置为 375 GB。使用 A100 40 或 A100 80 GPU 时,您可以更改大小(请参阅 Spark 资源分配属性)。L4 GPU 需要 375 GB 大小。如果您在提交 L4 加速的工作负载时将此属性设置为其他值,则会发生错误。

    • spark.executor.memory(可选)和 spark.executor.memoryOverhead(可选):您可以设置其中一个属性,但不能同时设置两者。系统会将 set 属性未占用的可用内存量应用于未设置属性。默认情况下,对于 PySpark 批量工作负载,spark.executor.memoryOverhead 设置为可用内存的 40%,对于其他工作负载,则设置为 10%(请参阅 Spark 资源分配属性)。

      下表显示了可以为不同的 A100 和 L4 GPU 配置设置的最大内存量。任一属性的最小值为 1024 MB。

      A100(40 GB) A100(80 GB) L4(4 核) L4(8 核) L4(12 核) L4(16 核)
      总内存上限 (MB) 78040 165080 13384 26768 40152 53536
    • Spark RAPIDS 属性(可选):默认情况下,Dataproc Serverless 设置以下 Spark RAPIDS 属性值:

      • spark.plugins=com.nvidia.spark.SQLPlugin
      • spark.executor.resource.gpu.amount=1
      • spark.task.resource.gpu.amount=1/$spark_executor_cores
      • spark.shuffle.manager=''。默认情况下,此属性未设置。 不过,NVIDIA 建议在使用 GPU 时启用 RAPIDS Shuffle 管理器来提高性能。为此,请在提交工作负载时设置 spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager

      如需设置 Spark RAPIDS 属性,请参阅适用于 Apache Spark 配置的 RAPIDS Accelerator;如需设置 Spark 高级属性,请参阅适用于 Apache Spark 高级配置的 RAPIDS Accelerator