自动调优 Spark 工作负载

本文档介绍了如何自动调整 Spark 工作负载。由于 Spark 配置选项众多,并且很难评估这些选项对工作负载的影响,因此优化 Spark 工作负载以提高性能和弹性可能具有挑战性。Dataproc Serverless 自动调整功能可根据 Spark 优化最佳实践和对工作负载运行情况的分析,自动将 Spark 配置设置应用于周期性 Spark 工作负载,从而为手动工作负载配置提供了替代方案。

注册使用 Dataproc Serverless 自动调整

如需注册以使用本页中所述的 Dataproc Serverless 自动调整预览版,请填写并提交 BigQuery 中的 Gemini 预览版注册表单。表单获得批准后,表单中列出的项目便可使用预览版功能。

优势

Dataproc Serverless 自动调整功能可提供以下优势:

  • 效果提升:优化调整以提升效果
  • 更快地进行优化:自动配置,避免耗时的手动配置测试
  • 提高了弹性:自动分配内存以避免与内存相关的故障

限制

Dataproc Serverless 自动调整功能存在以下限制:

  • 系统会计算自动调整设置,并将其应用于工作负载的第二次运行及之后的运行。由于 Dataproc Serverless 自动调优功能会使用工作负载历史记录进行优化,因此周期性工作负载的首次运行不会自动调优。
  • 不支持缩减内存。
  • 自动调整不会向正在运行的工作负载应用,只会应用于新提交的工作负载同类群组。

自动调节同类群组

自动调整会应用于批量工作负载的周期性执行(称为“同类群组”)。 您在提交工作负载时指定的同类群组名称会将其标识为周期性工作负载的连续运行作业之一。我们建议您使用描述工作负载类型或以其他方式帮助识别重复性工作负载中工作负载运行情况的同类群组名称。例如,为运行每日销售汇总任务的定期工作负载指定 daily_sales_aggregation 作为同类群组名称。

自动调节场景

您可以通过选择以下一个或多个自动调整场景,将 Dataproc Serverless 自动调整功能应用于您的工作负载:

  • MEMORY:自动调整 Spark 内存分配,以预测和避免潜在的工作负载内存不足错误。修复之前因内存不足 (OOM) 错误而失败的工作负载。
  • SCALING:自动调整 Spark 自动扩缩配置设置。
  • BROADCAST_HASH_JOIN:自动调整 Spark 配置设置,以优化 SQL 广播联接性能。

价格

在预览期间,Dataproc Serverless 自动调整功能免费提供。系统会按标准 Dataproc Serverless 价格收费。

区域可用性

您可以将 Dataproc Serverless 自动调整功能与在可用 Compute Engine 区域中提交的批处理结合使用。

使用 Dataproc Serverless 自动调整

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 在工作负载上启用 Dataproc Serverless 自动调整。

控制台

如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请执行以下步骤:

  1. 在 Google Cloud 控制台中,前往 Dataproc 的批次页面。

    转到 Dataproc 批次

  2. 如需创建批处理工作负载,请点击创建

  3. 容器部分,为您的 Spark 工作负载填写以下字段:

    • 同类群组同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会将自动调整应用于使用此同类群组名称提交的第二个工作负载及后续工作负载。例如,为运行每日销售汇总任务的定期工作负载指定 daily_sales_aggregation 作为同类群组名称。

    • 自动调节场景:一个或多个用于优化工作负载的自动调节场景,例如 BROADCAST_HASH_JOINMEMORYSCALING。您可以随时更改场景选择,每次提交批量同类群组时都可以更改。

  4. 根据需要填写创建批量页面的其他部分,然后点击提交。如需详细了解这些字段,请参阅提交批处理工作负载

gcloud

如需在每次提交周期性批处理工作负载时启用 Dataproc 无服务器自动调整功能,请在终端窗口或 Cloud Shell 中本地运行以下 gcloud CLI gcloud dataproc batches submit 命令。

gcloud dataproc batches submit COMMAND \
    --region=REGION \
    --cohort=COHORT \
    --autotuning-scenarios=SCENARIOS \
    other arguments ...

替换以下内容:

  • COMMAND:Spark 工作负载类型,例如 SparkPySparkSpark-SqlSpark-R
  • REGION:工作负载将在其中运行的区域
  • COHORT同类群组名称,用于将批次标识为一系列周期性工作负载之一。 系统会将自动调整应用于使用此同类群组名称提交的第二个工作负载及后续工作负载。例如,为运行每日销售汇总任务的定期工作负载指定 daily_sales_aggregation 作为同类群组名称。

  • SCENARIOS:一个或多个以英文逗号分隔的自动调整方案,用于优化工作负载,例如 --autotuning-scenarios=MEMORY,SCALING。您可以随每次提交批量同类群组而更改场景列表。

API

如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请提交包含以下字段的 batches.create 请求:

  • RuntimeConfig.cohort同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会对使用此同类群组名称提交的第二个工作负载及后续工作负载应用自动调整功能。例如,为运行每日销售汇总任务的定期工作负载指定 daily_sales_aggregation 作为同类群组名称。
  • AutotuningConfig.scenarios:一个或多个用于优化工作负载的自动调整方案,例如 BROADCAST_HASH_JOINMEMORYSCALING。您可以在每次提交批量同类群组时更改场景列表。

示例:

