Spark auf Ray-Cluster in Vertex AI ausführen

Mit der Python-Bibliothek RayDP kann Spark auf einem Ray-Cluster ausgeführt werden. In diesem Dokument wird das Installieren, Konfigurieren und Ausführen von RayDP auf Ray on Vertex AI (Ray-Cluster in Vertex AI) behandelt.

Installation

Mit Ray on Vertex AI können Nutzer ihre Anwendungen mit dem Open-Source-Ray-Framework ausführen. RayDP bietet APIs zum Ausführen von Spark auf Ray. Bei den vordefinierten Container-Images, die zum Erstellen eines Ray-Clusters in Vertex AI verfügbar sind, ist RayDP nicht vorinstalliert. Das bedeutet, dass Sie einen benutzerdefinierten Ray-Cluster in Vertex AI Image für Ihren Ray-Cluster in Vertex AI erstellen müssen, um RayDP-Anwendungen auf dem Ray-Cluster in Vertex AI auszuführen. Im folgenden Abschnitt wird erläutert, wie Sie ein benutzerdefiniertes RayDP-Image erstellen.

Benutzerdefiniertes Container-Image für Ray on Vertex AI erstellen

Verwenden Sie dieses Dockerfile, um ein benutzerdefiniertes Container-Image für Ray in Vertex AI zu erstellen, auf dem RayDP installiert ist.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Sie können das neueste vordefinierte Ray-Cluster in Vertex AI-Image zum Erstellen des benutzerdefinierten RayDP-Images verwenden. Sie können auch andere Python-Pakete installieren, die Sie in Ihren Anwendungen voraussichtlich verwenden werden. pyarrow==14.0 beruht auf einer Abhängigkeitseinschränkung von Ray 2.9.3.

Erstellen Sie das benutzerdefinierte Container-Image und übertragen Sie es per Push.

Sie müssen ein Docker-Repository in Artifact Registry erstellen, bevor Sie Ihr benutzerdefiniertes Image erstellen können (Informationen zum Erstellen und Konfigurieren Ihres Docker Repository finden Sie unter Mit Container-Images arbeiten). Nachdem Sie das Docker-Repository erstellt haben, erstellen Sie das benutzerdefinierte Container-Image und übertragen es mit dem Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Dabei gilt:

  • LOCATION: Der Cloud Storage-Speicherort (z. B. us-central1), den Sie in Ihrer Artifact Registry erstellt haben.
  • PROJECT_ID: Ihre Google Cloud-Projekt-ID.
  • DOCKER_REPOSITORY: Der Name des Docker-Repositorys, das Sie erstellt haben.
  • IMAGE_NAME: Der Name Ihrer benutzerdefinierten Container-Images.

Ray-Cluster in Vertex AI erstellen

Verwenden Sie das im vorherigen Schritt erstellte benutzerdefinierte Container-Image, um einen Ray-Cluster in Vertex AI zu erstellen. Sie können das Vertex AI SDK für Python verwenden, um einen Ray-Cluster in Vertex AI zu erstellen.

Installieren Sie die erforderlichen Python-Bibliotheken,falls noch nicht geschehen.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Konfigurieren Sie Haupt- und Worker-Knoten und erstellen Sie den Cluster mit dem Vertex AI SDK für Python. Beispiel:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Dabei gilt:

  • CUSTOM_CONTAINER_IMAGE_URI: Der URI des benutzerdefinierten Container-Images, das per Push in Artifact Registry übertragen wird.
  • CLUSTER_NAME: Der Name Ihres Ray-Clusters in Vertex AI.

Spark auf Ray-Cluster in Vertex AI

Bevor Sie Ihre Spark-Anwendung ausführen können, müssen Sie mit der RayDP API eine Spark-Sitzung erstellen. Sie können den Ray-Client dafür interaktiv oder die Ray Job API verwenden. Die Ray Job API wird empfohlen, insbesondere für Produktionsanwendungen und Anwendungen mit langer Laufzeit. Die RayDP API bietet Parameter zum Konfigurieren der Spark-Sitzung und unterstützt die Spark-Konfiguration. Weitere Informationen zur RayDP API zum Erstellen von Spark-Sitzungen finden Sie unter Knotenaffinität für Spark-Master-Akteure.

RayDP mit Ray-Client

Sie können eine Aufgabe oder Akteur von Ray verwenden, um einen Spark-Cluster und eine Sitzung im Ray-Cluster in Vertex AI zu erstellen. Ray Task oder Akteur ist erforderlich, um einen Ray-Client zu verwenden, um eine Spark-Sitzung im Ray-Cluster in Vertex AI zu erstellen. Der folgende Code zeigt, wie ein Ray Actor zum Erstellen einer Spark-Sitzung, zum Ausführen einer Spark-Anwendung und zum Beenden eines Spark-Clusters auf einem Ray-Cluster in Vertex AI mit RayDP verwendet werden kann.

