自动调节 Spark 工作负载

出于性能和弹性的需要,优化 Spark 工作负载可能具有挑战性, Spark 配置选项的数量,以及评估这些选项的难度 影响工作负载。Dataproc 无服务器的自动调节功能提供了一种 通过自动应用 Spark 配置设置来手动配置工作负载 基于 Spark 优化最佳实践和周期性 Spark 工作负载 来分析工作负载运行情况

注册 Dataproc 无服务器自动扩缩

注册以访问 Dataproc 无服务器自动调节 预览版, 填写并提交 BigQuery 中的 Gemini 预览版 注册表单。表单获得批准后,表单中列出的项目 使用预览版功能。

优势

Dataproc 无服务器的自动调节功能具有以下优势:

  • 提升效果:进行优化调整以提升效果
  • 优化更快捷:自动配置功能可避免耗时的手动工作 配置测试
  • 提高弹性:自动分配内存,以避免与内存相关的 失败

限制

Dataproc 无服务器自动调节具有以下限制:

  • 计算自动调节,并将其应用于 工作负载周期性工作负载的首次运行 由于 Dataproc 无服务器自动调节功能使用工作负载 优化历史记录。
  • 不支持内存缩减。
  • 自动调节不会追溯应用于正在运行的工作负载,只会应用于新 提交工作负载同类群组

自动调节同类群组

自动调节适用于批处理工作负载(称为“同类群组”)的周期性执行。 您在提交工作负载时指定的同类群组名称 将其标识为周期性工作负载的连续运行之一。 我们建议您使用能够描述 或以其他方式帮助将工作负载的运行 运行方式例如,指定 daily_sales_aggregation 作为 运行每日销售汇总任务的计划工作负载的同类群组名称。

自动调节场景

您可以将 Dataproc Serverless 自动调节应用于工作负载,具体方法是 选择以下一个或多个自动调节场景:

  • MEMORY:自动调整 Spark 内存分配,以预测并避免可能出现的 工作负载内存不足错误修复之前失败的工作负载 发生内存不足 (OOM) 错误。
  • SCALING:自动调整 Spark 自动扩缩配置设置。
  • BROADCAST_HASH_JOIN:自动调整 Spark 配置设置以优化 SQL 广播联接 性能

价格

预览版期间提供 Dataproc 无服务器的自动调节功能,无需额外付费。标准 需支付 Dataproc 无服务器价格

区域可用性

您可以将 Dataproc Serverless 自动调节功能用于批量提交 可用的 Compute Engine 区域

使用 Dataproc 无服务器自动调节

您可以在 使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API。

控制台

在每次提交时启用 Dataproc 无服务器的自动调节功能 请按照以下步骤进行操作:

  1. 在 Google Cloud 控制台中,前往 Dataproc 批次页面。

    转到 Dataproc 批次

  2. 如需创建批量工作负载,请点击创建

  3. 容器部分中,填写 以下字段:

    • 同类群组:您的同类群组名称, 将批次标识为一系列周期性工作负载中的一个。 自动调节会应用于提交的第二个及后续工作负载 此同类群组名称。例如,指定 daily_sales_aggregation 作为 运行每日销售汇总任务的计划工作负载的同类群组名称。

    • 自动调节场景:一个或多个 自动扩缩场景 优化工作负载,例如 BROADCAST_HASH_JOINMEMORYSCALING。 您可以在每次批量提交同类群组时更改场景选择。

  4. 根据需要填写创建批次页面的其他部分,然后点击 提交。如需详细了解这些字段,请参阅 提交批量工作负载

gcloud

在每次提交时启用 Dataproc 无服务器的自动调节功能 周期性批处理工作负载,请运行以下 gcloud CLI gcloud dataproc batches submit 命令行中的命令 Cloud Shell

gcloud dataproc batches submit COMMAND \
    --region=REGION \
    --cohort=COHORT \
    --autotuning-scenarios=SCENARIOS \
    other arguments ...

替换以下内容:

  • COMMAND:Spark 工作负载类型,例如 SparkPySpark Spark-SqlSpark-R
  • REGIONregion [地区] 运行工作负载的位置
  • COHORT:同类群组名称。 将批次标识为一系列周期性工作负载中的一个。 自动调节会应用于提交的第二个及后续工作负载 此同类群组名称。例如,指定 daily_sales_aggregation 作为 运行每日销售汇总任务的计划工作负载的同类群组名称。

  • SCENARIOS:一个或多个以英文逗号分隔 自动调节场景 优化工作负载,例如 --autotuning-scenarios=MEMORY,SCALING。 每次批量提交同类群组时,您都可以更改场景列表。

API

在每次提交时启用 Dataproc 无服务器的自动调节功能 周期性批处理工作负载 提交 batches.create 请求,其中包含以下字段:

  • RuntimeConfig.cohort:同类群组名称。 将批次标识为一系列周期性工作负载中的一个。 自动调节应用于第二次及后续提交的工作负载 此同类群组名称。例如,指定 daily_sales_aggregation 作为 运行每日销售汇总任务的计划工作负载的同类群组名称。
  • AutotuningConfig.scenarios:一个或多个 自动调节场景 优化工作负载,例如 BROADCAST_HASH_JOINMEMORYSCALING。 每次批量提交同类群组时,您都可以更改场景列表。

示例:

...
runtimeConfig:
  cohort: daily_sales_aggregation
  autotuningConfig:
    scenarios:
    - BROADCAST_HASH_JOIN
    - MEMORY
    - SCALING
...

Java

在尝试此示例之前,请按照Java Dataproc 无服务器快速入门: 客户端库。 有关详情,请参阅 Dataproc 无服务器 Java API 参考文档

