Vertex AI 기반 Ray 클러스터에서 Spark 실행

RayDP Python 라이브러리를 사용하면 Ray 클러스터에서 Spark를 실행할 수 있습니다. 이 문서에서는 Vertex AI 기반 Ray에서 RayDP(Vertex AI 기반 Ray 클러스터)를 설치, 구성, 실행하는 방법에 대해 설명합니다.

설치

Vertex AI 기반 Ray를 사용하면 오픈소스 Ray 프레임워크를 사용하여 애플리케이션을 실행할 수 있습니다. RayDP는 Ray에서 Spark를 실행하기 위한 API를 제공합니다. Vertex AI 기반 Ray 클러스터를 만드는 데 사용할 수 있는 사전 빌드된 컨테이너 이미지는 RayDP가 사전 설치된 상태로 제공되지 않습니다. 즉, Vertex AI 기반 Ray 클러스터에서 RayDP 애플리케이션을 실행하려면 Vertex AI 기반 Ray 클러스터에 대해 Vertex AI 기반 커스텀 Ray 클러스터를 만들어야 합니다. 다음 섹션에서는 RayDP 커스텀 이미지를 빌드하는 방법을 설명합니다.

Vertex AI 기반 RAY 커스텀 컨테이너 이미지 빌드

이 Dockerfile을 사용하여 RayDP가 설치되어 있는 Vertex AI 기반 Ray 커스텀 컨테이너 이미지를 만듭니다.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Vertex AI 기반 최신 Ray 클러스터로 사전 빌드된 이미지를 사용해서 RayDP 커스텀 이미지를 만들 수 있습니다. 또한 애플리케이션에 사용할 것으로 예상되는 다른 Python 패키지를 설치할 수도 있습니다. pyarrow==14.0은 Ray 2.33.0의 종속성 제약조건입니다.

커스텀 컨테이너 이미지 빌드 및 푸시

커스텀 이미지를 빌드하려면 먼저 Artifact Registry에서 Docker 저장소를 만들어야 합니다. Docker 저장소 만들기 및 구성 방법은 컨테이너 이미지 작업을 참조하세요. Docker 저장소를 만든 후에는 Dockerfile을 사용하여 커스텀 컨테이너 이미지를 빌드하고 푸시합니다.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

각 항목의 의미는 다음과 같습니다.

  • LOCATION: Artifact Registry에서 만든 Cloud Storage 위치(예: us-central1)입니다.
  • PROJECT_ID: Google Cloud 프로젝트 ID입니다.
  • DOCKER_REPOSITORY: 생성한 Docker 저장소의 이름입니다.
  • IMAGE_NAME: 커스텀 컨테이너 이미지의 이름입니다.

Vertex AI 기반 Ray 클러스터 만들기

Vertex AI 기반 Ray 클러스터를 만들려면 이전 단계에서 빌드한 커스텀 컨테이너 이미지를 사용합니다. Vertex AI 기반 Ray 클러스터를 만들려면 Vertex AI SDK for Python을 사용할 수 있습니다.

아직 설치하지 않았으면 필요한 Python 라이브러리를 설치하세요.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Vertex AI SDK for Python을 사용하여 헤드 및 작업자 노드를 구성하고 클러스터를 만듭니다. 예를 들면 다음과 같습니다.

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

각 항목의 의미는 다음과 같습니다.

  • CUSTOM_CONTAINER_IMAGE_URI: Artifact Registry에 푸시된 커스텀 컨테이너 이미지의 URI입니다.
  • CLUSTER_NAME: Vertex AI 기반 Ray 클러스터 이름입니다.

Vertex AI 기반 Ray 클러스터의 Spark

Spark 애플리케이션을 실행하려면 먼저 RayDP API를 사용해서 Spark 세션을 만들어야 합니다. 이렇게 하려면 Ray 클라이언트를 대화형으로 이용하거나 Ray 작업 API를 이용하면 됩니다. 프로덕션 및 장기 실행 애플리케이션의 경우에는 특히 Ray 작업 API가 권장됩니다. RayDP API는 Spark 세션 구성은 물론 Spark 구성 지원을 위한 매개변수를 제공합니다. Spark 세션 만들기를 위한 RayDP API에 대한 자세한 내용은 Spark 마스터 액터 노드 어피니티를 참조하세요.

Ray 클라이언트와 RayDP

Ray 태스크 또는 액터를 사용하여 Vertex AI 기반 Ray 클러스터에 Spark 클러스터 및 세션을 만들 수 있습니다. Ray 태스크 또는 액터는 Vertex AI 기반 Ray 클러스터에서 Spark 세션을 만들기 위해 Ray 클라이언트를 사용해야 합니다. 다음 코드에서는 RayDP를 사용하여 Vertex AI 기반 Ray 클러스터에서 Spark 세션 만들기, Spark 애플리케이션 실행, Spark 클러스터 중지를 위해 Ray 액터를 사용하는 방법을 보여줍니다.

