Mit der Python-Bibliothek RayDP kann Spark auf einem Ray-Cluster ausgeführt werden. In diesem Dokument wird das Installieren, Konfigurieren und Ausführen von RayDP auf Ray on Vertex AI (Ray-Cluster in Vertex AI) behandelt.
Installation
Mit Ray on Vertex AI können Nutzer ihre Anwendungen mit dem Open-Source-Ray-Framework ausführen. RayDP bietet APIs zum Ausführen von Spark auf Ray. Bei den vordefinierten Container-Images, die zum Erstellen eines Ray-Clusters in Vertex AI verfügbar sind, ist RayDP nicht vorinstalliert. Das bedeutet, dass Sie einen benutzerdefinierten Ray-Cluster in Vertex AI Image für Ihren Ray-Cluster in Vertex AI erstellen müssen, um RayDP-Anwendungen auf dem Ray-Cluster in Vertex AI auszuführen. Im folgenden Abschnitt wird erläutert, wie Sie ein benutzerdefiniertes RayDP-Image erstellen.
Benutzerdefiniertes Container-Image für Ray on Vertex AI erstellen
Verwenden Sie dieses Dockerfile, um ein benutzerdefiniertes Container-Image für Ray in Vertex AI zu erstellen, auf dem RayDP installiert ist.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
Sie können das neueste vordefinierte Ray-Cluster in Vertex AI-Image zum Erstellen des benutzerdefinierten RayDP-Images verwenden. Sie können auch andere Python-Pakete installieren, die Sie in Ihren Anwendungen voraussichtlich verwenden werden. pyarrow==14.0
beruht auf einer Abhängigkeitseinschränkung von Ray 2.33.0.
Erstellen Sie das benutzerdefinierte Container-Image und übertragen Sie es per Push.
Sie müssen ein Docker-Repository in Artifact Registry erstellen, bevor Sie Ihr benutzerdefiniertes Image erstellen können (Informationen zum Erstellen und Konfigurieren Ihres Docker Repository finden Sie unter Mit Container-Images arbeiten). Nachdem Sie das Docker-Repository erstellt haben, erstellen Sie das benutzerdefinierte Container-Image und übertragen es mit dem Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Dabei gilt:
LOCATION
: Der Cloud Storage-Speicherort (z. B. us-central1), den Sie in Ihrer Artifact Registry erstellt haben.PROJECT_ID
: Ihre Google Cloud-Projekt-ID.DOCKER_REPOSITORY
: Der Name des Docker-Repositorys, das Sie erstellt haben.IMAGE_NAME
: Der Name Ihrer benutzerdefinierten Container-Images.
Ray-Cluster in Vertex AI erstellen
Verwenden Sie das im vorherigen Schritt erstellte benutzerdefinierte Container-Image, um einen Ray-Cluster in Vertex AI zu erstellen. Sie können das Vertex AI SDK für Python verwenden, um einen Ray-Cluster in Vertex AI zu erstellen.
Installieren Sie die erforderlichen Python-Bibliotheken,falls noch nicht geschehen.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Konfigurieren Sie Haupt- und Worker-Knoten und erstellen Sie den Cluster mit dem Vertex AI SDK für Python. Beispiel:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Dabei gilt:
CUSTOM_CONTAINER_IMAGE_URI
: Der URI des benutzerdefinierten Container-Images, das per Push in Artifact Registry übertragen wird.CLUSTER_NAME
: Der Name Ihres Ray-Clusters in Vertex AI.
Spark auf Ray-Cluster in Vertex AI
Bevor Sie Ihre Spark-Anwendung ausführen können, müssen Sie mit der RayDP API eine Spark-Sitzung erstellen. Sie können den Ray-Client dafür interaktiv oder die Ray Job API verwenden. Die Ray Job API wird empfohlen, insbesondere für Produktionsanwendungen und Anwendungen mit langer Laufzeit. Die RayDP API bietet Parameter zum Konfigurieren der Spark-Sitzung und unterstützt die Spark-Konfiguration. Weitere Informationen zur RayDP API zum Erstellen von Spark-Sitzungen finden Sie unter Knotenaffinität für Spark-Master-Akteure.
RayDP mit Ray-Client
Sie können eine Aufgabe oder Akteur von Ray verwenden, um einen Spark-Cluster und eine Sitzung im Ray-Cluster in Vertex AI zu erstellen. Ray Task oder Akteur ist erforderlich, um einen Ray-Client zu verwenden, um eine Spark-Sitzung im Ray-Cluster in Vertex AI zu erstellen. Der folgende Code zeigt, wie ein Ray Actor zum Erstellen einer Spark-Sitzung, zum Ausführen einer Spark-Anwendung und zum Beenden eines Spark-Clusters auf einem Ray-Cluster in Vertex AI mit RayDP verwendet werden kann.
Informationen zum interaktiven Herstellen einer Verbindung zum Ray-Cluster in Vertex AI finden Sie unter Über den Ray-Client eine Verbindung zu einem Ray-Cluster herstellen.
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP mit der Ray Job API
Der Ray-Client ist nützlich für kleine Experimente, die eine interaktive Verbindung mit dem Ray-Cluster erfordern. Zum Ausführen von Jobs mit langer Ausführungszeit und Produktionsjobs in einem Ray-Cluster wird die Ray Job API empfohlen. Dies gilt auch für die Ausführung von Spark-Anwendungen auf dem Ray-Cluster in Vertex AI.
Erstellen Sie ein Python-Skript, das Ihren Spark-Anwendungscode enthält. Beispiel:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Senden Sie den Job, um das Python-Skript mit der Ray Job API auszuführen. Beispiel:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Dabei gilt:
SCRIPT_NAME
: Der Dateiname des von Ihnen erstellten Skripts.
Cloud Storage-Dateien aus der Spark-Anwendung lesen
Es ist üblich, Datendateien in einem Google Cloud Storage-Bucket zu speichern. Es gibt mehrere Möglichkeiten, diese Dateien aus einer Spark-Anwendung zu lesen, die auf dem Ray-Cluster in Vertex AI ausgeführt wird. In diesem Abschnitt werden zwei Techniken zum Lesen von Cloud Storage-Dateien aus Spark-Anwendungen erläutert, die auf Ray Cluster in Vertex AI ausgeführt werden.
Google Cloud Storage-Connector verwenden
Sie können den Google Cloud Connector für Hadoop verwenden, um Dateien aus einem Cloud Storage-Bucket aus Ihrer Spark-Anwendung zu lesen. Dies geschieht mit einigen Konfigurationsparametern, wenn eine Spark-Sitzung mit RayDP erstellt wird. Der folgende Code zeigt, wie eine in einem Cloud Storage-Bucket gespeicherte CSV-Datei aus einer Spark-Anwendung im Ray-Cluster in Vertex AI gelesen werden kann.
import raydp spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Dabei gilt:
GCS_FILE_URI
: Der URI einer Datei, die in einem Cloud Storage-Bucket gespeichert ist. Beispiel: gs://my-bucket/my-file.csv.
Ray-Daten verwenden
Der Google Cloud-Connector bietet eine Möglichkeit, Dateien aus einem Google Cloud-Bucket zu lesen. Er kann für die meisten Anwendungsfälle ausreichend sein. Sie können Ray Data verwenden, um Dateien aus dem Google Cloud-Bucket zu lesen, wenn Sie die verteilte Verarbeitung von Ray zum Lesen von Daten verwenden müssen oder wenn beim Lesen der Google Cloud-Datei mit dem Google Cloud-Connector Probleme auftreten. Dies kann aufgrund von Java-Abhängigkeitskonflikten auftreten, wenn andere Anwendungsabhängigkeiten mithilfe von spark.jars.packages
oder spark.jars
dem Spark-Java-Klassenpfad hinzugefügt werden.
import raydp import ray spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
Pyspark Pandas-UDF auf Ray-Cluster in Vertex AI
Die Pyspark Pandas-UDFs benötigen möglicherweise zusätzlichen Code, wenn Sie sie in Ihrer Spark-Anwendung verwenden, die in einem Ray-Cluster auf Vertex AI ausgeführt wird. Dies ist normalerweise erforderlich, wenn die Pandas-UDF eine Python-Bibliothek verwendet, die im Ray-Cluster in Vertex AI nicht verfügbar ist. Es ist möglich, die Python-Abhängigkeiten einer Anwendung zu verpacken, die die Laufzeitumgebung mit der Ray Job API verwendet. Wenn der Ray-Job an den Cluster gesendet wird, installiert Ray diese Abhängigkeiten in der Virtuelle Python-Umgebung, die zum Ausführen des Jobs erstellt wird. Die Pandas-UDFs verwenden jedoch nicht dieselbe virtuelle Umgebung. Stattdessen verwenden sie die Standard-Python-Systemumgebung. Wenn diese Abhängigkeit in der Systemumgebung nicht verfügbar ist, müssen Sie sie möglicherweise in Ihrer Pandas-UDF installieren. Im folgenden Beispiel muss die Bibliothek statsmodels
in der UDF installiert werden.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()