Menjalankan Spark di cluster Ray di Vertex AI

Library Python RayDP memungkinkan Anda menjalankan Spark di cluster Ray. Dokumen ini membahas cara menginstal, mengonfigurasi, dan menjalankan RayDP di Ray on Vertex AI (cluster Ray di Vertex AI).

Penginstalan

Ray di Vertex AI memungkinkan pengguna menjalankan aplikasi mereka menggunakan framework Ray open source. RayDP menyediakan API untuk menjalankan Spark di Ray. Image container bawaan yang tersedia untuk membuat cluster Ray di Vertex AI tidak dilengkapi dengan RayDP yang sudah diinstal sebelumnya, yang berarti Anda harus membuat image cluster Ray kustom di Vertex AI untuk cluster Ray di Vertex AI guna menjalankan aplikasi RayDP di cluster Ray di Vertex AI. Bagian berikut menjelaskan cara mem-build image kustom RayDP.

Mem-build image container kustom Ray on Vertex AI

Gunakan dockerfile ini untuk membuat image container kustom untuk Ray di Vertex AI yang telah menginstal RayDP.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Anda dapat menggunakan cluster Ray terbaru pada image bawaan Vertex AI untuk membuat image kustom RayDP. Anda juga dapat menginstal paket Python lainnya yang diperkirakan akan digunakan dalam aplikasi. pyarrow==14.0 disebabkan oleh batasan dependensi Ray 2.33.0.

Membangun dan mengirim image container kustom

Anda harus membuat Repositori Docker di Artifact Registry sebelum dapat mem-build image kustom (lihat Menggunakan image container untuk mengetahui cara membuat dan mengonfigurasi repositori Docker). Setelah membuat repositori Docker, build dan kirim image container kustom menggunakan Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Dengan keterangan:

  • LOCATION: Lokasi Cloud Storage (misalnya, us-central1) yang Anda buat di Artifact Registry.
  • PROJECT_ID: ID project Google Cloud Anda.
  • DOCKER_REPOSITORY: Nama repositori docker yang Anda buat.
  • IMAGE_NAME: Nama image penampung kustom Anda.

Membuat cluster Ray di Vertex AI

Gunakan image container kustom yang dibuat di langkah sebelumnya untuk membuat cluster Ray di Vertex AI. Anda dapat menggunakan Vertex AI SDK untuk Python guna membuat cluster Ray di Vertex AI.

Jika Anda belum melakukannya,instal library Python yang diperlukan.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Konfigurasikan node Head dan Worker, lalu buat cluster menggunakan Vertex AI SDK untuk Python. Contoh:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Dengan keterangan:

  • CUSTOM_CONTAINER_IMAGE_URI: URI image container kustom yang di-push ke Artifact Registry.
  • CLUSTER_NAME: Nama cluster Ray Anda di Vertex AI.

Spark di cluster Ray di Vertex AI

Sebelum dapat menjalankan aplikasi Spark, Anda harus membuat sesi Spark menggunakan RayDP API. Anda dapat menggunakan klien Ray untuk melakukannya secara interaktif atau menggunakan API tugas Ray. Ray job API direkomendasikan, terutama untuk produksi dan aplikasi yang berjalan lama. RayDP API menyediakan parameter untuk mengonfigurasi sesi Spark, serta mendukung Konfigurasi Spark. Pelajari RayDP API lebih lanjut untuk membuat Sesi Spark, lihat Afinitas node aktor master Spark.

RayDP dengan klien Ray

Anda dapat menggunakan Ray Task atau Actor untuk membuat cluster dan sesi Spark di cluster Ray di Vertex AI. Tugas Ray, atau Aktor, harus menggunakan Ray Client untuk membuat sesi Spark di cluster Ray di Vertex AI. Kode berikut menunjukkan cara Ray Actor dapat digunakan untuk membuat Sesi Spark, menjalankan aplikasi Spark, dan menghentikan cluster Spark di cluster Ray di Vertex AI menggunakan RayDP.

Untuk mengetahui cara terhubung secara interaktif ke cluster Ray di Vertex AI, lihat Terhubung ke cluster Ray melalui Ray Client

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP dengan Ray Job API

Klien Ray berguna untuk eksperimen kecil yang memerlukan koneksi interaktif dengan cluster Ray. Ray Job API adalah cara yang direkomendasikan untuk menjalankan tugas produksi dan tugas yang berjalan lama di cluster Ray. Hal ini juga berlaku untuk menjalankan aplikasi Spark di cluster Ray di Vertex AI.

Buat skrip Python yang berisi kode aplikasi Spark Anda. Contoh:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Kirim tugas untuk menjalankan skrip python menggunakan Ray Job API. Contoh:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Dengan keterangan:

  • SCRIPT_NAME: Nama file skrip yang Anda buat.

Membaca file Cloud Storage dari aplikasi Spark

Menyimpan file data di bucket Google Cloud Storage merupakan praktik yang umum dilakukan. Ada beberapa cara untuk membaca file ini dari aplikasi Spark yang berjalan di cluster Ray di Vertex AI. Bagian ini menjelaskan dua teknik untuk membaca file Cloud Storage dari aplikasi Spark yang berjalan di Cluster Ray di Vertex AI.

Menggunakan Konektor Google Cloud Storage

Anda dapat menggunakan Konektor Google Cloud untuk Hadoop guna membaca file dari bucket Cloud Storage dari aplikasi Spark. Hal ini dilakukan menggunakan beberapa parameter konfigurasi saat sesi Spark dibuat menggunakan RayDP. Kode berikut menunjukkan cara file CSV yang disimpan di bucket Cloud Storage dapat dibaca dari aplikasi Spark di cluster Ray di Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Dengan keterangan:

  • GCS_FILE_URI: URI file yang disimpan di bucket Cloud Storage. Misalnya: gs://my-bucket/my-file.csv.

Menggunakan data Ray

Google Cloud Connector menyediakan cara untuk membaca file dari bucket Google Cloud dan mungkin sudah cukup untuk sebagian besar kasus penggunaan. Sebaiknya gunakan Ray Data untuk membaca file dari bucket Google Cloud saat Anda perlu menggunakan pemrosesan terdistribusi Ray untuk membaca data, atau saat Anda mengalami masalah saat membaca file Google Cloud dengan konektor Google Google Cloud, yang mungkin terjadi karena konflik dependensi Java saat beberapa dependensi aplikasi lain ditambahkan ke classpath Java Spark menggunakan spark.jars.packages atau spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDF Pyspark Pandas di cluster Ray di Vertex AI

UDF Pandas Pyspark kadang-kadang mungkin memerlukan kode tambahan saat Anda menggunakannya dalam aplikasi Spark yang berjalan di cluster Ray di Vertex AI. Hal ini biasanya diperlukan saat Pandas UDF menggunakan library Python yang tidak tersedia di cluster Ray di Vertex AI. Anda dapat memaketkan dependensi Python aplikasi menggunakan Runtime Environment dengan API tugas Ray dan saat tugas Ray dikirim ke cluster, Ray akan menginstal dependensi tersebut di lingkungan virtual Python yang dibuatnya untuk menjalankan tugas. Namun, UDF Pandas tidak menggunakan lingkungan virtual yang sama. Sebagai gantinya, mereka menggunakan lingkungan Sistem Python default. Jika dependensi tersebut tidak tersedia di lingkungan System, Anda mungkin perlu menginstalnya dalam UDF Pandas. Dalam contoh berikut, library statsmodels harus diinstal dalam UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()