运行 Apache Spark 批处理工作负载

设置

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  4. 启用 Dataproc API。

    启用 API

  5. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  6. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  7. 启用 Dataproc API。

    启用 API

提交 Spark 批处理工作负载

控制台

在 Google Cloud Console 中转到 Dataproc Batch。点击“创建”以打开创建批处理页面。

选择并填写页面上的以下字段以提交用于计算 pi 近似值的 Spark 批处理工作负载:

  • 批处理信息
    • 批处理 ID:指定批处理工作负载的 ID。此值必须为 4-63 个小写字符。有效字符为 /[az][0-9]-/。
    • 区域:选择将在其中运行工作负载的区域
  • 容器
    • 批处理类型:Spark
    • 主类:
      org.apache.spark.examples.SparkPi
    • Jar 文件:
      file:///usr/lib/spark/examples/jars/spark-examples.jar
    • 参数:1000
  • 执行配置 您可以指定要用于运行工作负载的服务帐号。如果您未指定服务帐号,则工作负载将在 Compute Engine 默认服务帐号下运行。
  • 网络配置 执行无服务器 Spark 工作负载的 VPC 子网必须满足 Dataproc Serverless for Spark 网络配置中列出的要求。子网列表显示所选网络中启用了专用 Google 访问通道的子网。
  • 属性:输入您希望 Spark 批处理工作负载使用的任何受支持的 Spark 属性的键(属性名称)值。注意:与 Dataproc on Compute Engine 集群属性不同,Dataproc for Spark 工作负载属性不包含“spark:”前缀。
  • 其他解决方案

点击提交以运行 Spark 批处理工作负载。

gcloud

如需提交 Spark 批量工作负载以计算 pi 的大致值,请在终端窗口或 Cloud Shell 中本地运行以下 gcloud CLI gcloud dataproc batches submit spark 命令。

gcloud dataproc batches submit spark \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    -- 1000

注意

  • 子网:执行无服务器 Spark 工作负载的 VPC 子网必须满足 Dataproc Serverless for Spark 网络配置中列出的要求。如果 gcloud dataproc batches submit 命令中指定的区域的 default 网络子网未启用专用 Google 访问通道,您必须执行以下任一操作:
    1. 为区域的默认网络子网启用专用 Google 访问通道,或
    2. 在命令中使用 --subnet=[SUBNET_URI] 标志来指定已启用专用 Google 访问通道的子网。您可以运行 gcloud compute networks describe [NETWORK_NAME] 命令以列出网络中子网的 URI。
  • --jars:预安装了示例 jar 文件,并且传递给 SparkPi 工作负载的 --1000 命令参数指定了 pi 估算逻辑的 1000 次迭代(工作负载输入参数包含在“--”后面)。
  • --properties:您可以添加 --properties 标志,输入您希望 Spark 批处理工作负载使用的任何受支持的 Spark 属性
  • --deps-bucket:您可以添加此标志以指定一个 Cloud Storage 存储桶,Dataproc Serverless for Spark 将在其中上传工作负载依赖项。存储桶的 gs:// URI 前缀不是必需的;您只能指定存储桶路径/名称,例如“mybucketname”。例外情况:如果您的批处理工作负载引用本地机器上的文件,则 --deps-bucket 标志是必需的;运行批处理工作负载之前,Dataproc Serverless for Spark 会将本地文件上传到存储桶中的 /dependencies 文件夹。
  • --container-image:您可以使用 Docker 映像命名格式“{hostname}/{project-id}/{image}:{tag}”指定自定义容器映像,例如 gcr.io/my-project-id/my-image:1.0.1您必须在 Container Registry 上托管自定义容器
  • 其他选项:
    • 您可以添加其他可选命令标志。例如,以下命令将批处理工作负载配置为使用具有标准 Spark 配置的自行管理式 Hive Metastore
      gcloud beta dataproc batches submit \
          --properties=spark.sql.catalogImplementation=hive,spark.hive.metastore.uris=METASTORE_URI,spark.hive.metastore.warehouse.dir=WAREHOUSE_DIR> \
          other args ...
                 
      如需了解支持的命令标志,请参阅 gcloud dataproc batches submit
    • 使用永久性历史记录服务器
      1. 在单节点 Dataproc 集群上创建永久性历史记录服务器 (PHS)。注意:Cloud Storage bucket-name 必须已存在。
        gcloud dataproc clusters create PHS-cluster-name \
            --region=region \
            --single-node \
            --enable-component-gateway \
            --properties=spark:spark.history.fs.logDirectory=gs://bucket-name/phs/*/spark-job-history
                     
      2. 提交批处理工作负载,指定正在运行的永久性历史记录服务器。
        gcloud dataproc batches submit spark \
            --region=region \
            --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
            --class=org.apache.spark.examples.SparkPi \
            --history-server-cluster=projects/project-id/regions/region/clusters/PHS-cluster-name
            -- 1000
                      

