La libreria Python RayDP consente di eseguire Spark su un cluster Ray. Questo documento tratta l'installazione, configurazione ed esecuzione di RayDP su Ray on Vertex AI (cluster RayDP Vertex AI).
Installazione
Ray on Vertex AI consente agli utenti di eseguire le proprie applicazioni utilizzando il framework Ray open source. RayDP fornisce API per l'esecuzione di Spark su Ray. Le immagini dei container predefiniti disponibili per creare un cluster Ray su Vertex AI non sono dotate di RayDP preinstallato, il che significa che devi creare un'immagine di cluster Ray personalizzato su Vertex AI per il tuo cluster Ray su Vertex AI per eseguire le applicazioni RayDP sul cluster Ray su Vertex AI. La sezione seguente spiega come un RayDP un'immagine personalizzata.
Crea un'immagine container personalizzata Ray on Vertex AI
Utilizza questo dockerfile per creare un'immagine container personalizzata per Ray on Vertex AI su cui è installato RayDP.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
Puoi utilizzare l'immagine predefinita del cluster Ray su Vertex AI più recente per creare l'immagine personalizzata RayDP. Puoi anche installare altri pacchetti Python
che prevedi di usare nelle tue applicazioni. pyarrow==14.0
è dovuto
a un vincolo di dipendenza di Ray 2.33.0.
Crea ed esegui il push dell'immagine del container personalizzato
Devi creare un repository Docker Artifact Registry prima di poter creare un'immagine personalizzata (vedi Utilizzare le immagini container per scoprire come creare e configurare il repository Docker). Dopo aver creato il repository Docker, crea ed esegui il push dell'immagine container personalizzata utilizzando il Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Dove:
LOCATION
: la posizione Cloud Storage (ad es. us-central1) che hai creato nel tuo Artifact Registry.PROJECT_ID
: l'ID del tuo progetto Google Cloud.DOCKER_REPOSITORY
: il nome del repository Docker che hai creato.IMAGE_NAME
: il nome delle immagini container personalizzate.
Creare un cluster Ray su Vertex AI
Utilizza l'immagine container personalizzata creata nel passaggio precedente per creare un Ray su Vertex AI. Puoi utilizzare l'SDK Vertex AI per Python per creare un cluster Ray su Vertex AI.
Se non l'hai ancora fatto,installa le librerie Python necessarie.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Configura i nodi Head e Worker e crea il cluster utilizzando l'SDK Vertex AI per Python. Ad esempio:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Dove:
CUSTOM_CONTAINER_IMAGE_URI
: il URI dell'immagine container personalizzata di cui è stato eseguito il push Artifact Registry.CLUSTER_NAME
: il nome del cluster Ray su Vertex AI.
Cluster Spark su Ray su Vertex AI
Prima di poter eseguire l'applicazione Spark, devi creare una sessione Spark usando l'API RayDP. Puoi utilizzare il client Ray per eseguire questa operazione in modo interattivo o utilizzare l'API job Ray. È consigliata l'API Ray job, in particolare per di produzione e applicazioni a lunga esecuzione. L'API RayDP fornisce parametri configurare la sessione Spark, oltre a supportare la configurazione di Spark. Per scoprire di più sull'API RayDP per la creazione di sessioni Spark, consulta Affinità dei nodi degli attori principali di Spark.
RayDP con il client Ray
Puoi usare Ray Attività o Attore per creare Cluster e sessione Spark sul cluster Ray su Vertex AI. Ray Task oppure L'attore deve utilizzare un Client Ray per creare una sessione Spark sul cluster Ray su Vertex AI. Il codice seguente mostra come un attore Ray può essere utilizzato per creare una sessione Spark, eseguire un'applicazione Spark e arrestare un cluster Spark su un cluster Ray su Vertex AI utilizzando RayDP.
Per sapere come connetterti in modo interattivo al cluster Ray su Vertex AI, vedi Connettiti a un cluster Ray tramite Ray Client
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP con l'API Ray Job
Il client Ray è utile per piccoli esperimenti che richiedono query con il cluster Ray. L'API Ray Job è il modo consigliato per eseguire job di produzione e a lungo termine su un cluster Ray. Questo vale anche per l'esecuzione di applicazioni Spark sul cluster Ray su Vertex AI.
Crea uno script Python contenente il codice dell'applicazione Spark. Ad esempio:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Invia il job per eseguire lo script Python utilizzando l'API Ray Job. Ad esempio:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Dove:
SCRIPT_NAME
: il nome del file dello script che hai creato.
Lettura dei file di Cloud Storage dall'applicazione Spark
È pratica comune archiviare i file di dati in un bucket Google Cloud Storage. Esistono diversi modi per leggere questi file da un'applicazione Spark in esecuzione sul cluster Ray su Vertex AI. Questa sezione illustra due tecniche per lettura dei file di Cloud Storage dalle applicazioni Spark in esecuzione su Ray Cluster su Vertex AI.
Utilizzare il connettore Google Cloud Storage
Puoi utilizzare Google Cloud Connector for Hadoop per leggere i file da un dal bucket Cloud Storage dall'applicazione Spark. Per farlo, usa di configurazione quando si crea una sessione Spark utilizzando RayDP. La il seguente codice mostra come è possibile eseguire un file CSV archiviato in un bucket Cloud Storage da un'applicazione Spark sul cluster Ray su Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP GCS Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Dove:
GCS_FILE_URI
: l'URI di un file archiviato in un bucket Cloud Storage. Ad esempio: gs://my-bucket/my-file.csv.
Utilizzare i dati di Ray
Il connettore Google Cloud consente di leggere i file da un account Google Cloud
e può essere sufficiente per la maggior parte dei casi d'uso. È consigliabile utilizzare
Ray Data per leggere i file dal bucket Google Cloud quando devi utilizzare
elaborazione distribuita per la lettura dei dati o in caso di problemi di lettura
file Google Cloud con il connettore Google Cloud, che potrebbe
si verificano a causa di conflitti di dipendenza Java quando un'altra applicazione
vengono aggiunte al classpath di Spark Java utilizzando
spark.jars.packages
o spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP GCS Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the GCS connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
UDF Pyspark Pandas sull'ammasso Ray su Vertex AI
Le
UDF di Pandas di Pyspark
a volte possono richiedere codice aggiuntivo quando le utilizzi nella tua applicazione Spark eseguita su un cluster Ray su Vertex AI. In genere, questo è necessario quando l'UDF Pandas utilizza una libreria Python non disponibile sul cluster Ray su Vertex AI. È possibile pacchettizzare le
dipendenze Python
di un'applicazione utilizzando l'ambiente di runtime con l'API job Ray e, quando il
job Ray viene inviato al cluster, Ray installa queste dipendenze nell'ambiente virtuale Python che crea per l'esecuzione del job. Tuttavia, le UDF di Pandas non utilizzano lo stesso ambiente virtuale. Utilizzano invece l'ambiente di sistema Python predefinito. Se la dipendenza non è disponibile nel riquadro
potrebbe essere necessario installarlo nella tua funzione definita dall'utente Pandas. Nel
seguente esempio, la libreria statsmodels
deve essere installata all'interno della UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()