Vertex AI の Ray クラスタで Spark を実行する

RayDP Python ライブラリを使用すると、Ray クラスタで Spark を実行できます。このドキュメントでは、Ray on Vertex AI(Vertex AI の Ray クラスタ)での RayDP のインストール、構成、実行について説明します。

インストール

Ray on Vertex AI を使用すると、オープンソースの Ray フレームワークを使用してアプリケーションを実行できます。RayDP は、Ray で Spark を実行するための API を提供します。Vertex AI で Ray クラスタを作成するために使用できるビルド済みコンテナ イメージには RayDP がプリインストールされていません。つまり、Vertex AI の Ray クラスタで RayDP アプリケーションを実行するには、Vertex AI の Ray クラスタに対して Vertex AI のカスタム Ray クラスタ イメージを作成する必要があります。次のセクションでは、RayDP カスタム イメージを構築する方法について説明します。

Ray on Vertex AI カスタム コンテナ イメージを構築する

この Dockerfile を使用して、RayDP がインストールされた Ray on Vertex AI のカスタム コンテナ イメージを作成します。

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

RayDP カスタム イメージの作成には、Vertex AI ビルド済みイメージの最新の Ray クラスタを使用できます。アプリケーションで使用する予定の他の Python パッケージをインストールすることもできます。pyarrow==14.0 は、Ray 2.9.3 の依存関係の制約によるものです。

カスタム コンテナ イメージを構築して push する

カスタム イメージを構築するには、まず Artifact Registry に Docker リポジトリを作成する必要があります(Docker リポジトリを作成して構成する方法については、コンテナ イメージを操作するをご覧ください)。Docker リポジトリを作成したら、Dockerfile を使用してカスタム コンテナ イメージを構築して push します。

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

ここで

  • LOCATION: Artifact Registry で作成した Cloud Storage のロケーション(例: us-central1)。
  • PROJECT_ID: Google Cloud プロジェクト ID。
  • DOCKER_REPOSITORY: 作成した Docker リポジトリの名前。
  • IMAGE_NAME: カスタム コンテナ イメージの名前。

Vertex AI で Ray クラスタを作成する

前のステップで構築したカスタム コンテナ イメージを使用して、Vertex AI に Ray クラスタを作成します。Vertex AI で Ray クラスタを作成するには、Vertex AI SDK for Python を使用します。

まだインストールしていない場合は、必要な Python ライブラリをインストールします。

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Vertex AI SDK for Python を使用して、ヘッドノードとワーカーノードを構成し、クラスタを作成します。例:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

ここで

  • CUSTOM_CONTAINER_IMAGE_URI: Artifact Registry に push したカスタム コンテナ イメージの URI。
  • CLUSTER_NAME: Vertex AI の Ray クラスタの名前。

Vertex AI の Spark on Ray クラスタ

Spark アプリケーションを実行するには、まず RayDP API を使用して Spark セッションを作成する必要があります。この操作をインタラクティブに行うには、Ray クライアントを使用するか、Ray Job API を使用します。特に本番環境アプリケーションと長時間実行アプリケーションでは、Ray Job API を使用することをおすすめします。RayDP API は、Spark セッションを構成するパラメータを提供し、Spark 構成をサポートします。Spark セッションの作成に使用する RayDP API の詳細については、Spark マスター アクターのノード アフィニティをご覧ください。

Ray クライアントを使用した RayDP

Ray のタスクまたはアクターを使用して、Vertex AI の Ray クラスタに Spark クラスタとセッションを作成できます。Ray クライアントを使用して Vertex AI の Ray クラスタに Spark セッションを作成する場合は、Ray タスク(アクター)が必要です。次のコードは、Ray アクターを使用して Spark セッションを作成し、RayDP を使用して Vertex AI の Ray クラスタで Spark アプリケーションを実行し Spark クラスタを停止する方法を示しています。

Vertex AI の Ray クラスタにインタラクティブに接続する方法については、Ray クライアントを介して Ray クラスタに接続するをご覧ください。

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

Ray Job API を使用した RayDP

Ray クライアントは、Ray クラスタとのインタラクティブな接続を必要とする小規模なテストで活用できます。Ray クラスタで長時間実行ジョブと本番環境ジョブを実行する場合は、Ray Job API を使用することをおすすめします。これは、Vertex AI の Ray クラスタで実行される Spark アプリケーションについても該当します。

Spark アプリケーション コードを含む Python スクリプトを作成します。例:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Ray Job API を使用して Python スクリプトを実行するジョブを送信します。例:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

ここで

  • SCRIPT_NAME: 作成したスクリプトのファイル名。

Spark アプリケーションから Cloud Storage ファイルを読み取る

データファイルを Google Cloud Storage バケットに保存するのが一般的な手法です。Vertex AI の Ray クラスタで実行されている Spark アプリケーションからこれらのファイルを読み取る方法は複数あります。このセクションでは、Vertex AI の Ray クラスタで実行されている Spark アプリケーションから Cloud Storage ファイルを読み取るための 2 つの方法について説明します。

Google Cloud Storage コネクタを使用する

Google Cloud Connector for Hadoop を使用すると、Spark アプリケーションから Cloud Storage バケットのファイルを読み取ることができます。これは、RayDP を使用して Spark セッションを作成するときに、いくつかの構成パラメータを使用して行われます。次のコードは、Cloud Storage バケットに保存されている CSV ファイルを Vertex AI の Ray クラスタ上の Spark アプリケーションから読み取る方法を示しています。

import raydp

spark = raydp.init_spark(
  app_name="RayDP GCS Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ここで

  • GCS_FILE_URI: Cloud Storage バケットに保存されているファイルの URI(例: gs://my-bucket/my-file.csv)。

Ray Data を使用する

Google Cloud Connector を使用すると、Google Cloud バケットからファイルを読み取ることができます。ほとんどのユースケースについては、これで十分です。Ray の分散処理を使用してデータを読み取る必要がある場合や、Google Cloud コネクタを使用して Google Cloud ファイルを読み取る際に問題が発生した場合は、Ray Data を使用して Google Cloud バケットからファイルを読み取ることができます。この問題は、spark.jars.packages または spark.jars を使用して他のアプリケーションの依存関係が Spark Java クラスパスに追加されたときに Java の依存関係の競合が発生した場合に生じる可能性があります。

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP GCS Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the GCS connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

Vertex AI の Ray クラスタでの Pyspark Pandas UDF

Pyspark Pandas UDF を Vertex AI の Ray クラスタで実行されている Spark アプリケーションで使用すると、追加のコードが必要になる場合があります。これは通常、Pandas UDF が Vertex AI の Ray クラスタで使用できない Python ライブラリを使用している場合に必要です。Ray Job API でランタイム環境を使用してアプリケーションの Python 依存関係をパッケージ化できます。Ray ジョブがクラスタに送信されると、Ray はジョブの実行用に作成した Python 仮想環境にこれらの依存関係をインストールします。ただし、Pandas UDF は同じ仮想環境を使用しません。代わりに、デフォルトの Python System 環境を使用します。その依存関係が System 環境で使用できない場合は、Pandas UDF 内にインストールすることが必要になる可能性があります。次の例では、statsmodels ライブラリを UDF 内にインストールする必要があります。

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()