Vertex AI 기반 Ray 클러스터에 대화형으로 연결하는 방법은 Ray 클라이언트를 통해 Ray 클러스터에 연결을 참조하세요.

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

Ray 작업 API와 RayDP

Ray 클라이언트는 Ray 클러스터와 대화형 연결이 필요한 소규모 실험에 유용합니다. Ray 작업 API는 Ray 클러스터에서 장기 실행 및 프로덕션 작업을 실행하는 데 권장되는 방법입니다. 이는 또한 Vertex AI 기반 Ray 클러스터에서 Spark 애플리케이션을 실행하는 데에도 적용됩니다.

Spark 애플리케이션 코드가 포함된 Python 스크립트를 만듭니다. 예를 들면 다음과 같습니다.

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Ray 작업 API를 사용하여 Python 스크립트를 실행하는 작업을 제출합니다. 예를 들면 다음과 같습니다.

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

각 항목의 의미는 다음과 같습니다.

  • SCRIPT_NAME: 생성한 스크립트의 파일 이름입니다.

Spark 애플리케이션에서 Cloud Storage 파일 읽기

데이터 파일은 Google Cloud Storage 버킷에 저장하는 것이 일반적입니다. Vertex AI 기반 Ray 클러스터에서 실행 중인 Spark 애플리케이션에서는 이러한 파일을 여러 방법으로 읽을 수 있습니다. 이 섹션에서는 Vertex AI 기반 Ray 클러스터에서 실행 중인 Spark 애플리케이션으로부터 Cloud Storage 파일을 읽는 두 가지 기법을 설명합니다.

Google Cloud Storage 커넥터 사용

Hadoop용 Google Cloud 커넥터를 사용하여 Spark 애플리케이션의 Cloud Storage 버킷에서 파일을 읽을 수 있습니다. 이 작업은 RayDP를 사용하여 Spark 세션을 만들 때 몇 가지 구성 매개변수를 사용하여 수행됩니다. 다음 코드는 Vertex AI 기반 Ray 클러스터의 Spark 애플리케이션에서 Cloud Storage 버킷에 저장된 CSV 파일을 읽는 방법을 보여줍니다.

import raydp

spark = raydp.init_spark(
  app_name="RayDP GCS Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

각 항목의 의미는 다음과 같습니다.

  • GCS_FILE_URI: Cloud Storage 버킷에 저장된 파일의 URI입니다. 예를 들면 gs://my-bucket/my-file.csv입니다.

Ray 데이터 사용

Google Cloud 커넥터는 Google Cloud 버킷에서 파일을 읽는 방법을 제공하며, 대부분의 사용 사례에 충분합니다. 데이터 읽기를 위해 Ray의 분산 처리를 사용해야 하는 경우 또는 Google Cloud 커넥터로 Google Cloud 파일을 읽을 때 문제가 발생하는 경우(spark.jars.packages 또는 spark.jars를 사용해서 Spark Java 클래스 경로에 다른 애플리케이션 종속 항목이 추가되었을 때 Java 종속성 충돌로 인해 발생 가능한 경우)에는 Ray 데이터를 사용하여 Google Cloud 버킷에서 파일을 읽어야 할 수 있습니다.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP GCS Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the GCS connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

Vertex AI 기반 Ray 클러스터의 Pyspark Pandas UDF

Pyspark Pandas UDF는 Vertex AI 기반 Ray 클러스터에서 실행 중인 Spark 애플리케이션에서 사용할 경우 때에 따라 추가 코드가 필요할 수 있습니다. Vertex AI 기반 Ray 클러스터에서 사용할 수 없는 Python 라이브러리가 Pandas UDF에 사용되는 경우 일반적으로 필요합니다. Ray 작업 API와 함께 런타임 환경을 사용해서 애플리케이션의 Python 종속 항목을 패키지화할 수 있습니다. Ray 작업을 클러스터에 제출하면 Ray가 작업 실행을 위해 만드는 Python 가상 환경에 이러한 종속 항목을 설치합니다. 하지만 Pandas UDF는 동일한 가상 환경을 사용하지 않습니다. 대신 기본 Python 시스템 환경을 사용합니다. 시스템 환경에서 이러한 종속성이 제공되지 않으면 Pandas UDF 내에 설치해야 합니다. 다음 예시에서는 statsmodels 라이브러리를 UDF 내에 설치해야 합니다.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()