...
runtimeConfig:
  cohort: daily_sales_aggregation
  autotuningConfig:
    scenarios:
    - BROADCAST_HASH_JOIN
    - MEMORY
    - SCALING
...

Java

在尝试此示例之前,请按照《Dataproc 无服务器快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Dataproc Serverless Java API 参考文档

如需向 Dataproc Serverless 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请使用包含以下字段的 CreateBatchRequest 调用 BatchControllerClient.createBatch

  • Batch.RuntimeConfig.cohort同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会对使用此同类群组名称提交的第二个工作负载及后续工作负载应用自动调整功能。例如,您可以将 daily_sales_aggregation 指定为运行每日销售汇总任务的定期工作负载的同类群组名称。
  • Batch.RuntimeConfig.AutotuningConfig.scenarios:一个或多个用于优化工作负载的自动调整场景,例如 BROADCAST_HASH_JOINMEMORYSCALING。您可以随每次提交的批量同类群组一起更改场景列表。如需查看场景的完整列表,请参阅 AutotuningConfig.Scenario Javadoc。

示例:

...
Batch batch =
  Batch.newBuilder()
    .setRuntimeConfig(
      RuntimeConfig.newBuilder()
        .setCohort("daily_sales_aggregation")
        .setAutotuningConfig(
          AutotuningConfig.newBuilder()
            .addScenarios(Scenario.SCALING))
    ...
  .build();

batchControllerClient.createBatch(
    CreateBatchRequest.newBuilder()
        .setParent(parent)
        .setBatchId(batchId)
        .setBatch(batch)
        .build());
...

如需使用此 API,您必须使用 google-cloud-dataproc 客户端库 4.43.0 或更高版本。您可以使用以下配置之一将库添加到项目中。

Maven

<dependencies>
 <dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-dataproc</artifactId>
   <version>4.43.0</version>
 </dependency>
</dependencies>

Gradle

implementation 'com.google.cloud:google-cloud-dataproc:4.43.0'

SBT

libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.43.0"

Python

在尝试此示例之前,请按照《Dataproc 无服务器快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Dataproc Serverless Python API 参考文档

如需向 Dataproc Serverless 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请使用包含以下字段的 Batch 调用 BatchControllerClient.create_batch

  • batch.runtime_config.cohort同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会对使用此同类群组名称提交的第二个工作负载及后续工作负载应用自动调整功能。例如,您可以将 daily_sales_aggregation 指定为运行每日销售汇总任务的定期工作负载的同类群组名称。
  • batch.runtime_config.autotuning_config.scenarios:一个或多个用于优化工作负载的自动调整场景,例如 BROADCAST_HASH_JOINMEMORYSCALING。您可以随每次提交批量同类群组而更改场景列表。如需查看场景的完整列表,请参阅场景参考文档。

示例:

# Create a client
client = dataproc_v1.BatchControllerClient()

# Initialize request argument(s)
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = "gs://bucket/run_tpcds.py"
batch.runtime_config.cohort = "daily_sales_aggregation"
batch.runtime_config.autotuning_config.scenarios = [
    Scenario.SCALING
]

request = dataproc_v1.CreateBatchRequest(
    parent="parent_value",
    batch=batch,
)

# Make the request
operation = client.create_batch(request=request)

如需使用此 API,您必须使用 google-cloud-dataproc 客户端库 5.10.1 或更高版本。如需将其添加到您的项目中,您可以使用以下要求:

google-cloud-dataproc>=5.10.1

Airflow

如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请使用包含以下字段的 Batch 调用 BatchControllerClient.create_batch

  • batch.runtime_config.cohort同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会对使用此同类群组名称提交的第二个工作负载及后续工作负载应用自动调整功能。例如,您可以将 daily_sales_aggregation 指定为运行每日销售汇总任务的定期工作负载的同类群组名称。
  • batch.runtime_config.autotuning_config.scenarios:一个或多个用于优化工作负载的自动调整场景,例如 BROADCAST_HASH_JOINMEMORYSCALING。您可以随每次提交批量同类群组而更改场景列表。如需查看场景的完整列表,请参阅场景参考文档。

示例:

create_batch = DataprocCreateBatchOperator(
    task_id="batch_create",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "cohort": "daily_sales_aggregation",
            "autotuning_config": {
                "scenarios": [
                    Scenario.SCALING,
                ]
            }
        },
    },
    batch_id="BATCH_ID",
)

如需使用此 API,您必须使用 google-cloud-dataproc 客户端库 5.10.1 或更高版本。您可以使用以下 Airflow 环境要求:

google-cloud-dataproc>=5.10.1

如需在 Cloud Composer 中更新该软件包,请参阅为 Cloud Composer 安装 Python 依赖项

查看自动调整更改

如需查看 Dataproc Serverless 对批处理工作负载的自动调整更改,请运行 gcloud dataproc batches describe 命令。

示例:gcloud dataproc batches describe 输出类似于以下内容:

...
runtimeInfo:
   propertiesInfo:
    # Properties set by autotuning.
    autotuningProperties
      spark.driver.memory:
        annotation: Driver OOM was detected
        value: 11520m
      spark.driver.memoryOverhead:
        annotation: Driver OOM was detected
        value: 4608m
    # Old overwritten properties.
    userProperties
...

您可以在 Google Cloud 控制台的 Batch details(批处理详情)页面上的 Investigate(调查)标签页下,查看应用于正在运行、已完成或已失败的工作负载的最新自动调整更改。

“自动调整调查”面板。