自动调节 Spark 工作负载

由于 Spark 配置选项众多,并且很难评估这些选项对工作负载的影响,因此优化 Spark 工作负载以提高性能和弹性可能具有挑战性。Dataproc 无服务器的自动调节功能提供了一种 通过自动应用 Spark 配置设置来手动配置工作负载 基于 Spark 优化最佳实践和周期性 Spark 工作负载 来分析工作负载运行情况

注册使用 Dataproc Serverless 自动调整

如需注册以使用本页中所述的 Dataproc Serverless 自动调整预览版,请填写并提交 BigQuery 中的 Gemini 预览版注册表单。表单获得批准后,表单中列出的项目 使用预览版功能。

优势

Dataproc 无服务器的自动调节功能具有以下优势:

  • 效果提升:优化调整以提升效果
  • 更快地进行优化:自动配置,避免耗时的手动配置测试
  • 提高了弹性:自动分配内存以避免与内存相关的故障

限制

Dataproc Serverless 自动调整功能存在以下限制:

  • 系统会计算自动调整设置,并将其应用于工作负载的第二次运行及之后的运行。由于 Dataproc Serverless 自动调优功能会使用工作负载历史记录进行优化,因此周期性工作负载的首次运行不会自动调优。
  • 不支持内存缩减。
  • 自动调节不会追溯应用于正在运行的工作负载,只会应用于新 提交工作负载同类群组

自动调节同类群组

自动调整会应用于批量工作负载的周期性执行作业(称为“同类群组”)。您在提交工作负载时指定的同类群组名称 将其标识为周期性工作负载的连续运行之一。 我们建议您使用能够描述 或者通过其他方式将工作负载的运行 运行方式例如,为运行每日销售汇总任务的定期工作负载指定 daily_sales_aggregation 作为同类群组名称。

自动调节场景

您可以通过选择以下一个或多个自动调整场景,将 Dataproc Serverless 自动调整功能应用于您的工作负载:

  • MEMORY:自动调整 Spark 内存分配,以预测并避免可能出现的 工作负载内存不足错误修复之前因内存不足 (OOM) 错误而失败的工作负载。
  • SCALING:自动调整 Spark 自动扩缩配置设置。
  • BROADCAST_HASH_JOIN:自动调整 Spark 配置设置以优化 SQL 广播联接 性能

价格

在预览期间,Dataproc Serverless 自动调整功能免费提供。系统会按标准 Dataproc Serverless 价格收费。

区域可用性

您可以将 Dataproc Serverless 自动调节功能用于批量提交 可用的 Compute Engine 区域

使用 Dataproc 无服务器自动调节

您可以在 使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API。

控制台

如需在每次提交周期性批处理工作负载时启用 Dataproc Serverless 自动调整功能,请执行以下步骤:

  1. 在 Google Cloud 控制台中,前往 Dataproc 的批次页面。

    转到 Dataproc 批次

  2. 如需创建批量工作负载,请点击创建

  3. 容器部分中,填写 以下字段:

    • 同类群组同类群组名称,用于将批次标识为一系列周期性工作负载之一。系统会将自动调整应用于使用此同类群组名称提交的第二个工作负载及后续工作负载。例如,为运行每日销售汇总任务的定期工作负载指定 daily_sales_aggregation 作为同类群组名称。

    • 自动调节场景:一个或多个 自动扩缩场景 优化工作负载,例如 BROADCAST_HASH_JOINMEMORYSCALING。 您可以随时更改场景选择,每次提交批量同类群组时都可以更改。

  4. 根据需要填写创建批量页面的其他部分,然后点击提交。如需详细了解这些字段,请参阅 提交批量工作负载

gcloud

在每次提交时启用 Dataproc 无服务器的自动调节功能 需要运行以下 gcloud CLI gcloud dataproc batches submit 命令行中的命令 Cloud Shell

gcloud dataproc batches submit COMMAND \
    --region=REGION \
    --cohort=COHORT \
    --autotuning-scenarios=SCENARIOS \
    other arguments ...

替换以下内容:

  • COMMAND:Spark 工作负载类型,例如 SparkPySpark Spark-SqlSpark-R
  • REGION:工作负载将在其中运行的区域
  • COHORT同类群组名称,用于将批次标识为一系列周期性工作负载之一。 系统会将自动调整应用于使用此同类群组名称提交的第二个工作负载及后续工作负载。例如,指定 daily_sales_aggregation 作为 运行每日销售汇总任务的计划工作负载的同类群组名称。

  • SCENARIOS:一个或多个以英文逗号分隔 自动调节场景 优化工作负载,例如 --autotuning-scenarios=MEMORY,SCALING。 每次批量提交同类群组时,您都可以更改场景列表。

API

在每次提交时启用 Dataproc 无服务器的自动调节功能 周期性批处理工作负载 提交 batches.create 请求,其中包含以下字段:

  • RuntimeConfig.cohort:同类群组名称。 将批次标识为一系列周期性工作负载中的一个。 自动调节应用于第二次及后续提交的工作负载 此同类群组名称。例如,指定 daily_sales_aggregation 作为 运行每日销售汇总任务的计划工作负载的同类群组名称。
  • AutotuningConfig.scenarios:一个或多个用于优化工作负载的自动调整场景,例如 BROADCAST_HASH_JOINMEMORYSCALING。每次批量提交同类群组时,您都可以更改场景列表。

示例:

...
runtimeConfig:
  cohort: daily_sales_aggregation
  autotuningConfig:
    scenarios:
    - BROADCAST_HASH_JOIN
    - MEMORY
    - SCALING
...

Java

在尝试此示例之前,请按照《Dataproc 无服务器快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Dataproc Serverless Java API 参考文档

