RayDP Python ライブラリを使用すると、Ray クラスタで Spark を実行できます。このドキュメントでは、Ray on Vertex AI(Vertex AI の Ray クラスタ)での RayDP のインストール、構成、実行について説明します。
インストール
Ray on Vertex AI を使用すると、オープンソースの Ray フレームワークを使用してアプリケーションを実行できます。RayDP は、Ray で Spark を実行するための API を提供します。Vertex AI で Ray クラスタを作成するために使用できるビルド済みコンテナ イメージには、RayDP がプリインストールされていません。つまり、Vertex AI の Ray クラスタで RayDP アプリケーションを実行するには、Vertex AI の Ray クラスタに対して Vertex AI のカスタム Ray クラスタ イメージを作成する必要があります。次のセクションでは、RayDP カスタム イメージを構築する方法について説明します。
Ray on Vertex AI カスタム コンテナ イメージを構築する
この Dockerfile を使用して、RayDP がインストールされた Ray on Vertex AI のカスタム コンテナ イメージを作成します。
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
RayDP カスタム イメージの作成には、Vertex AI ビルド済みイメージの最新の Ray クラスタを使用できます。アプリケーションで使用する予定の他の Python パッケージをインストールすることもできます。pyarrow==14.0
は、Ray 2.9.3 の依存関係の制約によるものです。
カスタム コンテナ イメージを構築して push する
カスタム イメージを構築する前に、Artifact Registry に Docker リポジトリを作成します(Docker リポジトリを作成して構成する方法については、コンテナ イメージを操作するをご覧ください)。Docker リポジトリを作成したら、Dockerfile を使用してカスタム コンテナ イメージを構築して push します。
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
ここで
LOCATION
: Artifact Registry で作成した Cloud Storage のロケーション(例: us-central1)。PROJECT_ID
: 実際の Google Cloud プロジェクト ID。DOCKER_REPOSITORY
: 作成した Docker リポジトリの名前。IMAGE_NAME
: カスタム コンテナ イメージの名前。
Vertex AI で Ray クラスタを作成する
前のステップで構築したカスタム コンテナ イメージを使用して、Vertex AI に Ray クラスタを作成します。Vertex AI で Ray クラスタを作成するには、Vertex AI SDK for Python を使用します。
必要な Python ライブラリをまだインストールしていない場合は、インストールします。
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Vertex AI SDK for Python を使用して、ヘッドノードとワーカーノードを構成し、クラスタを作成します。例:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
ここで
CUSTOM_CONTAINER_IMAGE_URI
: Artifact Registry に push したカスタム コンテナ イメージの URI。CLUSTER_NAME
: Vertex AI の Ray クラスタの名前。
Vertex AI の Ray クラスタでの Spark
Spark アプリケーションを実行する前に、RayDP API を使用して Spark セッションを作成します。この操作をインタラクティブに行うには、Ray クライアントを使用するか、Ray Job API を使用します。特に本番環境アプリケーションと長時間実行アプリケーションでは、Ray Job API を使用することをおすすめします。RayDP API は、Spark セッションを構成するパラメータを提供し、Spark 構成をサポートします。Spark セッションの作成に使用する RayDP API の詳細については、Spark マスター アクターのノード アフィニティをご覧ください。
Ray クライアントを使用した RayDP
Ray のタスクまたはアクターを使用して、Vertex AI の Ray クラスタに Spark クラスタとセッションを作成できます。Ray クライアントを使用して Vertex AI の Ray クラスタに Spark セッションを作成する場合は、Ray タスク(アクター)が必要です。次のコードは、Ray アクターが RayDP を使用して Vertex AI の Ray クラスタに Spark セッションを作成し、Spark アプリケーションを実行し、Spark クラスタを停止する方法を示しています。
Vertex AI の Ray クラスタにインタラクティブに接続するには、Ray クライアントを介して Ray クラスタに接続するをご覧ください。
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
Ray Job API を使用した RayDP
Ray クライアントは、Ray クラスタとのインタラクティブな接続を必要とする小規模なテストで活用できます。Ray クラスタで長時間実行ジョブと本番環境ジョブを実行する場合は、Ray Job API を使用することをおすすめします。これは、Vertex AI の Ray クラスタで実行される Spark アプリケーションについても該当します。
Spark アプリケーション コードを含む Python スクリプトを作成します。例:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Ray Job API を使用して Python スクリプトを実行するジョブを送信します。例:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
ここで
SCRIPT_NAME
: 作成したスクリプトのファイル名。
Spark アプリケーションから Cloud Storage ファイルを読み取る
データファイルを Google Cloud Storage バケットに保存するのが一般的な手法です。このファイルは、Vertex AI の Ray クラスタで実行されている Spark アプリケーションから複数の方法で読み取ることができます。このセクションでは、Vertex AI の Ray クラスタで実行されている Spark アプリケーションから Cloud Storage ファイルを読み取るための 2 つの方法について説明します。
Google Cloud Storage コネクタを使用する
Google Cloud Connector for Hadoop を使用すると、Spark アプリケーションから Cloud Storage バケットのファイルを読み取ることができます。RayDP を使用して Spark セッションを作成したら、いくつかの構成パラメータを使用してファイルを読み取ることができます。次のコードは、Cloud Storage バケットに保存されている CSV ファイルを、Vertex AI の Ray クラスタ上の Spark アプリケーションから読み取る方法を示しています。
import raydp spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
ここで
GCS_FILE_URI
: Cloud Storage バケットに保存されているファイルの URI(例: gs://my-bucket/my-file.csv)。
Ray Data を使用する
Google Cloud コネクタを使用すると、 Google Cloudバケットからファイルを読み取ることができます。ほとんどのユースケースについては、これで十分です。Ray の分散処理を使用してデータを読み取る必要がある場合や、 Google Cloud コネクタを使用してGoogle Cloud ファイルを読み取る際に問題が発生した場合は、Ray Data を使用して Google Cloud バケットからファイルを読み取ることができます。これは、spark.jars.packages
または spark.jars
を使用して他のアプリケーションの依存関係が Spark Java クラスパスに追加されたときに、Java の依存関係の競合が原因で発生する可能性があります。
import raydp import ray spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
Vertex AI の Ray クラスタでの Pyspark Pandas UDF
Vertex AI の Ray クラスタで実行されている Spark アプリケーションで Pyspark Pandas UDF を使用する場合には、追加のコードが必要になることがあります。これは通常、Pandas UDF が Vertex AI の Ray クラスタで使用できない Python ライブラリを使用している場合に必要です。Ray Job API でランタイム環境を使用して、アプリケーションの Python 依存関係をパッケージ化できます。Ray ジョブをクラスタに送信すると、Ray はジョブの実行用に作成した Python 仮想環境にこれらの依存関係をインストールします。ただし、Pandas UDF は同じ仮想環境を使用しません。代わりに、デフォルトの Python System 環境を使用します。その依存関係が System 環境で使用できない場合は、Pandas UDF 内にインストールすることが必要になる可能性があります。次の例では、statsmodels
ライブラリを UDF 内にインストールします。
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()