在 Vertex AI 的 Ray 叢集中執行 Spark

RayDP Python 程式庫可讓您在 Ray 叢集上執行 Spark。本文說明如何在 Ray on Vertex AI (Vertex AI 上的 Ray 叢集) 安裝、設定及執行 RayDP。

安裝

使用者可以透過 Vertex AI 上的 Ray,使用開放原始碼 Ray 架構執行應用程式。RayDP 提供在 Ray 上執行 Spark 的 API。您可以使用預建的容器映像檔在 Vertex AI 上建立 Ray 叢集,但這些映像檔並未預先安裝 RayDP。也就是說,您必須為 Vertex AI 上的 Ray 叢集建立自訂 Ray 叢集映像檔,才能在 Vertex AI 上的 Ray 叢集執行 RayDP 應用程式。下一節說明如何建構 RayDP 自訂映像檔。

建構 Vertex AI 上的 Ray 自訂容器映像檔

使用這個 Dockerfile,為 Vertex AI 上的 Ray 建立已安裝 RayDP 的自訂容器映像檔。

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

您可以使用 Vertex AI 預建映像檔中的最新 Ray 叢集,建立 RayDP 自訂映像檔。您也可以安裝預計會在應用程式中使用的其他 Python 套件。pyarrow==14.0 是因為 Ray 2.9.3 的依附元件限制所致。

建構及推送自訂容器映像檔

建構自訂映像檔之前,請先在 Artifact Registry 中建立 Docker 存放區 (如要瞭解如何建立及設定 Docker 存放區,請參閱「使用容器映像檔」)。建立 Docker 存放區後,請使用 Dockerfile 建構及推送自訂容器映像檔。

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

其中:

  • LOCATION:您在 Artifact Registry 中建立的 Cloud Storage 位置 (例如 us-central1)。
  • PROJECT_ID:您的 Google Cloud 專案 ID。
  • DOCKER_REPOSITORY:您建立的 Docker 存放區名稱。
  • IMAGE_NAME:自訂容器映像檔的名稱。

在 Vertex AI 上建立 Ray 叢集

使用上一步建構的自訂容器映像檔,在 Vertex AI 上建立 Ray 叢集。您可以使用 Python 適用的 Vertex AI SDK,在 Vertex AI 上建立 Ray 叢集。

如果尚未安裝,請先安裝必要的 Python 程式庫。

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

使用 Python 適用的 Vertex AI SDK 設定 Head 和 Worker 節點,並建立叢集。例如:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

其中:

  • CUSTOM_CONTAINER_IMAGE_URI:推送至 Artifact Registry 的自訂容器映像檔 URI。
  • CLUSTER_NAME:Vertex AI 上的 Ray 叢集名稱。

Vertex AI 上的 Ray 叢集中的 Spark

執行 Spark 應用程式前,請先使用 RayDP API 建立 Spark 工作階段。您可以透過 Ray 用戶端以互動方式執行這項操作,也可以使用 Ray 工作 API。我們建議使用 Ray 工作 API,特別是生產環境和長時間執行的應用程式。RayDP API 提供多項參數,可設定 Spark 工作階段,並支援 Spark 設定。如要進一步瞭解如何使用 RayDP API 建立 Spark 工作階段,請參閱「Spark master actors node affinity」。

搭配 Ray 用戶端的 RayDP

您可以使用 Ray TaskActor,在 Vertex AI 的 Ray 叢集上建立 Spark 叢集和工作階段。如要使用 Ray Client 在 Vertex AI 的 Ray 叢集上建立 Spark 工作階段,必須使用 Ray 工作或 Actor。下列程式碼顯示 Ray Actor 如何使用 RayDP,在 Vertex AI 的 Ray 叢集上建立 Spark 工作階段、執行 Spark 應用程式,以及停止 Spark 叢集。

如要以互動方式連線至 Vertex AI 上的 Ray 叢集,請參閱透過 Ray Client 連線至 Ray 叢集

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

搭配使用 RayDP 與 Ray Job API

如果小型實驗需要與 Ray 叢集建立互動式連線,Ray 用戶端就非常實用。建議使用 Ray Job API 在 Ray 叢集上執行長時間執行的工作和正式版工作。在 Vertex AI 的 Ray 叢集上執行 Spark 應用程式時,也適用這項做法。

建立包含 Spark 應用程式程式碼的 Python 指令碼。例如:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

使用 Ray Job API 提交工作,執行 Python 指令碼。例如:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

其中:

  • SCRIPT_NAME:您建立的指令碼檔案名稱。

從 Spark 應用程式讀取 Cloud Storage 檔案

一般做法是在 Google Cloud Storage 值區中儲存資料檔案。您可以透過多種方式,從在 Vertex AI 的 Ray 叢集上執行的 Spark 應用程式讀取這些檔案。本節說明兩種技術,可從在 Vertex AI 的 Ray 叢集上執行的 Spark 應用程式讀取 Cloud Storage 檔案。

使用 Google Cloud Storage 連接器

您可以使用 Google Cloud Hadoop 專用連接器,從 Spark 應用程式讀取 Cloud Storage 值區中的檔案。使用 RayDP 建立 Spark 工作階段後,您可以使用幾個設定參數讀取檔案。下列程式碼說明如何從 Vertex AI 的 Ray 叢集,讀取儲存在 Cloud Storage 值區中的 CSV 檔案。

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

其中:

  • GCS_FILE_URI:儲存在 Cloud Storage 值區中的檔案 URI。例如:gs://my-bucket/my-file.csv。

使用 Ray 資料

Google Cloud 連結器可從 Google Cloud值區讀取檔案,在大多數情況下都夠用。需要使用 Ray 的分散式處理功能讀取資料,或是使用 Google Cloud 連接器讀取Google Cloud 檔案時發生問題,您可能需要使用 Ray Data 從 Bucket 讀取檔案。 Google Cloud 如果使用 spark.jars.packagesspark.jars 將其他應用程式依附元件新增至 Spark Java 類別路徑,就可能發生 Java 依附元件衝突。

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

Vertex AI 上的 Ray 叢集中的 PySpark Pandas UDF

在 Vertex AI 的 Ray 叢集上執行的 Spark 應用程式中使用 Pyspark Pandas UDF 時,有時需要額外程式碼。如果 Pandas UDF 使用的 Python 程式庫在 Vertex AI 的 Ray 叢集上無法使用,通常就需要這麼做。您可以使用 Ray Job API 搭配執行階段環境,將應用程式的 Python 依附元件封裝起來。將 Ray 工作提交至叢集後,Ray 會在為執行工作建立的 Python 虛擬環境中安裝這些依附元件。不過,Pandas UDF 不會使用相同的虛擬環境。而是使用預設的 Python 系統環境。如果系統環境中沒有該依附元件,您可能需要在 Pandas UDF 中安裝。在下列範例中,請在 UDF 中安裝 statsmodels 程式庫。

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':

    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )

    print(test_udf(spark))

    raydp.stop_spark()