Library Python RayDP membuatnya dapat digunakan untuk menjalankan Spark pada gugus Ray. Dokumen ini membahas tentang penginstalan, mengonfigurasi dan menjalankan RayDP pada Ray pada Vertex AI (cluster Ray pada Vertex AI).
Penginstalan
Ray di Vertex AI memungkinkan pengguna menjalankan aplikasi mereka framework Ray. RayDP menyediakan API untuk menjalankan Spark di Ray. Image container bawaan yang tersedia untuk membuat gugus Ray Vertex AI tidak dilengkapi dengan RayDP yang sudah diinstal sebelumnya, yang berarti Anda perlu membuat cluster Ray kustom di Vertex AI gambar untuk cluster Ray Anda di Vertex AI untuk menjalankan aplikasi RayDP di Gugus Ray di Vertex AI. Bagian berikut ini menjelaskan bagaimana RayDP image kustom dapat dibuat.
Image container kustom Ray on Vertex AI
Gunakan dockerfile ini untuk membuat image container kustom untuk Ray di Vertex AI yang telah menginstal RayDP.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
Anda dapat menggunakan cluster Ray terbaru pada image bawaan Vertex AI untuk
yang membuat image kustom RayDP. Anda juga dapat menginstal
paket Python lainnya
yang diperkirakan akan digunakan
dalam aplikasi Anda. pyarrow==14.0
jatuh tempo
ke batasan dependensi Ray 2.9.3.
Membangun dan mengirim image container kustom
Anda perlu membuat Repositori Docker di Artifact Registry sebelum Anda dapat membangun image kustom (lihat Menggunakan image container untuk mengetahui cara membuat dan mengonfigurasi repositori Docker Anda). Setelah Anda memiliki membuat, membangun, dan mengirim image container kustom menggunakan dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Dengan keterangan:
LOCATION
: Lokasi Cloud Storage (misalnya, us-central1) yang Anda yang dibuat di Artifact Registry.PROJECT_ID
: ID project Google Cloud Anda.DOCKER_REPOSITORY
: Nama repositori Docker yang Anda buat.IMAGE_NAME
: Nama image container kustom Anda.
Membuat cluster Ray di Vertex AI
Gunakan image container kustom yang dibuat pada langkah sebelumnya untuk membuat Ray cluster di Vertex AI. Anda dapat menggunakan Vertex AI SDK untuk Python pembuatan gugus Ray pada Vertex AI.
Jika Anda belum melakukannya,instal library Python yang diperlukan.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Konfigurasi node Head dan Worker serta buat cluster menggunakan Vertex AI SDK untuk Python. Contoh:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Dengan keterangan:
CUSTOM_CONTAINER_IMAGE_URI
: URI image container kustom yang dikirim ke dan Artifact Registry.CLUSTER_NAME
: Nama cluster Ray Anda di Vertex AI.
Spark on Ray cluster pada Vertex AI
Sebelum dapat menjalankan aplikasi Spark, Anda perlu membuat sesi Spark menggunakan RayDP API. Anda dapat menggunakan klien Ray untuk melakukan ini secara interaktif atau menggunakan Ray job API. Ray job API direkomendasikan, terutama untuk produksi dan aplikasi yang berjalan lama. RayDP API menyediakan parameter untuk mengonfigurasi sesi Spark, serta mendukung Konfigurasi Spark. Pelajari lebih lanjut RayDP API untuk membuat Sesi Spark. Afinitas node pelaku master Spark.
RayDP dengan klien Ray
Anda bisa menggunakan Ray Tugas atau Pelaku untuk membuat Cluster Spark dan sesi di cluster Ray pada Vertex AI. Ray Task, atau Aktor, diwajibkan untuk menggunakan Klien Ray untuk membuat sesi Spark di cluster Ray pada Vertex AI. Hal berikut kode menunjukkan bagaimana Ray Actor dapat digunakan untuk membuat Sesi Spark, menjalankan aplikasi Spark dan menghentikan cluster Spark pada cluster Ray Vertex AI menggunakan RayDP.
Untuk mengetahui cara terhubung secara interaktif ke cluster Ray di Vertex AI, lihat Menghubungkan ke cluster Ray melalui Ray Client
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP dengan Ray Job API
Klien Ray berguna untuk eksperimen kecil yang memerlukan koneksi dengan gugus Ray. Tujuan Ray Job API adalah cara yang direkomendasikan untuk menjalankan tugas produksi dan jangka panjang pada . Hal ini juga berlaku untuk menjalankan aplikasi Spark pada gugus Ray Vertex AI.
Buat skrip Python yang berisi kode aplikasi Spark Anda. Contoh:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Kirim tugas untuk menjalankan skrip python menggunakan Ray Job API. Contoh:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Dengan keterangan:
SCRIPT_NAME
: Nama file skrip yang Anda buat.
Membaca file Cloud Storage dari aplikasi Spark
Menyimpan file data di bucket Google Cloud Storage adalah praktik yang umum. Ada beberapa cara untuk membaca file ini dari aplikasi Spark yang berjalan di gugus Ray di Vertex AI. Bagian ini menjelaskan dua teknik untuk membaca file Cloud Storage dari aplikasi Spark yang berjalan di Cluster Ray dengan Vertex AI.
Menggunakan Konektor Google Cloud Storage
Anda dapat menggunakan Google Cloud Connector untuk Hadoop untuk membaca file dari Bucket Cloud Storage dari aplikasi Spark Anda. Hal ini dilakukan dengan menggunakan beberapa parameter konfigurasi saat sesi Spark dibuat menggunakan RayDP. Tujuan kode berikut menunjukkan cara kerja file CSV yang disimpan di bucket Cloud Storage membaca dari aplikasi Spark di gugus Ray di Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP GCS Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Dengan keterangan:
GCS_FILE_URI
: URI file yang disimpan di bucket Cloud Storage. Misalnya: gs://my-bucket/my-file.csv.
Menggunakan data Ray
Google Cloud Connector menyediakan cara untuk membaca file dari Google Cloud
bucket dan mungkin memadai untuk sebagian besar kasus penggunaan. Anda mungkin ingin menggunakan
Ray Data untuk membaca file dari bucket Google Cloud saat Anda perlu menggunakan Ray's
pemrosesan terdistribusi untuk pembacaan data, atau saat Anda mengalami masalah saat membaca
File Google Cloud dengan konektor Google Cloud, yang mungkin dapat
terjadi karena konflik dependensi Java ketika beberapa aplikasi lain
dependensi ditambahkan ke classpath Spark Java menggunakan
spark.jars.packages
atau spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP GCS Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the GCS connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
Pyspark Pandas UDF pada gugus Ray di Vertex AI
Tujuan
UDF Pyspark Panda
terkadang memerlukan kode tambahan saat Anda menggunakannya di Spark
yang berjalan pada cluster Ray di Vertex AI. Hal ini biasanya
diperlukan saat Pandas UDF menggunakan
library Python yang tidak tersedia di
gugus Ray di Vertex AI. Anda dapat mengemas
Dependensi Python
aplikasi yang menggunakan Lingkungan Runtime dengan Ray job API dan ketika
Tugas Ray dikirimkan ke cluster, Ray menginstal dependensi itu di
Lingkungan virtual python yang dibuatnya untuk menjalankan tugas. Tujuan
UDF Panda,
namun, jangan menggunakan
lingkungan virtual yang sama. Sebagai gantinya, mereka
menggunakan {i>default <i}
Python System. Jika dependensi itu tidak
tersedia di sistem
Anda mungkin perlu menginstalnya dalam UDF Pandas Anda. Di kolom
berikut, library statsmodels
harus diinstal dalam UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()