The RayDP Python library makes it possible to run Spark on a Ray cluster. This document covers installing, configuring and running RayDP on Ray on Vertex AI (Ray cluster on Vertex AI).
Installation
Ray on Vertex AI enables users to run their applications using the open source Ray framework, and RayDP provides APIs for running Spark on Ray. The prebuilt container images available for creating a Ray cluster on Vertex AI don't come with RayDP pre-installed, so you need to build a custom image for your Ray cluster on Vertex AI in order to run RayDP applications on it. The following section explains how to build a RayDP custom image.
Build a Ray on Vertex AI custom container image
Use the following Dockerfile to create a custom container image for Ray on Vertex AI that has RayDP installed:
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0
You can use the latest Ray cluster on Vertex AI prebuilt image as the base for the RayDP custom image. You can also install other Python packages that you anticipate you'll use in your applications. The pyarrow==14.0 pin is due to a dependency constraint of Ray 2.33.0.
Build and push the custom container image
You need to create a Docker repository in Artifact Registry before you can build your custom image (see Work with container images for how to create and configure your Docker repository). Once you have the Docker repository created, build and push the custom container image using the Dockerfile:
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Where:
LOCATION: The Artifact Registry location (for example, us-central1) of the Docker repository that you created.
PROJECT_ID: Your Google Cloud project ID.
DOCKER_REPOSITORY: The name of the Docker repository that you created.
IMAGE_NAME: The name of your custom container image.
Create a Ray cluster on Vertex AI
Use the custom container image built in the previous step to create a Ray cluster on Vertex AI. You can do this with the Vertex AI SDK for Python.
If you haven't done so yet, install the required Python libraries.
pip install --quiet google-cloud-aiplatform \
    ray[all]==2.9.3 \
    google-cloud-aiplatform[ray]
Configure the head and worker nodes and create the cluster using the Vertex AI SDK for Python. For example:
import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)
Where:
CUSTOM_CONTAINER_IMAGE_URI: The URI of the custom container image pushed to Artifact Registry.
CLUSTER_NAME: The name of your Ray cluster on Vertex AI.
Spark on Ray cluster on Vertex AI
Before you can run your Spark application, you need to create a Spark session using the RayDP API. You can do this interactively using the Ray client, or by using the Ray Job API. The Ray Job API is recommended, especially for production and long-running applications. The RayDP API provides parameters to configure the Spark session and also supports Spark configuration. To learn more about the RayDP API for creating the Spark session, see Spark master actors node affinity.
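For example, here's a minimal sketch of creating a Spark session with raydp.init_spark; the resource sizes and the spark.sql.shuffle.partitions entry are illustrative values, not recommendations:

import raydp

# Create a Spark session on the Ray cluster. The configs dict passes Spark
# configuration; the value shown here is only an example.
spark = raydp.init_spark(
    app_name="RAYDP CONFIG EXAMPLE",
    num_executors=2,
    executor_cores=2,
    executor_memory="1G",
    configs={"spark.sql.shuffle.partitions": "16"},
)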
RayDP with Ray client
You can use a Ray Task or Actor to create a Spark cluster and Spark session on the Ray cluster on Vertex AI. A Ray Task or Actor is required in order to use a Ray client to create a Spark session on the Ray cluster on Vertex AI. The following code shows how a Ray Actor can be used to create a Spark session, run a Spark application, and stop the Spark cluster on a Ray cluster on Vertex AI using RayDP.
To learn how to interactively connect to the Ray cluster on Vertex AI, see Connect to a Ray cluster through Ray Client.
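As a rough sketch of that interactive connection (assuming the cluster resource name returned by vertex_ray.create_ray_cluster earlier, and the vertex_ray:// address scheme used by the Vertex AI SDK), the connection step before running the Actor example below looks like this:

import ray
from google.cloud.aiplatform import vertex_ray

# Connect the Ray client to the Ray cluster on Vertex AI. The resource name is
# the value returned by vertex_ray.create_ray_cluster() in the previous section.
ray.init(f"vertex_ray://{ray_cluster_resource_name}")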
@ray.remote
class SparkExecutor:
    import pyspark

    spark: pyspark.sql.SparkSession = None

    def __init__(self):
        import ray
        import raydp

        self.spark = raydp.init_spark(
            app_name="RAYDP ACTOR EXAMPLE",
            num_executors=1,
            executor_cores=1,
            executor_memory="500M",
        )

    def get_data(self):
        df = self.spark.createDataFrame(
            [
                ("sue", 32),
                ("li", 3),
                ("bob", 75),
                ("heo", 13),
            ],
            ["first_name", "age"],
        )
        return df.toJSON().collect()

    def stop_spark(self):
        import raydp
        raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())
RayDP with Ray Job API
The Ray client is useful for small experiments that require an interactive connection with the Ray cluster. The Ray Job API is the recommended way to run long-running and production jobs on a Ray cluster. This also applies to running Spark applications on the Ray cluster on Vertex AI.
Create a Python script that contains your Spark application code. For example:
import pyspark
import raydp


def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()


def stop_spark():
    raydp.stop_spark()


if __name__ == '__main__':
    spark = raydp.init_spark(
        app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()
Submit the job to run the Python script using the Ray Job API. For example:
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python [SCRIPT_NAME].py",
    # Path to the local directory that contains the python script file.
    runtime_env={
        "working_dir": ".",
    }
)
Where:
SCRIPT_NAME: The filename of the script that you created.
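RAY_ADDRESS is the address of the Ray cluster that receives the job. A minimal sketch of building it from the cluster created earlier, assuming the Vertex AI SDK's vertex_ray.get_ray_cluster helper and its dashboard_address field (verify both against your SDK version):

from google.cloud.aiplatform import vertex_ray
from ray.job_submission import JobSubmissionClient

# Look up the cluster and build the job submission address from its dashboard
# address. ray_cluster_resource_name is the value returned when the cluster
# was created.
ray_cluster = vertex_ray.get_ray_cluster(ray_cluster_resource_name)
client = JobSubmissionClient("vertex_ray://{}".format(ray_cluster.dashboard_address))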
Reading Cloud Storage files from Spark application
It's common practice to store data files in a Cloud Storage bucket. There are multiple ways to read these files from a Spark application running on the Ray cluster on Vertex AI. This section explains two techniques for reading Cloud Storage files from Spark applications running on a Ray cluster on Vertex AI.
Use the Google Cloud Storage Connector
You can use the Cloud Storage connector for Hadoop to read files from a Cloud Storage bucket in your Spark application. You do this by setting a few configuration parameters when you create a Spark session with RayDP. The following code shows how a CSV file stored in a Cloud Storage bucket can be read from a Spark application on the Ray cluster on Vertex AI.
import raydp

spark = raydp.init_spark(
    app_name="RayDP Cloud Storage Example 1",
    configs={
        "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
        "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    },
    num_executors=2,
    executor_cores=4,
    executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header=True, inferSchema=True)
Where:
GCS_FILE_URI: The URI of a file stored in a Cloud Storage bucket. For example: gs://my-bucket/my-file.csv.
Use Ray Data
The Cloud Storage connector provides a way to read files from a Cloud Storage bucket, and it may be sufficient for most use cases. You may want to use Ray Data to read files from the Cloud Storage bucket when you need Ray's distributed processing for reading data, or when you run into issues reading Cloud Storage files with the Cloud Storage connector. Such issues can happen because of Java dependency conflicts when other application dependencies are added to the Spark Java classpath using either spark.jars.packages or spark.jars.
import raydp
import ray

spark = raydp.init_spark(
    app_name="RayDP Cloud Storage Example 2",
    configs={
        "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
        "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
        "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
        "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    },
    num_executors=2,
    executor_cores=4,
    executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector JAR and other
# parameters have been added to the Spark configuration.
# spark.read.csv([GCS_FILE_URI], header=True, inferSchema=True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)
Pyspark Pandas UDF on Ray cluster on Vertex AI
PySpark Pandas UDFs may sometimes require additional code when you use them in a Spark application running on a Ray cluster on Vertex AI. This is usually required when the Pandas UDF uses a Python library that isn't available on the Ray cluster on Vertex AI. You can package the Python dependencies of an application using a Runtime Environment with the Ray Job API; when the Ray job is submitted to the cluster, Ray installs those dependencies in the Python virtual environment that it creates for running the job (see the sketch after this paragraph). The Pandas UDFs, however, don't use that virtual environment. Instead, they use the default Python system environment. If the dependency isn't available in the system environment, you may need to install it within your Pandas UDF, as shown in the example that follows the sketch below, where the statsmodels library is installed within the UDF.
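For comparison, here is a rough sketch of packaging a dependency through the Ray Job API's runtime environment; the script name udf_job.py is a hypothetical placeholder, and RAY_ADDRESS is the same cluster address used in the earlier job-submission example:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
    entrypoint="python udf_job.py",  # hypothetical script name
    runtime_env={
        "working_dir": ".",
        # Installed in the virtual environment Ray creates for the job;
        # Pandas UDF executors running in the system environment won't see it.
        "pip": ["statsmodels"],
    },
)

Because those packages land only in the job's virtual environment, the following example installs the statsmodels library inside the UDF itself.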
import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType


def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery', 'Literacy', 'Pop1831')).collect()


@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()


if __name__ == '__main__':
    spark = raydp.init_spark(
        app_name="RayDP UDF Example",
        num_executors=2,
        executor_cores=4,
        executor_memory="1500M",
    )
    print(test_udf(spark))
    raydp.stop_spark()