在 Vertex AI 上运行 Spark on Ray 集群

借助 RayDP Python 库,您可以在 Ray 集群上运行 Spark。本文档介绍了如何在 Ray on Vertex AI(Ray on Vertex AI 集群)上安装、配置和运行 RayDP。

安装

Ray on Vertex AI 可让用户使用开源 Ray 框架运行其应用。RayDP 提供了用于运行 Spark on Ray 的 API。可用于创建 Ray on Vertex AI 集群的预构建容器映像未预安装 RayDP,这意味着您需要为 Ray on Vertex AI 集群创建自定义 Ray on Vertex AI 集群映像,以在 Ray on Vertex AI 集群上运行 RayDP 应用。以下部分介绍如何构建 RayDP 自定义映像。

构建 Ray on Vertex AI 自定义容器映像

使用此 Dockerfile 为安装了 RayDP 的 Ray on Vertex AI 创建自定义容器映像。

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

您可以使用最新的 Ray on Vertex AI 集群预构建映像来创建 RayDP 自定义映像。您还可以安装预计在应用中使用的其他 Python 软件包。pyarrow==14.0 是 Ray 2.9.3 的依赖项限制条件导致的。

构建并推送自定义容器映像

您需要先在 Artifact Registry 中创建 Docker 代码库,然后才能构建自定义映像(请参阅使用容器映像,了解如何创建和配置 Docker 代码库)。创建 Docker 代码库后,使用 Dockerfile 构建并推送自定义容器映像。

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

其中:

  • LOCATION:您在 Artifact Registry 中创建的 Cloud Storage 位置(例如 us-central1)。
  • PROJECT_ID:您的 Google Cloud 项目 ID。
  • DOCKER_REPOSITORY:您创建的 Docker 代码库的名称。
  • IMAGE_NAME:自定义容器映像的名称。

创建 Ray on Vertex AI 集群

使用上一步中构建的自定义容器映像创建 Ray on Vertex AI 集群。您可以使用 Vertex AI SDK for Python 创建 Ray on Vertex AI 集群。

如果您尚未安装所需的 Python 库,请进行安装。

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

配置头节点和工作器节点,并使用 Vertex AI SDK for Python 创建该集群。例如:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

其中:

  • CUSTOM_CONTAINER_IMAGE_URI:推送到 Artifact Registry 的自定义容器映像的 URI。
  • CLUSTER_NAME:Ray on Vertex AI 集群的名称。

Vertex AI 上的 Spark on Ray 集群

您需要先使用 RayDP API 创建 Spark 会话,然后才能运行 Spark 应用。您可以使用 Ray 客户端以交互方式执行此操作,也可以使用 Ray Job API。建议使用 Ray Job API,尤其是对于生产应用和长时间运行的应用。RayDP API 提供用于配置 Spark 会话的参数,并支持 Spark 配置。如需详细了解用于创建 Spark 会话的 RayDP API,请参阅 Spark 主 Actor 节点亲和性

将 RayDP 与 Ray 客户端搭配使用

您可以使用 Ray TaskActor 在 Ray on Vertex AI 集群上创建 Spark 集群和会话。Ray Task 或 Actor 是使用 Ray 客户端在 Ray on Vertex AI 集群上创建 Spark 会话所必需的。以下代码展示了如何使用 Ray Actor 通过 RayDP 在 Ray on Vertex AI 集群上创建 Spark 会话、运行 Spark 应用以及停止 Spark 集群。

如需了解如何以交互方式连接到 Ray on Vertex AI 集群,请参阅通过 Ray 客户端连接到 Ray 集群

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

将 RayDP 与 Ray Job API 搭配使用

Ray 客户端可用于需要与 Ray 集群进行交互式连接的小型实验。建议使用 Ray Job API 在 Ray 集群上运行长时间运行的生产作业。这也适用于在 Ray on Vertex AI 集群上运行 Spark 应用。

创建一个包含 Spark 应用代码的 Python 脚本。例如:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

提交作业以使用 Ray Job API 运行该 Python 脚本。例如:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

其中:

  • SCRIPT_NAME:您创建的脚本的文件名。

从 Spark 应用读取 Cloud Storage 文件

常见做法是将数据文件存储在 Google Cloud Storage 存储桶中。您可以通过多种方式从 Ray on Vertex AI 集群上运行的 Spark 应用读取这些文件。本部分介绍了从 Ray on Vertex AI 集群上运行的 Spark 应用读取 Cloud Storage 文件的两种方法。

使用 Google Cloud Storage 连接器

您可以使用适用于 Hadoop 的 Google Cloud 连接器从 Spark 应用读取 Cloud Storage 存储桶中的文件。使用 RayDP 创建 Spark 会话时,需要使用一些配置参数来完成此操作。以下代码展示了如何从 Ray on Vertex AI 集群上的 Spark 应用读取 Cloud Storage 存储桶中存储的 CSV 文件。

import raydp

spark = raydp.init_spark(
  app_name="RayDP GCS Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

其中:

  • GCS_FILE_URI:存储在 Cloud Storage 存储桶中的文件的 URI。例如:gs://my-bucket/my-file.csv。

使用 Ray 数据

Google Cloud 连接器提供了一种从 Google Cloud 存储桶读取文件的方法,对于大多数应用场景来说,这种方法可能已足够。如果您需要使用 Ray 的分布式处理读取数据,或者在使用 Google Cloud 连接器读取 Google Cloud 文件时遇到问题(使用 spark.jars.packagesspark.jars 将一些其他应用依赖项添加到 Spark Java 类路径时,可能会因 Java 依赖项冲突而发生此问题),建议您使用 Ray Data 从 Google Cloud 存储桶读取文件。

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP GCS Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the GCS connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

Ray Vertex AI 集群上的 Pyspark Pandas UDF

在 Ray Vertex AI 集群上运行的 Spark 应用中使用 Pyspark Pandas UDF 时,有时可能需要额外的代码。如果 Pandas UDF 使用的 Python 库在 Ray Vertex AI 集群上不可用,额外的代码通常是必需的。您可以将运行时环境与 Ray Job API 结合使用的应用的 Python 依赖项打包,当 Ray 作业提交到集群时,Ray 会将这些依赖项安装到为运行作业而创建的 Python 虚拟环境。但是,Pandas UDF 不使用相同的虚拟环境。它们使用默认的 Python 系统环境。如果该依赖项在系统环境中不可用,您可能需要将其安装在 Pandas UDF 中。在以下示例中,statsmodels 库必须安装在该 UDF 中。

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()