La bibliothèque Python RayDP permet d'exécuter Spark sur un cluster Ray. Ce document explique comment installer, configurer et exécuter RayDP sur Ray sur Vertex AI (cluster Ray sur Vertex AI).
Installation
Ray sur Vertex AI permet aux utilisateurs d'exécuter leurs applications à l'aide du framework Ray Open Source. RayDP fournit des API permettant d'exécuter Spark sur Ray. Les images de conteneurs prédéfinies disponibles pour créer un cluster Ray sur Vertex AI ne sont pas préinstallées avec RayDP. Vous devez donc créer une image de cluster Ray personnalisé sur Vertex AI pour que votre cluster Ray sur Vertex AI puisse exécuter des applications RayDP sur le cluster Ray sur Vertex AI. La section suivante explique comment créer une image personnalisée RayDP.
Créer une image de conteneur personnalisée Ray sur Vertex AI
Utilisez ce fichier Dockerfile pour créer une image de conteneur personnalisée pour Ray sur Vertex AI sur laquelle RayDP est installé.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
Vous pouvez utiliser la dernière image prédéfinie de cluster Ray sur Vertex AI pour créer l'image personnalisée RayDP. Vous pouvez également installer d'autres packages Python que vous prévoyez d'utiliser dans vos applications. pyarrow==14.0
est dû à une contrainte de dépendance de Ray 2.33.0.
Créer et transférer une image de conteneur personnalisée
Vous devez créer un dépôt Docker dans Artifact Registry avant de pouvoir créer votre image personnalisée (consultez la section Utiliser des images de conteneur pour savoir comment créer et configurer votre dépôt Docker). Une fois le dépôt Docker créé, compilez et transférez l'image de conteneur personnalisée à l'aide du fichier Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Où :
LOCATION
: emplacement Cloud Storage (par exemple, us-central1) que vous avez créé dans votre registre d'artefacts.PROJECT_ID
: ID de votre projet Google Cloud.DOCKER_REPOSITORY
: nom du dépôt Docker que vous avez créé.IMAGE_NAME
: nom de vos images de conteneurs personnalisées.
Créer un cluster Ray sur Vertex AI
Utilisez l'image de conteneur personnalisée créée à l'étape précédente pour créer un cluster Ray sur Vertex AI. Vous pouvez utiliser le SDK Vertex AI pour Python pour créer un cluster Ray sur Vertex AI.
Si vous ne l'avez pas déjà fait,installez les bibliothèques Python requises.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Configurez les nœuds principaux et de calcul, puis créez le cluster à l'aide du SDK Vertex AI pour Python. Exemple :
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Où :
CUSTOM_CONTAINER_IMAGE_URI
: URI de l'image de conteneur personnalisée transférée vers Artifact Registry.CLUSTER_NAME
: nom de votre cluster Ray sur Vertex AI.
Spark sur un cluster Ray sur Vertex AI
Avant de pouvoir exécuter votre application Spark, vous devez créer une session Spark à l'aide de l'API RayDP. Vous pouvez utiliser le client Ray pour effectuer cette opération de manière interactive ou utiliser l'API Ray Job. L'API Ray Job est recommandée, en particulier pour les applications de production et de longue durée. L'API RayDP fournit des paramètres pour configurer la session Spark, et prend en charge la configuration Spark. Pour en savoir plus sur l'API RayDP permettant de créer des sessions Spark, consultez la section Affinité des nœuds acteurs principaux Spark.
RayDP avec le client Ray
Vous pouvez utiliser une Tâche ou un Acteur RAY pour créer un cluster et une session Spark sur le cluster Ray sur Vertex AI. Une tâche ou un acteur RAY est nécessaire pour utiliser un client Ray afin de créer une session Spark sur le cluster Ray sur Vertex AI. Le code suivant montre comment un acteur Ray peut être utilisé pour créer une session Spark, exécuter une application Spark et arrêter un cluster Spark sur un cluster Ray sur Vertex AI à l'aide de RayDP.
Pour savoir comment vous connecter de manière interactive au cluster Ray sur Vertex AI, consultez la section Se connecter à un cluster Ray via le client Ray.
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP avec l'API Ray Job
Le client Ray est utile pour les petits tests qui nécessitent une connexion interactive avec le cluster Ray. L'API Ray Job est la méthode recommandée pour exécuter des jobs de production et de longue durée sur un cluster Ray. Cela s'applique également à l'exécution d'applications Spark sur le cluster Ray sur Vertex AI.
Créez un script Python contenant le code de votre application Spark. Exemple :
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Envoyez la tâche pour exécuter le script Python à l'aide de l'API Ray Job. Exemple :
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Où :
SCRIPT_NAME
: nom du fichier du script que vous avez créé.
Lire des fichiers Cloud Storage à partir de l'application Spark
Il est courant de stocker des fichiers de données dans un bucket Google Cloud Storage. Il existe plusieurs façons de lire ces fichiers à partir d'une application Spark exécutée sur le cluster Ray sur Vertex AI. Cette section explique deux techniques permettant de lire des fichiers Cloud Storage à partir d'applications Spark exécutées sur un cluster Ray sur Vertex AI.
Utiliser le connecteur Google Cloud Storage
Vous pouvez utiliser le connecteur Google Cloud pour Hadoop afin de lire des fichiers à partir d'un bucket Cloud Storage depuis votre application Spark. Pour ce faire, quelques paramètres de configuration sont utilisés lorsqu'une session Spark est créée à l'aide de RayDP. Le code suivant montre comment un fichier CSV stocké dans un bucket Cloud Storage peut être lu à partir d'une application Spark sur le cluster Ray sur Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP GCS Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Où :
GCS_FILE_URI
: URI d'un fichier stocké dans un bucket Cloud Storage. Exemple : gs://my-bucket/my-file.csv.
Utiliser Ray Data
Le connecteur Google Cloud permet de lire des fichiers à partir d'un bucket Google Cloud. Il peut suffire pour la plupart des cas d'utilisation. Vous pouvez utiliser Ray Data pour lire des fichiers à partir du bucket Google Cloud lorsque vous devez utiliser le traitement distribué de Ray pour lire des données ou lorsque vous rencontrez des problèmes de lecture de fichiers Google Cloud avec le connecteur Google Cloud. Ceci peut se produire en raison de conflits de dépendances Java lorsque d'autres dépendances d'application sont ajoutées au classpath Spark Java à l'aide de spark.jars.packages
ou spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP GCS Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the GCS connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
UDF Pyspark Pandas sur un cluster Ray sur Vertex AI
Les UDF Pyspark Pandas peuvent parfois nécessiter du code supplémentaire lorsque vous les utilisez dans votre application Spark s'exécutant sur un cluster Ray sur Vertex AI. Cette opération est généralement nécessaire lorsque l'UDF Pandas utilise une bibliothèque Python qui n'est pas disponible sur le cluster Ray sur Vertex AI. Il est possible de créer un package des dépendances Python d'une application à l'aide de l'environnement d'exécution avec l'API Ray Job. Lorsque le job Ray est envoyé au cluster, Ray installe ces dépendances dans l'environnement virtuel Python qu'il crée pour exécuter le job. Cependant, les UDF Pandas n'utilisent pas le même environnement virtuel. Ils utilisent plutôt l'environnement système Python par défaut. Si cette dépendance n'est pas disponible dans l'environnement système, vous devrez peut-être l'installer dans votre UDF Pandas. Dans l'exemple suivant, la bibliothèque statsmodels
doit être installée dans l'UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()