The RayDP Python library makes it possible to run Spark on a Ray cluster. This document covers installing, configuring and running RayDP on Ray on Vertex AI (Ray cluster on Vertex AI).
Installation
Ray on Vertex AI enables users to run their applications using the open source Ray framework. RayDP provides APIs for running Spark on Ray. The prebuilt container images available for creating a Ray cluster on Vertex AI don't come with RayDP pre-installed. To run RayDP applications, you must therefore create the Ray cluster on Vertex AI from a custom container image that includes RayDP. The following section explains how to build a RayDP custom image.
Build a Ray on Vertex AI custom container image
Use the following Dockerfile to create a custom container image for Ray on Vertex AI that has RayDP installed.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0
You can use the latest Ray cluster on Vertex AI prebuilt image as the base
for the RayDP custom image. You can also install other Python packages
that you anticipate using in your applications. The pyarrow==14.0 pin is
needed because of a dependency constraint of Ray 2.9.3.
Build and push the custom container image
Create a Docker repository in Artifact Registry before you build your custom image (see Work with container images for how to create and configure your Docker repository). After you create the Docker repository, build and push the custom container image using the Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Where:
- LOCATION: The location (for example, us-central1) of the Docker repository that you created in Artifact Registry.
- PROJECT_ID: Your Google Cloud project ID.
- DOCKER_REPOSITORY: The name of the Docker repository that you created.
- IMAGE_NAME: The name of your custom container image.
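Because the image URI follows a fixed pattern, it can help to assemble it once and reuse the same string for the docker commands and for the cluster configuration. The following is a minimal sketch; the helper name build_image_uri and its optional tag parameter are illustrative assumptions and not part of any Google Cloud library.

```python
def build_image_uri(location: str, project_id: str, repository: str,
                    image_name: str, tag: str = "") -> str:
    """Assemble an Artifact Registry image URI from its parts.

    Hypothetical helper for illustration only.
    """
    parts = {
        "location": location,
        "project_id": project_id,
        "repository": repository,
        "image_name": image_name,
    }
    # Reject empty components early so a malformed URI is never produced.
    for name, value in parts.items():
        if not value:
            raise ValueError(f"{name} must not be empty")
    uri = f"{location}-docker.pkg.dev/{project_id}/{repository}/{image_name}"
    # Append the tag only when one is given.
    return f"{uri}:{tag}" if tag else uri


if __name__ == "__main__":
    print(build_image_uri("us-central1", "my-project", "my-repo", "raydp-image"))
```

The returned string can be passed to docker build and docker push, and later used as the custom image URI when you create the cluster.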
Create a Ray cluster on Vertex AI
Use the custom container image built in the previous step to create a Ray cluster on Vertex AI. You can create the cluster with the Vertex AI SDK for Python.
If you haven't done so yet, install the required Python libraries.
pip install --quiet google-cloud-aiplatform \
    ray[all]==2.9.3 \
    google-cloud-aiplatform[ray]
Configure the head and worker nodes and create the cluster using the Vertex AI SDK for Python. For example:
import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)
Where:
- CUSTOM_CONTAINER_IMAGE_URI: The URI of the custom container image pushed to Artifact Registry.
- CLUSTER_NAME: The name of your Ray cluster on Vertex AI.
Spark on Ray cluster on Vertex AI
Before you run your Spark application, create a Spark session using the RayDP API. You can do this interactively with the Ray Client, or use the Ray Job API. The Ray Job API is recommended, especially for production and long-running applications. The RayDP API provides parameters to configure the Spark session and also accepts Spark configuration properties. To learn more about the RayDP API for creating a Spark session, see Spark master actors node affinity.
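As a rough illustration of how the session parameters and Spark configuration properties fit together, the sketch below collects them in one dictionary that can be unpacked into raydp.init_spark. The helper name build_init_spark_kwargs is a hypothetical convenience, and spark.sql.shuffle.partitions is only an example property; the keyword names match the init_spark calls used in this document.

```python
from typing import Any, Dict, Optional


def build_init_spark_kwargs(
    app_name: str,
    num_executors: int,
    executor_cores: int,
    executor_memory: str,
    spark_configs: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Collect the arguments for raydp.init_spark (hypothetical helper)."""
    if num_executors < 1 or executor_cores < 1:
        raise ValueError("num_executors and executor_cores must be at least 1")
    kwargs: Dict[str, Any] = {
        "app_name": app_name,
        "num_executors": num_executors,
        "executor_cores": executor_cores,
        "executor_memory": executor_memory,
    }
    # Spark configuration properties are passed through the `configs` argument.
    if spark_configs:
        kwargs["configs"] = dict(spark_configs)
    return kwargs


if __name__ == "__main__":
    kwargs = build_init_spark_kwargs(
        "RAYDP EXAMPLE", 1, 1, "500M",
        spark_configs={"spark.sql.shuffle.partitions": "8"},
    )
    print(kwargs)
    # On a cluster where RayDP is installed, the session would be created with:
    #   import raydp
    #   spark = raydp.init_spark(**kwargs)
```

Keeping the arguments in one place makes it easier to reuse the same settings between an interactive session and a submitted job.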
RayDP with Ray client
You can use a Ray task or actor to create a Spark cluster and session on the Ray cluster on Vertex AI. When you connect through the Ray Client, the Spark session must be created inside a Ray task or actor. The following code shows how a Ray actor can create a Spark session, run a Spark application, and stop the Spark cluster on a Ray cluster on Vertex AI using RayDP.
To interactively connect to the Ray cluster on Vertex AI, see Connect to a Ray cluster through Ray Client.
import ray  # Assumes you are already connected to the cluster through the Ray Client.


@ray.remote
class SparkExecutor:
    import pyspark

    spark: pyspark.sql.SparkSession = None

    def __init__(self):
        import ray
        import raydp

        # Start the Spark cluster on Ray and keep the session on the actor.
        self.spark = raydp.init_spark(
            app_name="RAYDP ACTOR EXAMPLE",
            num_executors=1,
            executor_cores=1,
            executor_memory="500M",
        )

    def get_data(self):
        df = self.spark.createDataFrame(
            [
                ("sue", 32),
                ("li", 3),
                ("bob", 75),
                ("heo", 13),
            ],
            ["first_name", "age"],
        )
        return df.toJSON().collect()

    def stop_spark(self):
        import raydp

        raydp.stop_spark()


s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())
RayDP with Ray Job API
The Ray Client is useful for small experiments that require an interactive connection with the Ray cluster. The Ray Job API is the recommended way to run long-running and production jobs on a Ray cluster. This also applies to running Spark applications on the Ray cluster on Vertex AI.
Create a Python script that contains your Spark application code. For example:
import pyspark
import raydp


def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()


def stop_spark():
    raydp.stop_spark()


if __name__ == '__main__':
    spark = raydp.init_spark(
        app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()
Submit the job to run the Python script using the Ray Job API. For example:
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python [SCRIPT_NAME].py",
    # Path to the local directory that contains the python script file.
    runtime_env={
        "working_dir": ".",
    }
)
Where:
- RAY_ADDRESS: The address of the Ray cluster on Vertex AI that the job is submitted to.
- SCRIPT_NAME: The filename of the script that you created.
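After submitting a job, you typically want to wait until it finishes before reading its output. The sketch below polls the job status until it reaches a terminal state. The helper wait_for_job and its timeout parameters are illustrative assumptions; it relies only on the client exposing get_job_status, as JobSubmissionClient does.

```python
import time

# Job states after which the status no longer changes.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(client, job_id: str, timeout_s: float = 600.0,
                 poll_interval_s: float = 5.0) -> str:
    """Poll client.get_job_status(job_id) until a terminal state or timeout.

    Hypothetical helper; `client` is expected to be a JobSubmissionClient.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = client.get_job_status(job_id)
        # The status may be an enum member; fall back to the raw value for strings.
        state = str(getattr(status, "value", status))
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} is still {state} after {timeout_s}s")
        time.sleep(poll_interval_s)
```

With a real client, you might call wait_for_job(client, job_id) and then print client.get_job_logs(job_id) to inspect the output of the Spark application.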
Reading Cloud Storage files from Spark application
It's common practice to store data files in a Google Cloud Storage bucket. You can read these files in multiple ways from a Spark application that's running on the Ray cluster on Vertex AI. This section explains two techniques for reading Cloud Storage files from Spark applications running on Ray Cluster on Vertex AI.
Use the Google Cloud Storage Connector
You can use the Cloud Storage connector for Hadoop to read files from a Cloud Storage bucket in your Spark application. After you create a Spark session using RayDP with a few additional configuration parameters, you can read the files directly. The following code shows how to read a CSV file stored in a Cloud Storage bucket from a Spark application on the Ray cluster on Vertex AI.
import raydp

spark = raydp.init_spark(
    app_name="RayDP Cloud Storage Example 1",
    configs={
        "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
        "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    },
    num_executors=2,
    executor_cores=4,
    executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Where:
- GCS_FILE_URI: The URI of a file stored in a Cloud Storage bucket. For example: gs://my-bucket/my-file.csv.
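If several applications read from Cloud Storage, the connector settings can be kept in one place and the file URI checked before the Spark job starts. The following sketch assumes two hypothetical helpers, gcs_connector_configs and split_gcs_uri; the property names and class names are the ones used in the example above.

```python
from typing import Dict, Tuple
from urllib.parse import urlparse


def gcs_connector_configs(connector_jar: str) -> Dict[str, str]:
    """Return the Spark properties that enable the Cloud Storage connector."""
    return {
        "spark.jars": connector_jar,
        "spark.hadoop.fs.AbstractFileSystem.gs.impl":
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl":
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    }


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split a gs:// URI into (bucket, object path); raise on anything else."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"not a Cloud Storage URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


if __name__ == "__main__":
    jar = "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar"
    print(gcs_connector_configs(jar))
    print(split_gcs_uri("gs://my-bucket/my-file.csv"))
```

The dictionary returned by gcs_connector_configs can be passed as the configs argument of raydp.init_spark, and a failed split_gcs_uri call surfaces a mistyped URI before any executors are started.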
Use Ray Data
The Cloud Storage connector provides a way to read files from a Cloud Storage
bucket, and it may be sufficient for most use cases. You might want to use
Ray Data instead when you need Ray's distributed processing for reading data,
or when you have issues reading a Cloud Storage file with the Cloud Storage
connector. Such issues can arise from Java dependency conflicts when other
application dependencies are added to the Spark Java classpath using either
spark.jars.packages or spark.jars.
import raydp
import ray

spark = raydp.init_spark(
    app_name="RayDP Cloud Storage Example 2",
    configs={
        "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
        "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
        "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
        "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    },
    num_executors=2,
    executor_cores=4,
    executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

# Read the file with Ray Data instead, then convert the dataset to a Spark DataFrame.
ray_dataset = ray.data.read_csv([GCS_FILE_URI])
spark_df = ray_dataset.to_spark(spark)
PySpark Pandas UDF on Ray cluster on Vertex AI
PySpark Pandas UDFs sometimes require additional code when you use them in
your Spark application running on a Ray cluster on Vertex AI. This is usually
required when the Pandas UDF uses a Python library that isn't available on
the Ray cluster on Vertex AI. You can package the Python dependencies of an
application using the runtime environment with the Ray Job API. After you
submit the Ray job to the cluster, Ray installs those dependencies in the
Python virtual environment that it creates for running the job. The Pandas
UDFs, however, don't use the same virtual environment. Instead, they use the
default Python system environment. If a dependency isn't available in the
system environment, you might need to install it within your Pandas UDF. The
following example installs the statsmodels library within the UDF.
import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType


def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()


@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys

    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()


if __name__ == '__main__':
    spark = raydp.init_spark(
        app_name="RayDP UDF Example",
        num_executors=2,
        executor_cores=4,
        executor_memory="1500M",
    )
    print(test_udf(spark))
    raydp.stop_spark()
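The example above invokes pip every time the UDF runs. One possible refinement, sketched here under the assumption that a quick import check is acceptable in your environment, is to install the package only when its module can't be found. The helper ensure_package is hypothetical and not part of RayDP or PySpark.

```python
import importlib
import importlib.util
import subprocess
import sys


def ensure_package(module_name: str, pip_name: str = "") -> bool:
    """Install a package with pip only if its module cannot be found.

    Returns True if pip was invoked, False if the module was already available.
    Hypothetical helper for illustration.
    """
    if importlib.util.find_spec(module_name) is not None:
        return False
    # Use the interpreter that runs the UDF so the package lands in its environment.
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", pip_name or module_name]
    )
    # Make the freshly installed package visible to the import system.
    importlib.invalidate_caches()
    return True


if __name__ == "__main__":
    # The standard library module json is always present, so pip is not invoked.
    print(ensure_package("json"))
```

Inside the UDF, a call such as ensure_package("statsmodels") could replace the direct subprocess call before the statsmodels imports, provided the helper is defined in the same script so that it is shipped to the executors along with the UDF.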