Ejecutar Spark en un clúster de Ray en Vertex AI

La biblioteca de Python RayDP permite ejecutar Spark en un clúster de Ray. En este documento se explica cómo instalar, configurar y ejecutar RayDP en Ray en Vertex AI (clúster de Ray en Vertex AI).

Instalación

Ray en Vertex AI permite a los usuarios ejecutar sus aplicaciones con el framework Ray de código abierto. RayDP proporciona APIs para ejecutar Spark en Ray. Las imágenes de contenedor prediseñadas disponibles para crear un clúster de Ray en Vertex AI no tienen RayDP preinstalado. Esto significa que debes crear una imagen de clúster de Ray personalizada en Vertex AI para que tu clúster de Ray en Vertex AI pueda ejecutar aplicaciones de RayDP en el clúster de Ray en Vertex AI. En la siguiente sección se explica cómo crear una imagen personalizada de RayDP.

Crear una imagen de contenedor personalizada de Ray en Vertex AI

Usa este archivo Dockerfile para crear una imagen de contenedor personalizada para Ray en Vertex AI que tenga instalado RayDP.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Puedes usar el clúster de Ray más reciente en la imagen precompilada de Vertex AI para crear la imagen personalizada de RayDP. También puedes instalar otros paquetes de Python que preveas que vas a usar en tus aplicaciones. El pyarrow==14.0 se debe a una restricción de dependencia de Ray 2.9.3.

Crea y envía la imagen de contenedor personalizada

Crea un repositorio de Docker en Artifact Registry antes de crear tu imagen personalizada (consulta Trabajar con imágenes de contenedor para saber cómo crear y configurar tu repositorio de Docker). Una vez que hayas creado el repositorio de Docker, compila y envía la imagen de contenedor personalizada con el archivo Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Donde:

  • LOCATION: la ubicación de Cloud Storage (por ejemplo, us-central1) que ha creado en Artifact Registry.
  • PROJECT_ID: tu ID de proyecto Google Cloud .
  • DOCKER_REPOSITORY: El nombre del repositorio de Docker que has creado.
  • IMAGE_NAME: el nombre de tus imágenes de contenedor personalizadas.

Crear un clúster de Ray en Vertex AI

Usa la imagen de contenedor personalizada creada en el paso anterior para crear un clúster de Ray en Vertex AI. Puedes usar el SDK de Vertex AI para Python para crear un clúster de Ray en Vertex AI.

Si aún no lo has hecho, instala las bibliotecas de Python necesarias.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Configura los nodos Head y Worker, y crea el clúster con el SDK de Vertex AI para Python. Por ejemplo:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Donde:

  • CUSTOM_CONTAINER_IMAGE_URI: el URI de la imagen de contenedor personalizada enviada a Artifact Registry.
  • CLUSTER_NAME: el nombre de tu clúster de Ray en Vertex AI.

Clúster de Spark en Ray en Vertex AI

Antes de ejecutar tu aplicación Spark, crea una sesión de Spark con la API RayDP. Puedes usar el cliente de Ray para hacerlo de forma interactiva o usar la API de trabajo de Ray. Se recomienda usar la API de trabajos de Ray, sobre todo en aplicaciones de producción y de larga duración. La API RayDP proporciona parámetros para configurar la sesión de Spark, así como para admitir la configuración de Spark. Para obtener más información sobre la API de RayDP para crear una sesión de Spark, consulte Afinidad de nodos de actores maestros de Spark.

RayDP con el cliente de Ray

Puedes usar Task o Actor de Ray para crear un clúster y una sesión de Spark en el clúster de Ray en Vertex AI. Se requiere una tarea o un actor de Ray para usar un cliente de Ray y crear una sesión de Spark en el clúster de Ray de Vertex AI. El siguiente código muestra cómo puede un actor de Ray crear una sesión de Spark, ejecutar una aplicación de Spark y detener un clúster de Spark en un clúster de Ray en Vertex AI mediante RayDP.

Para conectarte de forma interactiva al clúster de Ray en Vertex AI, consulta Conectarse a un clúster de Ray mediante Ray Client.

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP con la API Ray Job

El cliente de Ray es útil para pequeños experimentos que requieren una conexión interactiva con el clúster de Ray. La API Ray Job es la forma recomendada de ejecutar tareas de producción y de larga duración en un clúster de Ray. Esto también se aplica a la ejecución de aplicaciones de Spark en el clúster de Ray en Vertex AI.

Crea una secuencia de comandos de Python que contenga el código de tu aplicación Spark. Por ejemplo:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Envía el trabajo para ejecutar la secuencia de comandos de Python mediante la API Ray Job. Por ejemplo:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Donde:

  • SCRIPT_NAME: el nombre de archivo de la secuencia de comandos que has creado.

Leer archivos de Cloud Storage desde una aplicación Spark

Es habitual almacenar archivos de datos en un segmento de Google Cloud Storage. Puedes leer estos archivos de varias formas desde una aplicación Spark que se ejecute en el clúster de Ray en Vertex AI. En esta sección se explican dos técnicas para leer archivos de Cloud Storage desde aplicaciones de Spark que se ejecutan en un clúster de Ray en Vertex AI.

Usar el conector de Google Cloud Storage

Puedes usar el Google Cloud conector para Hadoop para leer archivos de un segmento de Cloud Storage desde tu aplicación Spark. Después de crear una sesión de Spark con RayDP, puedes leer archivos con algunos parámetros de configuración. En el siguiente código se muestra cómo leer un archivo CSV almacenado en un segmento de Cloud Storage desde una aplicación Spark en el clúster de Ray en Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Donde:

  • GCS_FILE_URI: el URI de un archivo almacenado en un segmento de Cloud Storage. Por ejemplo: gs://my-bucket/my-file.csv.

Usar datos de Ray

El conector Google Cloud proporciona una forma de leer archivos de un Google Cloud contenedor y puede ser suficiente para la mayoría de los casos prácticos. Puede usar Ray Data para leer archivos del contenedor Google Cloud cuando necesite usar el procesamiento distribuido de Ray para leer datos o cuando tenga problemas para leer archivosGoogle Cloud con el conector Google Cloud . Esto puede ocurrir debido a conflictos de dependencias de Java cuando se añaden dependencias de otra aplicación a la ruta de clases de Java de Spark mediante spark.jars.packages o spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDF de Pandas de PySpark en un clúster de Ray en Vertex AI

Las UDFs de Pandas de PySpark a veces requieren código adicional cuando las usas en tu aplicación de Spark que se ejecuta en un clúster de Ray en Vertex AI. Normalmente, esto es necesario cuando la función definida por el usuario de Pandas usa una biblioteca de Python que no está disponible en el clúster de Ray en Vertex AI. Puedes empaquetar las dependencias de Python de una aplicación mediante el entorno de ejecución con la API Ray Job. Después de enviar el trabajo de Ray al clúster, Ray instala esas dependencias en el entorno virtual de Python que crea para ejecutar el trabajo. Sin embargo, las UDFs de Pandas no usan el mismo entorno virtual. En su lugar, usan el entorno predeterminado del sistema Python. Si esa dependencia no está disponible en el entorno System, puede que tengas que instalarla en tu función definida por el usuario de Pandas. En el siguiente ejemplo, se instala la biblioteca statsmodels en la función definida por el usuario.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':

    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )

    print(test_udf(spark))

    raydp.stop_spark()