本文档介绍了如何自动调整 Spark 工作负载。由于 Spark 配置选项众多,并且很难评估这些选项对工作负载的影响,因此优化 Spark 工作负载以提高性能和弹性可能具有挑战性。Dataproc Serverless 自动调整功能可根据 Spark 优化最佳实践和对工作负载运行情况的分析,自动将 Spark 配置设置应用于周期性 Spark 工作负载,从而为手动工作负载配置提供了替代方案。
注册使用 Dataproc Serverless 自动调整
如需注册以使用本页中所述的 Dataproc Serverless 自动调整预览版,请填写并提交 BigQuery 中的 Gemini 预览版注册表单。表单获得批准后,表单中列出的项目便可使用预览版功能。
优势
Dataproc Serverless 自动调整功能可提供以下优势:
- 效果提升:优化调整以提升效果
- 更快地进行优化:自动配置,避免耗时的手动配置测试
- 提高了弹性:自动分配内存以避免与内存相关的故障
限制
Dataproc Serverless 自动调整功能存在以下限制:
- 系统会计算自动调整设置,并将其应用于工作负载的第二次运行及之后的运行。由于 Dataproc Serverless 自动调优功能会使用工作负载历史记录进行优化,因此周期性工作负载的首次运行不会自动调优。
- 不支持缩减内存。
- 自动调整不会向正在运行的工作负载应用,只会应用于新提交的工作负载同类群组。
自动调节同类群组
自动调整会应用于批量工作负载的周期性执行(称为“同类群组”)。您在提交工作负载时指定的同类群组名称会将其标识为周期性工作负载的连续运行作业之一。我们建议您使用描述工作负载类型或以其他方式帮助识别重复性工作负载中工作负载运行情况的同类群组名称。例如,为运行每日销售汇总任务的定期工作负载指定 daily_sales_aggregation
作为同类群组名称。
自动调节场景
您可以通过选择以下一个或多个自动调整场景,将 Dataproc Serverless 自动调整功能应用于您的工作负载:
MEMORY
:自动调整 Spark 内存分配,以预测和避免潜在的工作负载内存不足错误。修复之前因内存不足 (OOM) 错误而失败的工作负载。SCALING
:自动调整 Spark 自动扩缩配置设置。BROADCAST_HASH_JOIN
:自动调整 Spark 配置设置,以优化 SQL 广播联接性能。
价格
在预览期间,Dataproc Serverless 自动调整功能免费提供。系统会按标准 Dataproc Serverless 价格收费。
区域可用性
您可以将 Dataproc Serverless 自动调整功能与在可用 Compute Engine 区域中提交的批处理结合使用。
使用 Dataproc Serverless 自动调整
您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 为工作负载启用 Dataproc Serverless 自动调整功能。
控制台
如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请执行以下步骤:
在 Google Cloud 控制台中,前往 Dataproc 的批次页面。
如需创建批处理工作负载,请点击创建。
在容器部分,为您的 Spark 工作负载填写以下字段:
根据需要填写创建批量页面的其他部分,然后点击提交。如需详细了解这些字段,请参阅提交批处理工作负载。
gcloud
如需在每次提交周期性批处理工作负载时启用 Dataproc 无服务器自动调整功能,请在终端窗口或 Cloud Shell 中本地运行以下 gcloud CLI gcloud dataproc batches submit
命令。
gcloud dataproc batches submit COMMAND \ --region=REGION \ --cohort=COHORT \ --autotuning-scenarios=SCENARIOS \ other arguments ...
替换以下内容:
- COMMAND:Spark 工作负载类型,例如
Spark
、PySpark
、Spark-Sql
或Spark-R
。 - REGION:工作负载将在其中运行的区域。
COHORT:同类群组名称,用于将批次标识为一系列周期性工作负载之一。 系统会将自动调整应用于使用此同类群组名称提交的第二个工作负载及后续工作负载。例如,为运行每日销售汇总任务的定期工作负载指定
daily_sales_aggregation
作为同类群组名称。SCENARIOS:一个或多个以英文逗号分隔的自动调整方案,用于优化工作负载,例如
--autotuning-scenarios=MEMORY,SCALING
。您可以随每次提交批量同类群组而更改场景列表。
API
如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请提交包含以下字段的 batches.create 请求:
RuntimeConfig.cohort
:同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会对使用此同类群组名称提交的第二个工作负载及后续工作负载应用自动调整功能。例如,为运行每日销售汇总任务的定期工作负载指定daily_sales_aggregation
作为同类群组名称。AutotuningConfig.scenarios
:一个或多个用于优化工作负载的自动调整方案,例如BROADCAST_HASH_JOIN
、MEMORY
和SCALING
。您可以随每次提交批量同类群组而更改场景列表。
示例:
...
runtimeConfig:
cohort: daily_sales_aggregation
autotuningConfig:
scenarios:
- BROADCAST_HASH_JOIN
- MEMORY
- SCALING
...
Java
在尝试此示例之前,请按照《Dataproc 无服务器快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Dataproc Serverless Java API 参考文档。
如需向 Dataproc Serverless 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请使用包含以下字段的 CreateBatchRequest 调用 BatchControllerClient.createBatch:
Batch.RuntimeConfig.cohort
:同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会对使用此同类群组名称提交的第二个工作负载及后续工作负载应用自动调整功能。例如,您可以将daily_sales_aggregation
指定为运行每日销售汇总任务的定期工作负载的同类群组名称。Batch.RuntimeConfig.AutotuningConfig.scenarios
:一个或多个用于优化工作负载的自动调整场景,例如BROADCAST_HASH_JOIN
、MEMORY
、SCALING
。您可以随每次提交批量同类群组而更改场景列表。如需查看场景的完整列表,请参阅 AutotuningConfig.Scenario Javadoc。
示例:
...
Batch batch =
Batch.newBuilder()
.setRuntimeConfig(
RuntimeConfig.newBuilder()
.setCohort("daily_sales_aggregation")
.setAutotuningConfig(
AutotuningConfig.newBuilder()
.addScenarios(Scenario.SCALING))
...
.build();
batchControllerClient.createBatch(
CreateBatchRequest.newBuilder()
.setParent(parent)
.setBatchId(batchId)
.setBatch(batch)
.build());
...
如需使用此 API,您必须使用 google-cloud-dataproc
客户端库 4.43.0
或更高版本。您可以使用以下配置之一将库添加到项目中。
Maven
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-dataproc</artifactId>
<version>4.43.0</version>
</dependency>
</dependencies>
Gradle
implementation 'com.google.cloud:google-cloud-dataproc:4.43.0'
SBT
libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.43.0"
Python
在尝试此示例之前,请按照《Dataproc 无服务器快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Dataproc Serverless Python API 参考文档。
如需向 Dataproc Serverless 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请使用包含以下字段的 Batch 调用 BatchControllerClient.create_batch:
batch.runtime_config.cohort
:同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会对使用此同类群组名称提交的第二个工作负载及后续工作负载应用自动调整功能。例如,您可以将daily_sales_aggregation
指定为运行每日销售汇总任务的定期工作负载的同类群组名称。batch.runtime_config.autotuning_config.scenarios
:一个或多个用于优化工作负载的自动调整场景,例如BROADCAST_HASH_JOIN
、MEMORY
、SCALING
。您可以随每次提交批量同类群组而更改场景列表。如需查看场景的完整列表,请参阅场景参考文档。
示例:
# Create a client
client = dataproc_v1.BatchControllerClient()
# Initialize request argument(s)
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = "gs://bucket/run_tpcds.py"
batch.runtime_config.cohort = "daily_sales_aggregation"
batch.runtime_config.autotuning_config.scenarios = [
Scenario.SCALING
]
request = dataproc_v1.CreateBatchRequest(
parent="parent_value",
batch=batch,
)
# Make the request
operation = client.create_batch(request=request)
如需使用此 API,您必须使用 google-cloud-dataproc
客户端库 5.10.1
或更高版本。如需将其添加到您的项目中,您可以使用以下要求:
google-cloud-dataproc>=5.10.1
Airflow
如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请使用包含以下字段的 Batch 调用 BatchControllerClient.create_batch:
batch.runtime_config.cohort
:同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会对使用此同类群组名称提交的第二个工作负载及后续工作负载应用自动调整功能。例如,您可以将daily_sales_aggregation
指定为运行每日销售汇总任务的定期工作负载的同类群组名称。batch.runtime_config.autotuning_config.scenarios
:一个或多个用于优化工作负载的自动调整场景,例如BROADCAST_HASH_JOIN
、MEMORY
、SCALING
。您可以随每次提交批量同类群组而更改场景列表。如需查看场景的完整列表,请参阅场景参考文档。
示例:
create_batch = DataprocCreateBatchOperator(
task_id="batch_create",
batch={
"pyspark_batch": {
"main_python_file_uri": PYTHON_FILE_LOCATION,
},
"environment_config": {
"peripherals_config": {
"spark_history_server_config": {
"dataproc_cluster": PHS_CLUSTER_PATH,
},
},
},
"runtime_config": {
"cohort": "daily_sales_aggregation",
"autotuning_config": {
"scenarios": [
Scenario.SCALING,
]
}
},
},
batch_id="BATCH_ID",
)
如需使用此 API,您必须使用 google-cloud-dataproc
客户端库 5.10.1
或更高版本。您可以使用以下 Airflow 环境要求:
google-cloud-dataproc>=5.10.1
如需在 Cloud Composer 中更新该软件包,请参阅为 Cloud Composer 安装 Python 依赖项 。
查看自动调整更改
如需查看 Dataproc Serverless 对批处理工作负载的自动调整更改,请运行 gcloud dataproc batches describe
命令。
示例:gcloud dataproc batches describe
输出类似于以下内容:
...
runtimeInfo:
propertiesInfo:
# Properties set by autotuning.
autotuningProperties
spark.driver.memory:
annotation: Driver OOM was detected
value: 11520m
spark.driver.memoryOverhead:
annotation: Driver OOM was detected
value: 4608m
# Old overwritten properties.
userProperties
...
您可以在 Google Cloud 控制台的批处理作业详情页面上的调查标签页下,查看应用于正在运行、已完成或已失败的工作负载的最新自动调整更改。