Exécuter Spark sur un cluster Ray sur Vertex AI

La bibliothèque Python RayDP permet d'exécuter Spark sur un cluster Ray. Ce document explique comment installer, configurer et exécuter RayDP sur Ray sur Vertex AI (cluster Ray sur Vertex AI).

Installation

Ray sur Vertex AI permet aux utilisateurs d'exécuter leurs applications à l'aide du framework Ray Open Source. RayDP fournit des API permettant d'exécuter Spark sur Ray. Les images de conteneurs prédéfinies disponibles pour créer un cluster Ray sur Vertex AI ne sont pas préinstallées avec RayDP. Vous devez donc créer une image de cluster Ray personnalisé sur Vertex AI pour que votre cluster Ray sur Vertex AI puisse exécuter des applications RayDP sur le cluster Ray sur Vertex AI. La section suivante explique comment créer une image personnalisée RayDP.

Créer une image de conteneur personnalisée Ray sur Vertex AI

Utilisez ce fichier Dockerfile pour créer une image de conteneur personnalisée pour Ray sur Vertex AI sur laquelle RayDP est installé.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Vous pouvez utiliser la dernière image prédéfinie de cluster Ray sur Vertex AI pour créer l'image personnalisée RayDP. Vous pouvez également installer d'autres packages Python que vous prévoyez d'utiliser dans vos applications. pyarrow==14.0 est dû à une contrainte de dépendance de Ray 2.33.0.

Créer et transférer une image de conteneur personnalisée

Vous devez créer un dépôt Docker dans Artifact Registry avant de pouvoir créer votre image personnalisée (consultez la section Utiliser des images de conteneur pour savoir comment créer et configurer votre dépôt Docker). Une fois le dépôt Docker créé, compilez et transférez l'image de conteneur personnalisée à l'aide du fichier Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Où :

  • LOCATION : emplacement Cloud Storage (par exemple, us-central1) que vous avez créé dans votre registre d'artefacts.
  • PROJECT_ID : ID de votre projet Google Cloud.
  • DOCKER_REPOSITORY : nom du dépôt Docker que vous avez créé.
  • IMAGE_NAME : nom de vos images de conteneurs personnalisées.

Créer un cluster Ray sur Vertex AI

Utilisez l'image de conteneur personnalisée créée à l'étape précédente pour créer un cluster Ray sur Vertex AI. Vous pouvez utiliser le SDK Vertex AI pour Python pour créer un cluster Ray sur Vertex AI.

Si vous ne l'avez pas déjà fait,installez les bibliothèques Python requises.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Configurez les nœuds principaux et de calcul, puis créez le cluster à l'aide du SDK Vertex AI pour Python. Exemple :

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Où :

  • CUSTOM_CONTAINER_IMAGE_URI : URI de l'image de conteneur personnalisée transférée vers Artifact Registry.
  • CLUSTER_NAME : nom de votre cluster Ray sur Vertex AI.

Spark sur un cluster Ray sur Vertex AI

Avant de pouvoir exécuter votre application Spark, vous devez créer une session Spark à l'aide de l'API RayDP. Vous pouvez utiliser le client Ray pour effectuer cette opération de manière interactive ou utiliser l'API Ray Job. L'API Ray Job est recommandée, en particulier pour les applications de production et de longue durée. L'API RayDP fournit des paramètres pour configurer la session Spark, et prend en charge la configuration Spark. Pour en savoir plus sur l'API RayDP permettant de créer des sessions Spark, consultez la section Affinité des nœuds acteurs principaux Spark.

RayDP avec le client Ray

Vous pouvez utiliser une Tâche ou un Acteur RAY pour créer un cluster et une session Spark sur le cluster Ray sur Vertex AI. Une tâche ou un acteur RAY est nécessaire pour utiliser un client Ray afin de créer une session Spark sur le cluster Ray sur Vertex AI. Le code suivant montre comment un acteur Ray peut être utilisé pour créer une session Spark, exécuter une application Spark et arrêter un cluster Spark sur un cluster Ray sur Vertex AI à l'aide de RayDP.

Pour savoir comment vous connecter de manière interactive au cluster Ray sur Vertex AI, consultez la section Se connecter à un cluster Ray via le client Ray.

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP avec l'API Ray Job

Le client Ray est utile pour les petits tests qui nécessitent une connexion interactive avec le cluster Ray. L'API Ray Job est la méthode recommandée pour exécuter des jobs de production et de longue durée sur un cluster Ray. Cela s'applique également à l'exécution d'applications Spark sur le cluster Ray sur Vertex AI.

Créez un script Python contenant le code de votre application Spark. Exemple :

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Envoyez la tâche pour exécuter le script Python à l'aide de l'API Ray Job. Exemple :

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Où :

  • SCRIPT_NAME : nom du fichier du script que vous avez créé.

Lire des fichiers Cloud Storage à partir de l'application Spark

Il est courant de stocker des fichiers de données dans un bucket Google Cloud Storage. Il existe plusieurs façons de lire ces fichiers à partir d'une application Spark exécutée sur le cluster Ray sur Vertex AI. Cette section explique deux techniques permettant de lire des fichiers Cloud Storage à partir d'applications Spark exécutées sur un cluster Ray sur Vertex AI.

Utiliser le connecteur Google Cloud Storage

Vous pouvez utiliser le connecteur Google Cloud pour Hadoop afin de lire des fichiers à partir d'un bucket Cloud Storage depuis votre application Spark. Pour ce faire, quelques paramètres de configuration sont utilisés lorsqu'une session Spark est créée à l'aide de RayDP. Le code suivant montre comment un fichier CSV stocké dans un bucket Cloud Storage peut être lu à partir d'une application Spark sur le cluster Ray sur Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP GCS Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Où :

  • GCS_FILE_URI : URI d'un fichier stocké dans un bucket Cloud Storage. Exemple : gs://my-bucket/my-file.csv.

Utiliser Ray Data

Le connecteur Google Cloud permet de lire des fichiers à partir d'un bucket Google Cloud. Il peut suffire pour la plupart des cas d'utilisation. Vous pouvez utiliser Ray Data pour lire des fichiers à partir du bucket Google Cloud lorsque vous devez utiliser le traitement distribué de Ray pour lire des données ou lorsque vous rencontrez des problèmes de lecture de fichiers Google Cloud avec le connecteur Google Cloud. Ceci peut se produire en raison de conflits de dépendances Java lorsque d'autres dépendances d'application sont ajoutées au classpath Spark Java à l'aide de spark.jars.packages ou spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP GCS Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the GCS connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDF Pyspark Pandas sur un cluster Ray sur Vertex AI

Les UDF Pyspark Pandas peuvent parfois nécessiter du code supplémentaire lorsque vous les utilisez dans votre application Spark s'exécutant sur un cluster Ray sur Vertex AI. Cette opération est généralement nécessaire lorsque l'UDF Pandas utilise une bibliothèque Python qui n'est pas disponible sur le cluster Ray sur Vertex AI. Il est possible de créer un package des dépendances Python d'une application à l'aide de l'environnement d'exécution avec l'API Ray Job. Lorsque le job Ray est envoyé au cluster, Ray installe ces dépendances dans l'environnement virtuel Python qu'il crée pour exécuter le job. Cependant, les UDF Pandas n'utilisent pas le même environnement virtuel. Ils utilisent plutôt l'environnement système Python par défaut. Si cette dépendance n'est pas disponible dans l'environnement système, vous devrez peut-être l'installer dans votre UDF Pandas. Dans l'exemple suivant, la bibliothèque statsmodels doit être installée dans l'UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()