A biblioteca Python RayDP torna possível executar o Spark em um cluster do Ray. Este documento abrange a instalação, a configuração e a execução do RayDP no Ray na Vertex AI (cluster do Ray na Vertex AI).
Instalação
O Ray na Vertex AI permite que os usuários executem aplicativos usando o framework do Ray de código aberto. A RayDP fornece APIs para executar o Spark no Ray. As imagens de contêiner pré-criadas disponíveis para criar um cluster do Ray na Vertex AI não vêm com o RayDP pré-instalado, o que significa que é preciso criar uma imagem de cluster do Ray personalizado na Vertex AI para o cluster do Ray na Vertex AI para executar aplicativos do RayDP no cluster do Ray na Vertex AI. A seção a seguir explica como uma imagem personalizada do RayDP pode ser criada.
Criar uma imagem de contêiner personalizada do Ray na Vertex AI
Use este dockerfile para criar uma imagem de contêiner personalizada para o Ray na Vertex AI com o RayDP instalado.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
É possível usar o cluster do Ray mais recente na imagem pré-criada da Vertex AI para
criar a imagem personalizada do RayDP. Você também pode instalar outros pacotes Python
que você prevê que usará nos seus aplicativos. O pyarrow==14.0
é devido
a uma restrição de dependência do Ray 2.9.3.
Criar e enviar a imagem de contêiner personalizada
Você precisa criar um repositório do Docker no Artifact Registry para criar sua imagem personalizada (consulte Trabalhar com imagens de contêiner para saber como criar e configurar seu repositório do Docker). Assim que tiver criado o repositório do Docker, crie e envie a imagem de contêiner personalizada usando o dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Em que:
LOCATION
: o local do Cloud Storage (por exemplo, us-central1) que você criou no seu Artifact Registry.PROJECT_ID
pelo ID do projeto no Google Cloud.DOCKER_REPOSITORY
: o nome do repositório do Docker que você criou.IMAGE_NAME
: o nome das suas imagens de contêiner personalizadas.
Criar um cluster do Ray na Vertex AI
Use a imagem personalizada de contêiner criada na etapa anterior para criar um cluster do Ray na Vertex AI. O SDK da Vertex AI para Python pode ser usado para criar um cluster do Ray na Vertex AI.
Instale as bibliotecas necessárias do Python, caso ainda não tenha feito isso.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Configure nós de cabeçalho e de trabalho e crie o cluster usando o SDK da Vertex AI para Python. Por exemplo:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Em que:
CUSTOM_CONTAINER_IMAGE_URI
: o URI da imagem de contêiner personalizada enviada para o Artifact Registry.CLUSTER_NAME
: o nome do cluster do Ray na Vertex AI.
Cluster do Spark no Ray na Vertex AI
Antes de executar seu aplicativo Spark, você precisa criar uma sessão do Spark usando a API RayDP. É possível usar o cliente Ray para fazer isso de maneira interativa ou usar a API de jobs do Ray. A API de job do Ray é recomendada, especialmente para aplicativos de produção e de longa duração. A API RayDP fornece parâmetros para configurar a sessão do Spark, além de oferecer suporte à configuração do Spark. Para saber mais sobre a API RayDP para criar uma sessão do Spark, consulte Afinidade do nó de atores mestres do Spark.
RayDP com cliente Ray
Você pode usar Ray Task ou Actor para criar um cluster e sessão do Spark no cluster do Ray na Vertex AI. O Ray Task ou Actor é necessário para usar um Cliente do Ray para criar uma sessão do Spark no cluster do Ray na Vertex AI. O seguinte código mostra como um Ray Actor pode ser usado para criar uma sessão do Spark, executando um aplicativo do Spark e interrompendo um cluster do Spark em um cluster do Ray na Vertex AI usando o RayDP.
Para saber como se conectar interativamente ao cluster do Ray na Vertex AI, consulte Conectar-se a um cluster do Ray pelo cliente do Ray
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP com a API Ray Job
O cliente do Ray é útil para pequenos experimentos que exigem conexão com o cluster do Ray. A API Ray Job é a maneira recomendada de executar jobs de longa duração e de produção em um cluster do Ray. Isso também se aplica à execução de aplicativos Spark no cluster do Ray na Vertex AI.
Criar um script Python que contenha o código do aplicativo Spark. Por exemplo:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Envie o job para executar o script do Python usando a API Ray Job. Por exemplo:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Em que:
SCRIPT_NAME
: o nome do arquivo do script que você criou.
Como ler arquivos do Cloud Storage do aplicativo Spark
É uma prática comum armazenar arquivos de dados em um bucket do Google Cloud Storage. Há várias maneiras de ler esses arquivos de um aplicativo Spark em execução no cluster do Ray na Vertex AI. Esta seção explica duas técnicas de leitura de arquivos do Cloud Storage de aplicativos Spark em execução no cluster do Ray na Vertex AI.
Usar o conector do Google Cloud Storage
É possível usar o Google Cloud Connector para Hadoop para ler arquivos de um bucket do Cloud Storage do aplicativo Spark. Isso é feito usando alguns parâmetros de configuração quando uma sessão do Spark é criada usando o RayDP. O código a seguir mostra como um arquivo CSV armazenado em um bucket do Cloud Storage pode ser lido em um aplicativo Spark no cluster do Ray na Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP GCS Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Em que:
GCS_FILE_URI
: o URI de um arquivo armazenado em um bucket do Cloud Storage. Por exemplo: gs://my-bucket/my-file.csv.
Usar dados do Ray
O Google Cloud Connector é uma maneira de ler arquivos de um bucket do Google Cloud,
o que pode ser suficiente para a maioria dos casos de uso. Você pode querer usar
dados do Ray para ler arquivos do bucket do Google Cloud quando precisar usar
processamento distribuído do Ray para leitura de dados ou quando você tiver problemas de leitura com
arquivos do Google Cloud com o conector do Google Cloud, que pode
acontecer devido a conflitos de dependência do Java quando algumas outras dependências de
aplicativo forem adicionadas ao caminho de classe Java do Spark usando
spark.jars.packages
ou spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP GCS Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the GCS connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
UDF do Pyspark Pandas no cluster do Ray na Vertex AI
As
UDFs do Pyspark Pandas
podem exigir códigos adicionais ao usá-las no aplicativo
Spark em execução em um cluster do Ray na Vertex AI. Isso geralmente é
necessário quando a UDF do Pandas usa uma biblioteca Python indisponível no
cluster do Ray na Vertex AI. É possível empacotar as
Dependências do Python
de um aplicativo usando o Runtime Environment com a API de jobs do Ray e quando o
job do Ray é enviado ao cluster, ele instala essas dependências no
ambiente virtual Python criado para executar o job. As
UDFs do Pandas,
no entanto, não usam o mesmo ambiente virtual. Em vez disso, eles usam o
ambiente de sistema Python padrão. Se essa dependência não estiver disponível no ambiente do
sistema, talvez seja necessário instalá-lo na UDF do Pandas. No
exemplo a seguir, a biblioteca statsmodels
precisa ser instalada na UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()