Informationen zum interaktiven Herstellen einer Verbindung zum Ray-Cluster in Vertex AI finden Sie unter Über den Ray-Client eine Verbindung zu einem Ray-Cluster herstellen.

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP mit der Ray Job API

Der Ray-Client ist nützlich für kleine Experimente, die eine interaktive Verbindung mit dem Ray-Cluster erfordern. Zum Ausführen von Jobs mit langer Ausführungszeit und Produktionsjobs in einem Ray-Cluster wird die Ray Job API empfohlen. Dies gilt auch für die Ausführung von Spark-Anwendungen auf dem Ray-Cluster in Vertex AI.

Erstellen Sie ein Python-Skript, das Ihren Spark-Anwendungscode enthält. Beispiel:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Senden Sie den Job, um das Python-Skript mit der Ray Job API auszuführen. Beispiel:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Dabei gilt:

  • SCRIPT_NAME: Der Dateiname des von Ihnen erstellten Skripts.

Cloud Storage-Dateien aus der Spark-Anwendung lesen

Es ist üblich, Datendateien in einem Google Cloud Storage-Bucket zu speichern. Es gibt mehrere Möglichkeiten, diese Dateien aus einer Spark-Anwendung zu lesen, die auf dem Ray-Cluster in Vertex AI ausgeführt wird. In diesem Abschnitt werden zwei Techniken zum Lesen von Cloud Storage-Dateien aus Spark-Anwendungen erläutert, die auf Ray Cluster in Vertex AI ausgeführt werden.

Google Cloud Storage-Connector verwenden

Sie können den Google Cloud Connector für Hadoop verwenden, um Dateien aus einem Cloud Storage-Bucket aus Ihrer Spark-Anwendung zu lesen. Dies geschieht mit einigen Konfigurationsparametern, wenn eine Spark-Sitzung mit RayDP erstellt wird. Der folgende Code zeigt, wie eine in einem Cloud Storage-Bucket gespeicherte CSV-Datei aus einer Spark-Anwendung im Ray-Cluster in Vertex AI gelesen werden kann.

import raydp

spark = raydp.init_spark(
  app_name="RayDP GCS Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Dabei gilt:

  • GCS_FILE_URI: Der URI einer Datei, die in einem Cloud Storage-Bucket gespeichert ist. Beispiel: gs://my-bucket/my-file.csv.

Ray-Daten verwenden

Der Google Cloud-Connector bietet eine Möglichkeit, Dateien aus einem Google Cloud-Bucket zu lesen. Er kann für die meisten Anwendungsfälle ausreichend sein. Sie können Ray Data verwenden, um Dateien aus dem Google Cloud-Bucket zu lesen, wenn Sie die verteilte Verarbeitung von Ray zum Lesen von Daten verwenden müssen oder wenn beim Lesen der Google Cloud-Datei mit dem Google Cloud-Connector Probleme auftreten. Dies kann aufgrund von Java-Abhängigkeitskonflikten auftreten, wenn andere Anwendungsabhängigkeiten mithilfe von spark.jars.packages oder spark.jars dem Spark-Java-Klassenpfad hinzugefügt werden.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP GCS Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the GCS connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

Pyspark Pandas-UDF auf Ray-Cluster in Vertex AI

Die Pyspark Pandas-UDFs benötigen möglicherweise zusätzlichen Code, wenn Sie sie in Ihrer Spark-Anwendung verwenden, die in einem Ray-Cluster auf Vertex AI ausgeführt wird. Dies ist normalerweise erforderlich, wenn die Pandas-UDF eine Python-Bibliothek verwendet, die im Ray-Cluster in Vertex AI nicht verfügbar ist. Es ist möglich, die Python-Abhängigkeiten einer Anwendung zu verpacken, die die Laufzeitumgebung mit der Ray Job API verwendet. Wenn der Ray-Job an den Cluster gesendet wird, installiert Ray diese Abhängigkeiten in der Virtuelle Python-Umgebung, die zum Ausführen des Jobs erstellt wird. Die Pandas-UDFs verwenden jedoch nicht dieselbe virtuelle Umgebung. Stattdessen verwenden sie die Standard-Python-Systemumgebung. Wenn diese Abhängigkeit in der Systemumgebung nicht verfügbar ist, müssen Sie sie möglicherweise in Ihrer Pandas-UDF installieren. Im folgenden Beispiel muss die Bibliothek statsmodels in der UDF installiert werden.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()