REST 和命令行

本部分介绍如何使用 Dataproc Serverless for Spark batches.create API 创建批处理工作负载以计算 pi 的近似值。

在使用任何请求数据之前,请先进行以下替换:

  • project-id:Google Cloud 项目 ID
  • region区域
  • 注意
    • Custom-container-image:使用 Docker 映像命名格式指定自定义容器映像:{hostname}/{project-id}/{image}:{tag},例如“gcr.io/my-project-id/my-image:1.0.1”。 注意:您必须在 Container Registry 上托管自定义容器。
    • 子网:如果指定的 regiondefault 网络子网未启用专用 Google 访问通道,您必须执行以下任一操作:
      1. region 的默认网络子网启用专用 Google 访问通道,或者
      2. 使用 ExecutionConfig.subnetworkUri 字段指定启用了专用 Google 访问通道的子网。您可以运行 gcloud compute networks describe [NETWORK_NAME] 命令以列出网络中子网的 URI。
    • sparkBatch.jarFileUris:示例 jar 文件预安装在 Spark 执行环境中。“1000”sparkBatch.args 会传递给 SparkPi 工作负载,并指定 pi 估算逻辑的 1000 次迭代。
    • Spark properties:您可以使用 RuntimeConfig.properties 字段输入您希望 Spark 批处理工作负载使用的任何支持的 Spark 属性
    • 其他选项:

    HTTP 方法和网址:

    POST https://dataproc.googleapis.com/v1/projects/project-id/locations/region/batches

    请求 JSON 正文:

    {
      "sparkBatch":{
        "args":[
          "1000"
        ],
        "jarFileUris":[
          "file:///usr/lib/spark/examples/jars/spark-examples.jar"
        ],
        "mainClass":"org.apache.spark.examples.SparkPi"
      }
    }
    

    如需发送您的请求,请展开以下选项之一:

    您应该会收到类似以下内容的 JSON 响应:

    {
    "name":"projects/project-id/locations/region/batches/batch-id",
      "uuid":",uuid",
      "createTime":"2021-07-22T17:03:46.393957Z",
      "sparkBatch":{
        "mainClass":"org.apache.spark.examples.SparkPi",
        "args":[
          "1000"
        ],
        "jarFileUris":[
          "file:///usr/lib/spark/examples/jars/spark-examples.jar"
        ]
      },
      "runtimeInfo":{
        "outputUri":"gs://dataproc-.../driveroutput"
      },
      "state":"SUCCEEDED",
      "stateTime":"2021-07-22T17:06:30.301789Z",
      "creator":"account-email-address",
      "runtimeConfig":{
        "properties":{
          "spark:spark.executor.instances":"2",
          "spark:spark.driver.cores":"2",
          "spark:spark.executor.cores":"2",
          "spark:spark.app.name":"projects/project-id/locations/region/batches/batch-id"
        }
      },
      "environmentConfig":{
        "peripheralsConfig":{
          "sparkHistoryServerConfig":{
          }
        }
      },
      "operation":"projects/project-id/regions/region/operation-id"
    }