Menjalankan Spark di cluster Ray pada Vertex AI

Library Python RayDP membuatnya dapat digunakan untuk menjalankan Spark pada gugus Ray. Dokumen ini membahas tentang penginstalan, mengonfigurasi dan menjalankan RayDP pada Ray pada Vertex AI (cluster Ray pada Vertex AI).

Penginstalan

Ray di Vertex AI memungkinkan pengguna menjalankan aplikasi mereka framework Ray. RayDP menyediakan API untuk menjalankan Spark di Ray. Image container bawaan yang tersedia untuk membuat gugus Ray Vertex AI tidak dilengkapi dengan RayDP yang sudah diinstal sebelumnya, yang berarti Anda perlu membuat cluster Ray kustom di Vertex AI gambar untuk cluster Ray Anda di Vertex AI untuk menjalankan aplikasi RayDP di Gugus Ray di Vertex AI. Bagian berikut ini menjelaskan bagaimana RayDP image kustom dapat dibuat.

Image container kustom Ray on Vertex AI

Gunakan dockerfile ini untuk membuat image container kustom untuk Ray di Vertex AI yang telah menginstal RayDP.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Anda dapat menggunakan cluster Ray terbaru pada image bawaan Vertex AI untuk yang membuat image kustom RayDP. Anda juga dapat menginstal paket Python lainnya yang diperkirakan akan digunakan dalam aplikasi Anda. pyarrow==14.0 jatuh tempo ke batasan dependensi Ray 2.9.3.

Membangun dan mengirim image container kustom

Anda perlu membuat Repositori Docker di Artifact Registry sebelum Anda dapat membangun image kustom (lihat Menggunakan image container untuk mengetahui cara membuat dan mengonfigurasi repositori Docker Anda). Setelah Anda memiliki membuat, membangun, dan mengirim image container kustom menggunakan dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Dengan keterangan:

  • LOCATION: Lokasi Cloud Storage (misalnya, us-central1) yang Anda yang dibuat di Artifact Registry.
  • PROJECT_ID: ID project Google Cloud Anda.
  • DOCKER_REPOSITORY: Nama repositori Docker yang Anda buat.
  • IMAGE_NAME: Nama image container kustom Anda.

Membuat cluster Ray di Vertex AI

Gunakan image container kustom yang dibuat pada langkah sebelumnya untuk membuat Ray cluster di Vertex AI. Anda dapat menggunakan Vertex AI SDK untuk Python pembuatan gugus Ray pada Vertex AI.

Jika Anda belum melakukannya,instal library Python yang diperlukan.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Konfigurasi node Head dan Worker serta buat cluster menggunakan Vertex AI SDK untuk Python. Contoh:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Dengan keterangan:

  • CUSTOM_CONTAINER_IMAGE_URI: URI image container kustom yang dikirim ke dan Artifact Registry.
  • CLUSTER_NAME: Nama cluster Ray Anda di Vertex AI.

Spark on Ray cluster pada Vertex AI

Sebelum dapat menjalankan aplikasi Spark, Anda perlu membuat sesi Spark menggunakan RayDP API. Anda dapat menggunakan klien Ray untuk melakukan ini secara interaktif atau menggunakan Ray job API. Ray job API direkomendasikan, terutama untuk produksi dan aplikasi yang berjalan lama. RayDP API menyediakan parameter untuk mengonfigurasi sesi Spark, serta mendukung Konfigurasi Spark. Pelajari lebih lanjut RayDP API untuk membuat Sesi Spark. Afinitas node pelaku master Spark.

RayDP dengan klien Ray

Anda bisa menggunakan Ray Tugas atau Pelaku untuk membuat Cluster Spark dan sesi di cluster Ray pada Vertex AI. Ray Task, atau Aktor, diwajibkan untuk menggunakan Klien Ray untuk membuat sesi Spark di cluster Ray pada Vertex AI. Hal berikut kode menunjukkan bagaimana Ray Actor dapat digunakan untuk membuat Sesi Spark, menjalankan aplikasi Spark dan menghentikan cluster Spark pada cluster Ray Vertex AI menggunakan RayDP.

