Ejecuta Spark en el clúster de Ray en Vertex AI

La biblioteca de Python RayDP permite ejecutar Spark en un clúster de Ray. En este documento, se explica cómo instalar, configurar y ejecutar RayDP en Ray en Vertex AI (clúster de Ray en Vertex AI).

Instalación

Ray en Vertex AI permite a los usuarios ejecutar sus aplicaciones con el framework de Ray de código abierto. RayDP proporciona APIs para ejecutar Spark en Ray. Las imágenes de contenedor compiladas previamente disponibles para crear un clúster de Ray en Vertex AI no vienen con RayDP preinstalado, lo que significa que debes crear un clúster de Ray personalizado en la imagen de Vertex IA para tu clúster de Ray en Vertex AI para ejecutar aplicaciones de RayDP en el clúster de Ray en Vertex AI. En la siguiente sección, se explica cómo se puede compilar una imagen personalizada de RayDP.

Compila una imagen de contenedor personalizada de Ray en Vertex AI

Usa este Dockerfile para crear una imagen de contenedor personalizada para Ray en Vertex AI que tenga RayDP instalado.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Puedes usar la imagen compilada previamente más reciente del clúster de Ray en Vertex AI para crear la imagen personalizada de RayDP. También puedes instalar otros paquetes de Python que crees que usarás en tus aplicaciones. El pyarrow==14.0 se debe a una restricción de dependencia de Ray 2.33.0.

Compila y envía la imagen del contenedor personalizado

Debes crear un repositorio de Docker en Artifact Registry antes de poder compilar tu imagen personalizada (consulta Trabaja con imágenes de contenedor para obtener información sobre cómo crear y configurar tu repositorio de Docker). Una vez que hayas creado el repositorio de Docker, compila y envía la imagen de contenedor personalizada con el Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Donde:

  • LOCATION: Es la ubicación de Cloud Storage (por ejemplo, us-central1) que creaste en Artifact Registry.
  • PROJECT_ID es el ID del proyecto de Google Cloud.
  • DOCKER_REPOSITORY: Es el nombre del repositorio de Docker que creaste.
  • IMAGE_NAME: Es el nombre de tus imágenes de contenedor personalizadas.

Crea un clúster de Ray en Vertex AI

Usa la imagen de contenedor personalizada compilada en el paso anterior para crear un clúster de Ray en Vertex AI. Puedes usar el SDK de Vertex AI para Python para crear un clúster de Ray en Vertex AI.

Si aún no lo has hecho, instala las bibliotecas de Python necesarias.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Configura los nodos principales y trabajadores, y crea el clúster con el SDK de Vertex AI para Python Por ejemplo:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Donde:

  • CUSTOM_CONTAINER_IMAGE_URI: Es el URI de la imagen del contenedor personalizado que se envió a Artifact Registry.
  • CLUSTER_NAME: El nombre de tu clúster de Ray en Vertex AI.

Spark en el clúster de Ray en Vertex AI

Antes de ejecutar tu aplicación de Spark, debes crear una sesión de Spark con la API de RayDP. Puedes usar el cliente de Ray para hacerlo de forma interactiva o usar la API de trabajo de Ray. Se recomienda usar la API de trabajo de Ray, en especial para aplicaciones de producción y de larga duración. La API de RayDP proporciona parámetros para configurar la sesión de Spark, además de admitir la configuración de Spark. Obtén más información sobre la API de RayDP para crear sesiones de Spark; consulta Afinidad de nodos de actores principales de Spark.

RayDP con el cliente de Ray

Puedes usar Ray Task o Actor para crear un clúster y una sesión de Spark en el clúster de Ray en Vertex AI. Se requiere la tarea de Ray o Actor para usar un cliente de Ray para crear una sesión de Spark en el clúster de Ray en Vertex AI. En el siguiente código, se muestra cómo se puede usar un actor de Ray para crear una sesión de Spark, ejecutar una aplicación de Spark y detener un clúster de Spark en un clúster de Ray en Vertex AI con RayDP.

Para obtener información sobre cómo conectarte de forma interactiva al clúster de Ray en Vertex AI, consulta Cómo conectarse a un clúster de Ray a través del cliente de Ray.

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP con la API de Ray Job

El cliente de Ray es útil para experimentos pequeños que requieren una conexión interactiva con el clúster de Ray. La API de Ray Job es la forma recomendada de ejecutar trabajos de producción y de larga duración en un clúster de Ray. Esto también se aplica a la ejecución de aplicaciones de Spark en el clúster de Ray en Vertex AI.

Crea una secuencia de comandos de Python que contenga el código de tu aplicación de Spark. Por ejemplo:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Envía el trabajo para ejecutar la secuencia de comandos de Python con la API de Ray Job. Por ejemplo:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Donde:

  • SCRIPT_NAME: Es el nombre del archivo de la secuencia de comandos que creaste.

Cómo leer archivos de Cloud Storage desde la aplicación de Spark

Es una práctica común almacenar archivos de datos en un bucket de Google Cloud Storage. Existen varias formas de leer estos archivos desde una aplicación de Spark que se ejecuta en el clúster de Ray en Vertex AI. En esta sección, se explican dos técnicas para leer archivos de Cloud Storage desde aplicaciones de Spark que se ejecutan en el clúster de Ray en Vertex AI.

Usa el conector de Google Cloud Storage

Puedes usar Google Cloud Connector para Hadoop para leer archivos de un bucket de Cloud Storage desde tu aplicación Spark. Esto se hace con algunos parámetros de configuración cuando se crea una sesión de Spark con RayDP. En el siguiente código, se muestra cómo se puede leer un archivo CSV almacenado en un bucket de Cloud Storage desde una aplicación de Spark en el clúster de Ray en Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Donde:

  • GCS_FILE_URI: Es el URI de un archivo almacenado en un bucket de Cloud Storage. Por ejemplo: gs://my-bucket/my-file.csv.

Usa datos de Ray

Google Cloud Connector proporciona una forma de leer archivos de un bucket de Google Cloud y puede ser suficiente para la mayoría de los casos de uso. Recomendamos usar Ray Data para leer archivos del bucket de Google Cloud cuando necesites usar el procesamiento distribuido de Ray para leer datos o cuando tengas problemas para leer el archivo de Google Cloud con el conector de Google Cloud, lo que podría suceder debido a conflictos de dependencia de Java cuando se agregan otras dependencias de la aplicación a la ruta de clase de Java de Spark mediante spark.jars.packages o spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDF de Pyspark Pandas en el clúster de Ray en Vertex AI

Las UDF de Pyspark Pandas a veces pueden requerir código adicional cuando las usas en tu aplicación de Spark que se ejecuta en un clúster de Ray en Vertex AI. Por lo general, esto es obligatorio cuando la UDF de Pandas usa una biblioteca de Python que no está disponible en el clúster de Ray en Vertex AI. Es posible empaquetar las dependencias de Python de una aplicación mediante el entorno de ejecución con la API de trabajo de Ray y, cuando el trabajo de Ray se envía al clúster, Ray instala esas dependencias en el entorno virtual de Python que crea para ejecutar el trabajo. Sin embargo, las UDF de Pandas no usan el mismo entorno virtual. En su lugar, usan el entorno del sistema de Python predeterminado. Si esa dependencia no está disponible en el entorno del sistema, es posible que debas instalarla en tu UDF de Pandas. En el siguiente ejemplo, la biblioteca statsmodels se debe instalar dentro de la UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()