La biblioteca de Python RayDP permite ejecutar Spark en un clúster de Ray. En este documento, se explica la instalación, configuración y ejecución de RayDP en Ray en Vertex AI (clúster de Ray en Vertex AI).
Instalación
Ray en Vertex AI permite a los usuarios ejecutar sus aplicaciones con el framework de Ray de código abierto. RayDP proporciona APIs para ejecutar Spark en Ray. Las imágenes de contenedor compiladas previamente disponibles para crear un clúster de Ray en Vertex AI no vienen con RayDP preinstalado, lo que significa que debes crear un clúster de Ray personalizado en la imagen de Vertex IA para tu clúster de Ray en Vertex AI para ejecutar aplicaciones de RayDP en el clúster de Ray en Vertex AI. En la siguiente sección, se explica cómo se puede compilar una imagen personalizada de RayDP.
Compila una imagen de contenedor personalizada de Ray en Vertex AI
Usa este Dockerfile para crear una imagen de contenedor personalizada para Ray en Vertex AI que tenga RayDP instalado.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
Puedes usar el clúster de Ray más reciente en la imagen compilada con anterioridad de Vertex AI para crear la imagen personalizada de RayDP. También puedes instalar otros paquetes de Python que prevés que usarás en tus aplicaciones. pyarrow==14.0
se debe a una restricción de dependencia de Ray 2.9.3.
Compila y envía la imagen de contenedor personalizada
Debes crear un repositorio de Docker en Artifact Registry antes de poder compilar tu imagen personalizada (consulta Trabaja con imágenes de contenedor para obtener información sobre cómo crear y configurar tu repositorio de Docker). Una vez que creaste el repositorio de Docker, compila y envía la imagen de contenedor personalizada con el Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Donde:
LOCATION
: La ubicación de Cloud Storage (por ejemplo, us-central1) que creaste en tu Artifact Registry.PROJECT_ID
es el ID del proyecto de Google Cloud.DOCKER_REPOSITORY
: El nombre del repositorio de Docker que creaste.IMAGE_NAME
: El nombre de tus imágenes de contenedor personalizadas
Crea un clúster de Ray en Vertex AI
Usa la imagen de contenedor personalizada compilada en el paso anterior para crear un clúster de Ray en Vertex AI. Puedes usar el SDK de Vertex AI para Python para crear un clúster de Ray en Vertex AI.
Si aún no lo has hecho, instala las bibliotecas de Python necesarias.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Configura los nodos principales y trabajadores, y crea el clúster con el SDK de Vertex AI para Python Por ejemplo:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Donde:
CUSTOM_CONTAINER_IMAGE_URI
: El URI de la imagen de contenedor personalizada enviada a Artifact Registry.CLUSTER_NAME
: El nombre de tu clúster de Ray en Vertex AI.
Clúster de Spark en Ray en Vertex AI
Antes de que puedas ejecutar la aplicación Spark, debes crear una sesión de Spark con la API de RayDP. Puedes usar el cliente de Ray para hacerlo de forma interactiva o usar la API de trabajo de Ray. Se recomienda la API de trabajo de Ray, en especial para las aplicaciones de producción y de larga duración. La API de RayDP proporciona parámetros para configurar la sesión de Spark y admite la configuración de Spark. Obtén más información sobre la API de RayDP para crear sesiones de Spark; consulta Afinidad de nodos de actores principales de Spark.
RayDP con el cliente de Ray
Puedes usar Ray Task o Actor para crear un clúster de Spark y una sesión en el clúster de Ray en Vertex AI. Se requiere la tarea de Ray o Actor para usar un cliente de Ray para crear una sesión de Spark en el clúster de Ray en Vertex AI. En el siguiente código, se muestra cómo se puede usar un Ray Actor para crear una sesión de Spark, ejecutar una aplicación de Spark y detener un clúster de Spark en un clúster de Ray en Vertex AI mediante RayDP.
Para obtener información sobre cómo conectarte de forma interactiva al clúster de Ray en Vertex AI, consulta Conéctate a un clúster de Ray a través del cliente de Ray.
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP con la API de Ray Job
El cliente de Ray es útil para experimentos pequeños que requieren una conexión interactiva con el clúster de Ray. La API de trabajo de Ray es la forma recomendada de ejecutar trabajos de producción y de larga duración en un clúster de Ray. Esto también se aplica a la ejecución de aplicaciones de Spark en el clúster de Ray en Vertex AI.
Crea una secuencia de comandos de Python que contenga el código de la aplicación Spark. Por ejemplo:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Envía el trabajo para ejecutar la secuencia de comandos de Python con la API de Ray Job. Por ejemplo:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Donde:
SCRIPT_NAME
: El nombre del archivo de la secuencia de comandos que creaste.
Lee los archivos de Cloud Storage desde la aplicación Spark
Es una práctica común almacenar archivos de datos en un bucket de Google Cloud Storage. Hay varias formas de leer estos archivos desde una aplicación de Spark que se ejecuta en el clúster de Ray en Vertex AI. En esta sección, se explican dos técnicas para leer archivos de Cloud Storage de aplicaciones de Spark que se ejecutan en el clúster de Ray en Vertex AI.
Usa el conector de Google Cloud Storage.
Puedes usar Google Cloud Connector para Hadoop para leer archivos de un bucket de Cloud Storage desde tu aplicación Spark. Esto se hace con algunos parámetros de configuración cuando se crea una sesión de Spark con RayDP. En el siguiente código, se muestra cómo se puede leer un archivo CSV almacenado en un bucket de Cloud Storage desde una aplicación Spark en el clúster de Ray en Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP GCS Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Donde:
GCS_FILE_URI
: El URI de un archivo almacenado en un bucket de Cloud Storage. Por ejemplo: gs://my-bucket/my-file.csv.
Usa datos de Ray
Google Cloud Connector proporciona una forma de leer archivos de un bucket de Google Cloud y puede ser suficiente para la mayoría de los casos de uso. Recomendamos usar Ray Data para leer archivos del bucket de Google Cloud cuando necesites usar el procesamiento distribuido de Ray para leer datos o cuando tengas problemas para leer el archivo de Google Cloud con el conector de Google Cloud, lo que podría suceder debido a conflictos de dependencia de Java cuando se agregan otras dependencias de la aplicación a la ruta de clase de Java de Spark mediante spark.jars.packages
o spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP GCS Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the GCS connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
UDF de Pyspark Pandas en el clúster de Ray en Vertex AI
Las UDF de Pyspark Pandas a veces pueden requerir código adicional cuando las usas en tu aplicación de Spark que se ejecuta en un clúster de Ray en Vertex AI. Por lo general, esto es necesario cuando la UDF de Pandas usa una biblioteca de Python que no está disponible en el clúster de Ray en Vertex AI. Es posible empaquetar las dependencias de Python de una aplicación mediante el entorno de ejecución con la API de trabajo de Ray y, cuando el trabajo de Ray se envía al clúster, Ray instala esas dependencias en el entorno virtual de Python que crea para ejecutar el trabajo. Sin embargo, las UDF de Pandas no usan el mismo entorno virtual. En su lugar, usan el entorno predeterminado del sistema de Python. Si esa dependencia no está disponible en el entorno del sistema, es posible que debas instalarla en tu UDF de Pandas. En el siguiente ejemplo, la biblioteca statsmodels
debe instalarse en la UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()