如需向 Dataproc Serverless 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

在周期性批次每次提交时启用 Dataproc Serverless 自动调节功能 工作负载,调用 BatchControllerClient.createBatchCreateBatchRequest 搭配使用 包含以下字段:

  • Batch.RuntimeConfig.cohort同类群组名称,用于将批次标识为一系列周期性工作负载之一。自动调节应用于第二次及后续提交的工作负载 此同类群组名称。例如,您可以将 daily_sales_aggregation 指定为运行每日销售汇总任务的定期工作负载的同类群组名称。
  • Batch.RuntimeConfig.AutotuningConfig.scenarios:一个或多个用于优化工作负载的自动调整场景,例如 BROADCAST_HASH_JOINMEMORYSCALING。您可以随每次提交的批量同类群组一起更改场景列表。有关完整的情景列表,请参阅 AutotuningConfig.Scenario Javadoc。

示例:

...
Batch batch =
  Batch.newBuilder()
    .setRuntimeConfig(
      RuntimeConfig.newBuilder()
        .setCohort("daily_sales_aggregation")
        .setAutotuningConfig(
          AutotuningConfig.newBuilder()
            .addScenarios(Scenario.SCALING))
    ...
  .build();

batchControllerClient.createBatch(
    CreateBatchRequest.newBuilder()
        .setParent(parent)
        .setBatchId(batchId)
        .setBatch(batch)
        .build());
...

如需使用该 API,您必须使用 google-cloud-dataproc 客户端库版本 4.43.0 或更新版本。您可以使用以下配置之一将库添加到项目中。

Maven

<dependencies>
 <dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-dataproc</artifactId>
   <version>4.43.0</version>
 </dependency>
</dependencies>

Gradle

implementation 'com.google.cloud:google-cloud-dataproc:4.43.0'

SBT

libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.43.0"

Python

在尝试此示例之前,请按照Python Dataproc 无服务器快速入门: 客户端库。 如需了解详情,请参阅 Dataproc Serverless Python API 参考文档

如需向 Dataproc Serverless 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

在周期性批次每次提交时启用 Dataproc Serverless 自动调节功能 工作负载,调用 BatchControllerClient.create_batch 使用 Batch 包含以下字段:

  • batch.runtime_config.cohort同类群组名称,用于将批次标识为一系列周期性工作负载之一。自动调节应用于第二次及后续提交的工作负载 此同类群组名称。例如,您可以将 daily_sales_aggregation 指定为运行每日销售汇总任务的定期工作负载的同类群组名称。
  • batch.runtime_config.autotuning_config.scenarios:一个或多个用于优化工作负载的自动调整场景,例如 BROADCAST_HASH_JOINMEMORYSCALING。您可以随每次提交批量同类群组而更改场景列表。有关完整的情景列表,请参阅 场景 参考。

示例:

# Create a client
client = dataproc_v1.BatchControllerClient()

# Initialize request argument(s)
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = "gs://bucket/run_tpcds.py"
batch.runtime_config.cohort = "daily_sales_aggregation"
batch.runtime_config.autotuning_config.scenarios = [
    Scenario.SCALING
]

request = dataproc_v1.CreateBatchRequest(
    parent="parent_value",
    batch=batch,
)

# Make the request
operation = client.create_batch(request=request)

如需使用该 API,您必须使用 google-cloud-dataproc 客户端库版本 5.10.1 或更新版本。如需将其添加到您的项目中,您可以使用以下要求:

google-cloud-dataproc>=5.10.1

Airflow

在周期性批次每次提交时启用 Dataproc Serverless 自动调节功能 工作负载,调用 BatchControllerClient.create_batch 使用 Batch 包含以下字段:

  • batch.runtime_config.cohort同类群组名称,用于将批次标识为一系列周期性工作负载之一。自动调节应用于第二次及后续提交的工作负载 此同类群组名称。例如,您可以将 daily_sales_aggregation 指定为 运行每日销售汇总任务的计划工作负载的同类群组名称。
  • batch.runtime_config.autotuning_config.scenarios:一个或多个用于优化工作负载的自动调整场景,例如 BROADCAST_HASH_JOINMEMORYSCALING。您可以随每次提交批量同类群组而更改场景列表。如需查看场景的完整列表,请参阅场景参考文档。

示例:

create_batch = DataprocCreateBatchOperator(
    task_id="batch_create",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "cohort": "daily_sales_aggregation",
            "autotuning_config": {
                "scenarios": [
                    Scenario.SCALING,
                ]
            }
        },
    },
    batch_id="BATCH_ID",
)

如需使用该 API,您必须使用 google-cloud-dataproc 客户端库版本 5.10.1 或更新版本。您可以使用以下 Airflow 环境要求:

google-cloud-dataproc>=5.10.1

如需在 Cloud Composer 中更新该软件包,请参阅为 Cloud Composer 安装 Python 依赖项

查看自动调整更改

如需查看 Dataproc Serverless 对批处理工作负载的自动调整更改,请运行 gcloud dataproc batches describe 命令。

示例:gcloud dataproc batches describe 输出类似于以下内容:

...
runtimeInfo:
   propertiesInfo:
    # Properties set by autotuning.
    autotuningProperties
      spark.driver.memory:
        annotation: Driver OOM was detected
        value: 11520m
      spark.driver.memoryOverhead:
        annotation: Driver OOM was detected
        value: 4608m
    # Old overwritten properties.
    userProperties
...

您可以查看已应用的最新自动调节更改 正在运行的、已完成或失败的工作负载 Google Cloud 控制台中的批次详情页面。 在调查标签页下。

“自动调整调查”面板。