Library Python RayDP memungkinkan Anda menjalankan Spark di cluster Ray. Dokumen ini membahas cara menginstal, mengonfigurasi, dan menjalankan RayDP di Ray on Vertex AI (cluster Ray di Vertex AI).
Penginstalan
Ray di Vertex AI memungkinkan pengguna menjalankan aplikasi mereka menggunakan framework Ray open source. RayDP menyediakan API untuk menjalankan Spark di Ray. Image container bawaan yang tersedia untuk membuat cluster Ray di Vertex AI tidak dilengkapi dengan RayDP yang sudah diinstal sebelumnya, yang berarti Anda harus membuat image cluster Ray kustom di Vertex AI untuk cluster Ray di Vertex AI guna menjalankan aplikasi RayDP di cluster Ray di Vertex AI. Bagian berikut menjelaskan cara mem-build image kustom RayDP.
Mem-build image container kustom Ray on Vertex AI
Gunakan dockerfile ini untuk membuat image container kustom untuk Ray di Vertex AI yang telah menginstal RayDP.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
Anda dapat menggunakan cluster Ray terbaru pada image bawaan Vertex AI untuk membuat image kustom RayDP. Anda juga dapat menginstal paket Python lainnya
yang diperkirakan akan digunakan dalam aplikasi. pyarrow==14.0
disebabkan
oleh batasan dependensi Ray 2.33.0.
Membangun dan mengirim image container kustom
Anda harus membuat Repositori Docker di Artifact Registry sebelum dapat mem-build image kustom (lihat Menggunakan image container untuk mengetahui cara membuat dan mengonfigurasi repositori Docker). Setelah membuat repositori Docker, build dan kirim image container kustom menggunakan Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Dengan keterangan:
LOCATION
: Lokasi Cloud Storage (misalnya, us-central1) yang Anda buat di Artifact Registry.PROJECT_ID
: ID project Google Cloud Anda.DOCKER_REPOSITORY
: Nama repositori docker yang Anda buat.IMAGE_NAME
: Nama image penampung kustom Anda.
Membuat cluster Ray di Vertex AI
Gunakan image container kustom yang dibuat di langkah sebelumnya untuk membuat cluster Ray di Vertex AI. Anda dapat menggunakan Vertex AI SDK untuk Python guna membuat cluster Ray di Vertex AI.
Jika Anda belum melakukannya,instal library Python yang diperlukan.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Konfigurasikan node Head dan Worker, lalu buat cluster menggunakan Vertex AI SDK untuk Python. Contoh:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Dengan keterangan:
CUSTOM_CONTAINER_IMAGE_URI
: URI image container kustom yang di-push ke Artifact Registry.CLUSTER_NAME
: Nama cluster Ray Anda di Vertex AI.
Spark di cluster Ray di Vertex AI
Sebelum dapat menjalankan aplikasi Spark, Anda harus membuat sesi Spark menggunakan RayDP API. Anda dapat menggunakan klien Ray untuk melakukannya secara interaktif atau menggunakan API tugas Ray. Ray job API direkomendasikan, terutama untuk produksi dan aplikasi yang berjalan lama. RayDP API menyediakan parameter untuk mengonfigurasi sesi Spark, serta mendukung Konfigurasi Spark. Pelajari RayDP API lebih lanjut untuk membuat Sesi Spark, lihat Afinitas node aktor master Spark.
RayDP dengan klien Ray
Anda dapat menggunakan Ray Task atau Actor untuk membuat cluster dan sesi Spark di cluster Ray di Vertex AI. Tugas Ray, atau Aktor, harus menggunakan Ray Client untuk membuat sesi Spark di cluster Ray di Vertex AI. Kode berikut menunjukkan cara Ray Actor dapat digunakan untuk membuat Sesi Spark, menjalankan aplikasi Spark, dan menghentikan cluster Spark di cluster Ray di Vertex AI menggunakan RayDP.
Untuk mengetahui cara terhubung secara interaktif ke cluster Ray di Vertex AI, lihat Terhubung ke cluster Ray melalui Ray Client
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP dengan Ray Job API
Klien Ray berguna untuk eksperimen kecil yang memerlukan koneksi interaktif dengan cluster Ray. Ray Job API adalah cara yang direkomendasikan untuk menjalankan tugas produksi dan tugas yang berjalan lama di cluster Ray. Hal ini juga berlaku untuk menjalankan aplikasi Spark di cluster Ray di Vertex AI.
Buat skrip Python yang berisi kode aplikasi Spark Anda. Contoh:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Kirim tugas untuk menjalankan skrip python menggunakan Ray Job API. Contoh:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Dengan keterangan:
SCRIPT_NAME
: Nama file skrip yang Anda buat.
Membaca file Cloud Storage dari aplikasi Spark
Menyimpan file data di bucket Google Cloud Storage merupakan praktik yang umum dilakukan. Ada beberapa cara untuk membaca file ini dari aplikasi Spark yang berjalan di cluster Ray di Vertex AI. Bagian ini menjelaskan dua teknik untuk membaca file Cloud Storage dari aplikasi Spark yang berjalan di Cluster Ray di Vertex AI.
Menggunakan Konektor Google Cloud Storage
Anda dapat menggunakan Konektor Google Cloud untuk Hadoop guna membaca file dari bucket Cloud Storage dari aplikasi Spark. Hal ini dilakukan menggunakan beberapa parameter konfigurasi saat sesi Spark dibuat menggunakan RayDP. Kode berikut menunjukkan cara file CSV yang disimpan di bucket Cloud Storage dapat dibaca dari aplikasi Spark di cluster Ray di Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Dengan keterangan:
GCS_FILE_URI
: URI file yang disimpan di bucket Cloud Storage. Misalnya: gs://my-bucket/my-file.csv.
Menggunakan data Ray
Google Cloud Connector menyediakan cara untuk membaca file dari bucket Google Cloud dan mungkin sudah cukup untuk sebagian besar kasus penggunaan. Sebaiknya gunakan Ray Data untuk membaca file dari bucket Google Cloud saat Anda perlu menggunakan pemrosesan terdistribusi Ray untuk membaca data, atau saat Anda mengalami masalah saat membaca file Google Cloud dengan konektor Google Google Cloud, yang mungkin terjadi karena konflik dependensi Java saat beberapa dependensi aplikasi lain ditambahkan ke classpath Java Spark menggunakan spark.jars.packages
atau spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
UDF Pyspark Pandas di cluster Ray di Vertex AI
UDF Pandas Pyspark kadang-kadang mungkin memerlukan kode tambahan saat Anda menggunakannya dalam aplikasi Spark yang berjalan di cluster Ray di Vertex AI. Hal ini biasanya
diperlukan saat UDF Pandas menggunakan library Python yang tidak tersedia di
cluster Ray di Vertex AI. Anda dapat memaketkan
dependensi Python
aplikasi menggunakan Runtime Environment dengan API tugas Ray dan saat
tugas Ray dikirim ke cluster, Ray akan menginstal dependensi tersebut di
lingkungan virtual Python yang dibuatnya untuk menjalankan tugas. Namun,
UDF Pandas
tidak menggunakan lingkungan virtual yang sama. Sebagai gantinya, mereka menggunakan lingkungan Sistem Python default. Jika dependensi tersebut tidak tersedia di lingkungan
System, Anda mungkin perlu menginstalnya dalam UDF Pandas. Dalam
contoh berikut, library statsmodels
harus diinstal dalam UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()