您可以将 GPU 加速器附加到 Dataproc Serverless 批处理工作负载,以实现以下结果:
加快处理大规模数据分析工作负载的速度。
使用 GPU 机器学习库加快大型数据集上的模型训练速度。
执行高级数据分析,例如视频或自然语言处理。
所有受支持的 Dataproc Serverless Spark 运行时都会向每个工作负载节点添加 Spark RAPIDS 库。Dataproc Serverless Spark 运行时版本 1.1 还向工作负载节点添加了 XGBoost 库。这些库提供了强大的数据转换和机器学习工具,可供您在 GPU 加速型工作负载中使用。
GPU 优势
以下是将 GPU 与 Dataproc Serverless Spark 工作负载搭配使用的一些优势:
性能提升:GPU 加速功能可以显著提升 Spark 工作负载的性能,尤其是对于计算密集型任务(例如机器学习和深度学习、图表处理和复杂分析)。
加快模型训练速度:对于机器学习任务,附加 GPU 可以显著缩短训练模型所需的时间,让数据科学家和工程师能够快速迭代和实验。
可伸缩性:客户可以向节点添加更多 GPU 节点或更强大的 GPU,以处理日益复杂的处理需求。
费用效益:虽然 GPU 需要初始投资,但由于处理时间缩短且资源利用率更高,您可以随着时间的推移节省费用。
增强型数据分析:借助 GPU 加速,您可以对大型数据集执行高级分析,例如图片和视频分析以及自然语言处理。
改进产品:处理速度更快,可加快决策速度并提高应用响应速度。
限制和注意事项
您可以将 NVIDIA A100 或 NVIDIA L4 GPU 附加到 Dataproc Serverless 批处理工作负载。A100 和 L4 加速器的适用区域取决于 Compute Engine GPU 区域性可用性。
仅当使用 Dataproc Serverless Spark 运行时版本 1.x 时,XGBoost 库才会提供给 Dataproc Serverless GPU 加速型工作负载。
使用 XGBoost 的 Dataproc Serverless GPU 加速批处理会使用增加的 Compute Engine 配额。例如,如需运行使用 NVIDIA L4 GPU 的无服务器批处理工作负载,您必须分配 NVIDIA_L4_GPUS 配额。
启用了加速器的作业与
constraints/compute.requireShieldedVm
组织政策不兼容。如果贵组织强制执行此政策,其启用了加速器的作业将无法成功运行。将 RAPIDS GPU 加速与版本
2.2
之前受支持的 Dataproc Serverless 运行时搭配使用时,您必须将默认字符集设置为 UTF-8。如需了解详情,请参阅使用 GPU 加速器创建无服务器批处理工作负载。
价格
如需了解加速器价格信息,请参阅 Dataproc 无服务器价格。
准备工作
在创建挂接 GPU 加速器的无服务器批处理工作负载之前,请执行以下操作:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
使用 GPU 加速器创建无服务器批处理工作负载
提交一个 Dataproc Serverless 批量工作负载,该工作负载使用 NVIDIA L4 GPU 运行并行 PySpark 任务。使用 gcloud CLI 按照以下步骤操作:
点击展开,然后使用文本或代码编辑器创建所列的 PySpark 代码并将其保存到本地机器上的
test-py-spark-gpu.py
文件中。#!/usr/bin/env python """S8s Accelerators Example.""" import subprocess from typing import Any from pyspark.sql import SparkSession from pyspark.sql.functions import col from pyspark.sql.types import IntegerType from pyspark.sql.types import StructField from pyspark.sql.types import StructType spark = SparkSession.builder.appName("joindemo").getOrCreate() def get_num_gpus(_: Any) -> int: """Returns the number of GPUs.""" p_nvidia_smi = subprocess.Popen( ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE ) p_wc = subprocess.Popen( ["wc", "-l"], stdin=p_nvidia_smi.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ) [out, _] = p_wc.communicate() return int(out) num_workers = 5 result = ( spark.sparkContext.range(0, num_workers, 1, num_workers) .map(get_num_gpus) .collect() ) num_gpus = sum(result) print(f"Total accelerators: {num_gpus}") # Run the join example schema = StructType([StructField("value", IntegerType(), True)]) df = ( spark.sparkContext.parallelize(range(1, 10000001), 6) .map(lambda x: (x,)) .toDF(schema) ) df2 = ( spark.sparkContext.parallelize(range(1, 10000001), 6) .map(lambda x: (x,)) .toDF(schema) ) joined_df = ( df.select(col("value").alias("a")) .join(df2.select(col("value").alias("b")), col("a") == col("b")) .explain() )
在本地机器上使用 gcloud CLI 提交包含 5 个工作器的 Dataproc Serverless 无服务器批量作业,每个工作器都使用 L4 GPU 加速:
gcloud dataproc batches submit pyspark test-py-spark-gpu.py \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_NAME \ --version=1.1 \ --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
注意:
- PROJECT_ID:您的 Google Cloud 项目 ID。
- REGION:用于运行工作负载的可用 Compute Engine 区域。
- BUCKET_NAME:Cloud Storage 存储分区的名称。Spark 会先将工作负载依赖项上传到此存储桶中的
/dependencies
文件夹,然后再运行批处理工作负载。 - --version::所有受支持的 Dataproc 无服务器运行时都会将 RAPIDS 库添加到 GPU 加速型工作负载的每个节点。只有运行时版本 1.1 会向 GPU 加速型工作负载的每个节点添加 XGBoost 库。
--properties(请参阅 Spark 资源分配属性):
spark.dataproc.driverEnv.LANG=C.UTF-8
和spark.executorEnv.LANG=C.UTF-8
(对于2.2
之前的运行时版本是必需的):这些属性会将默认字符集设置为 C.UTF-8。spark.dataproc.executor.compute.tier=premium
(必需):高级数据计算单元 (DCU) 用于对采用 GPU 加速的工作负载进行结算。请参阅 Dataproc Serverless 加速器价格。spark.dataproc.executor.disk.tier=premium
(必需):配备 A100-40、A100-80 或 L4 加速器的节点必须使用高级磁盘层级。spark.dataproc.executor.resource.accelerator.type=l4
(必需):必须仅指定一种 GPU 类型。示例作业选择了 L4 GPU。您可以使用以下参数名称指定以下加速器类型:GPU 类型 参数名称 A100 40GB a100-40
A100 80GB a100-80
spark.executor.instances=5
(必填):必须至少有两个。在此示例中,将其设置为 5。spark.executor.cores
(可选):您可以设置此属性来指定核心 vCPU 的数量。L4 GPU 的有效值为4
(默认值),或8
、12
、16
、24
、48
或96
。A100 GPU 的唯一有效默认值为12
。配置了 L4 GPU 和24
、48
或96
核心的配置会在每个执行器上附加2
、4
或8
GPU。所有其他配置都已附加1
GPU。spark.dataproc.executor.disk.size
(必需):L4 GPU 的固定磁盘大小为 375 GB,但配置为24
、48
或96
核心的除外,它们的磁盘大小分别为750
、1,500
或3,000
GB。如果您在提交 L4 加速型工作负载时将此属性设为其他值,则会发生错误。如果您选择 A100 40 或 A100 80 GPU,则有效尺寸为 375g、750g、1500g、3000g、6000g 和 9000g。spark.executor.memory
(可选)和spark.executor.memoryOverhead
(可选):您可以设置其中一个属性,但不能同时设置这两个属性。未被已设置的属性占用的可用内存量会应用于未设置的属性。默认情况下,对于 PySpark 批处理工作负载,spark.executor.memoryOverhead
设置为可用内存的 40%;对于其他工作负载,则设置为 10%(请参阅 Spark 资源分配属性)。下表显示了可为不同 A100 和 L4 GPU 配置设置的最大内存量。这两个属性的最小值均为
1024
MB。A100 (40 GB) A100 (80 GB) L4(4 核) L4(8 核) L4(12 核) L4(16 核) L4(24 核) L4(48 个核心) L4(96 个核心) 总内存上限 (MB) 78040 165080 13384 26768 40152 53536 113072 160608 321216 Spark RAPIDS 属性(可选):默认情况下,Dataproc Serverless 会设置以下 Spark RAPIDS 属性值:
spark.plugins
=com.nvidia.spark.SQLPluginspark.executor.resource.gpu.amount
=1spark.task.resource.gpu.amount
=1/$spark_executor_coresspark.shuffle.manager
=''。默认情况下,此属性未设置。 不过,NVIDIA 建议在使用 GPU 时启用 RAPIDS 洗牌管理器,以提高性能。为此,请在提交工作负载时设置spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
。
如需设置 Spark RAPIDS 属性,请参阅 RAPIDS Accelerator for Apache Spark Configuration;如需设置 Spark 高级属性,请参阅 RAPIDS Accelerator for Apache Spark Advanced Configuration。