如需向 Dataproc Serverless 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

在周期性批次每次提交时启用 Dataproc Serverless 自动调节功能 工作负载,调用 BatchControllerClient.createBatchCreateBatchRequest 搭配使用 包含以下字段:

  • Batch.RuntimeConfig.cohort:同类群组名称。 将批次标识为一系列周期性工作负载中的一个。 自动调节应用于第二次及后续提交的工作负载 此同类群组名称。例如,您可以将 daily_sales_aggregation 指定为 运行每日销售汇总任务的计划工作负载的同类群组名称。
  • Batch.RuntimeConfig.AutotuningConfig.scenarios:一个或多个 自动调节场景 优化工作负载,例如 BROADCAST_HASH_JOINMEMORYSCALING。 每次批量提交同类群组时,您都可以更改场景列表。 有关完整的情景列表,请参阅 AutotuningConfig.Scenario Javadoc。

示例:

...
Batch batch =
  Batch.newBuilder()
    .setRuntimeConfig(
      RuntimeConfig.newBuilder()
        .setCohort("daily_sales_aggregation")
        .setAutotuningConfig(
          AutotuningConfig.newBuilder()
            .addScenarios(Scenario.SCALING))
    ...
  .build();

batchControllerClient.createBatch(
    CreateBatchRequest.newBuilder()
        .setParent(parent)
        .setBatchId(batchId)
        .setBatch(batch)
        .build());
...

如需使用该 API,您必须使用 google-cloud-dataproc 客户端库版本 4.43.0 或更高版本。您可以使用以下任一配置将库添加到您的 项目。

Maven

<dependencies>
 <dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-dataproc</artifactId>
   <version>4.43.0</version>
 </dependency>
</dependencies>

Gradle

implementation 'com.google.cloud:google-cloud-dataproc:4.43.0'

SBT

libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.43.0"

Python

在尝试此示例之前,请按照Python Dataproc 无服务器快速入门: 客户端库。 有关详情,请参阅 Dataproc 无服务器 Python API 参考文档

如需向 Dataproc Serverless 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

在周期性批次每次提交时启用 Dataproc Serverless 自动调节功能 工作负载,调用 BatchControllerClient.create_batch 使用 Batch 包含以下字段:

  • batch.runtime_config.cohort:同类群组名称。 将批次标识为一系列周期性工作负载中的一个。 自动调节应用于第二次及后续提交的工作负载 此同类群组名称。例如,您可以将 daily_sales_aggregation 指定为 运行每日销售汇总任务的计划工作负载的同类群组名称。
  • batch.runtime_config.autotuning_config.scenarios:一个或多个 自动调节场景 优化工作负载,例如 BROADCAST_HASH_JOINMEMORYSCALING。 每次批量提交同类群组时,您都可以更改场景列表。 有关完整的情景列表,请参阅 场景 参考。

示例:

# Create a client
client = dataproc_v1.BatchControllerClient()

# Initialize request argument(s)
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = "gs://bucket/run_tpcds.py"
batch.runtime_config.cohort = "daily_sales_aggregation"
batch.runtime_config.autotuning_config.scenarios = [
    Scenario.SCALING
]

request = dataproc_v1.CreateBatchRequest(
    parent="parent_value",
    batch=batch,
)

# Make the request
operation = client.create_batch(request=request)

如需使用该 API,您必须使用 google-cloud-dataproc 客户端库版本 5.10.1 或更高版本。如需将其添加到项目中,您可以遵循以下要求:

google-cloud-dataproc>=5.10.1

Airflow

在周期性批次每次提交时启用 Dataproc Serverless 自动调节功能 工作负载,调用 BatchControllerClient.create_batch 使用 Batch 包含以下字段:

  • batch.runtime_config.cohort:同类群组名称。 将批次标识为一系列周期性工作负载中的一个。 自动调节应用于第二次及后续提交的工作负载 此同类群组名称。例如,您可以将 daily_sales_aggregation 指定为 运行每日销售汇总任务的计划工作负载的同类群组名称。
  • batch.runtime_config.autotuning_config.scenarios:一个或多个 自动调节场景 优化工作负载,例如 BROADCAST_HASH_JOINMEMORYSCALING。 每次批量提交同类群组时,您都可以更改场景列表。 有关完整的情景列表,请参阅 场景 参考。

示例:

create_batch = DataprocCreateBatchOperator(
    task_id="batch_create",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "cohort": "daily_sales_aggregation",
            "autotuning_config": {
                "scenarios": [
                    Scenario.SCALING,
                ]
            }
        },
    },
    batch_id="BATCH_ID",
)

如需使用该 API,您必须使用 google-cloud-dataproc 客户端库版本 5.10.1 或更高版本。您可以使用以下 Airflow 环境要求:

google-cloud-dataproc>=5.10.1

如需在 Cloud Composer 中更新软件包,请参阅 为 Cloud Composer 安装 Python 依赖项

查看自动调节更改

如需查看对批量工作负载的 Dataproc 无服务器自动扩缩更改,请执行以下操作: 运行 gcloud dataproc batches describe 命令。

示例:gcloud dataproc batches describe 输出类似于以下内容:

...
runtimeInfo:
   propertiesInfo:
    # Properties set by autotuning.
    autotuningProperties
      spark.driver.memory:
        annotation: Driver OOM was detected
        value: 11520m
      spark.driver.memoryOverhead:
        annotation: Driver OOM was detected
        value: 4608m
    # Old overwritten properties.
    userProperties
...

您可以查看已应用的最新自动调节更改 正在运行的、已完成或失败的工作负载 Google Cloud 控制台中的批次详情页面。 在调查标签页下。

自动调节调查面板。