Untuk mengetahui cara terhubung secara interaktif ke cluster Ray di Vertex AI, lihat Menghubungkan ke cluster Ray melalui Ray Client

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP dengan Ray Job API

Klien Ray berguna untuk eksperimen kecil yang memerlukan koneksi dengan gugus Ray. Tujuan Ray Job API adalah cara yang direkomendasikan untuk menjalankan tugas produksi dan jangka panjang pada . Hal ini juga berlaku untuk menjalankan aplikasi Spark pada gugus Ray Vertex AI.

Buat skrip Python yang berisi kode aplikasi Spark Anda. Contoh:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Kirim tugas untuk menjalankan skrip python menggunakan Ray Job API. Contoh:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Dengan keterangan:

  • SCRIPT_NAME: Nama file skrip yang Anda buat.

Membaca file Cloud Storage dari aplikasi Spark

Menyimpan file data di bucket Google Cloud Storage adalah praktik yang umum. Ada beberapa cara untuk membaca file ini dari aplikasi Spark yang berjalan di gugus Ray di Vertex AI. Bagian ini menjelaskan dua teknik untuk membaca file Cloud Storage dari aplikasi Spark yang berjalan di Cluster Ray dengan Vertex AI.

Menggunakan Konektor Google Cloud Storage

Anda dapat menggunakan Google Cloud Connector untuk Hadoop untuk membaca file dari Bucket Cloud Storage dari aplikasi Spark Anda. Hal ini dilakukan dengan menggunakan beberapa parameter konfigurasi saat sesi Spark dibuat menggunakan RayDP. Tujuan kode berikut menunjukkan cara kerja file CSV yang disimpan di bucket Cloud Storage membaca dari aplikasi Spark di gugus Ray di Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP GCS Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Dengan keterangan:

  • GCS_FILE_URI: URI file yang disimpan di bucket Cloud Storage. Misalnya: gs://my-bucket/my-file.csv.

Menggunakan data Ray

Google Cloud Connector menyediakan cara untuk membaca file dari Google Cloud bucket dan mungkin memadai untuk sebagian besar kasus penggunaan. Anda mungkin ingin menggunakan Ray Data untuk membaca file dari bucket Google Cloud saat Anda perlu menggunakan Ray's pemrosesan terdistribusi untuk pembacaan data, atau saat Anda mengalami masalah saat membaca File Google Cloud dengan konektor Google Cloud, yang mungkin dapat terjadi karena konflik dependensi Java ketika beberapa aplikasi lain dependensi ditambahkan ke classpath Spark Java menggunakan spark.jars.packages atau spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP GCS Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the GCS connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

Pyspark Pandas UDF pada gugus Ray di Vertex AI

Tujuan UDF Pyspark Panda terkadang memerlukan kode tambahan saat Anda menggunakannya di Spark yang berjalan pada cluster Ray di Vertex AI. Hal ini biasanya diperlukan saat Pandas UDF menggunakan library Python yang tidak tersedia di gugus Ray di Vertex AI. Anda dapat mengemas Dependensi Python aplikasi yang menggunakan Lingkungan Runtime dengan Ray job API dan ketika Tugas Ray dikirimkan ke cluster, Ray menginstal dependensi itu di Lingkungan virtual python yang dibuatnya untuk menjalankan tugas. Tujuan UDF Panda, namun, jangan menggunakan lingkungan virtual yang sama. Sebagai gantinya, mereka menggunakan {i>default <i} Python System. Jika dependensi itu tidak tersedia di sistem Anda mungkin perlu menginstalnya dalam UDF Pandas Anda. Di kolom berikut, library statsmodels harus diinstal dalam UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()