Executar o Spark no cluster do Ray na Vertex AI

A biblioteca Python RayDP torna possível executar o Spark em um cluster do Ray. Este documento abrange a instalação, a configuração e a execução do RayDP no Ray na Vertex AI (cluster do Ray na Vertex AI).

Instalação

O Ray na Vertex AI permite que os usuários executem aplicativos usando o framework do Ray de código aberto. A RayDP fornece APIs para executar o Spark no Ray. As imagens de contêiner pré-criadas disponíveis para criar um cluster do Ray na Vertex AI não vêm com o RayDP pré-instalado, o que significa que é preciso criar uma imagem de cluster do Ray personalizado na Vertex AI para o cluster do Ray na Vertex AI para executar aplicativos do RayDP no cluster do Ray na Vertex AI. A seção a seguir explica como uma imagem personalizada do RayDP pode ser criada.

Criar uma imagem de contêiner personalizada do Ray na Vertex AI

Use este dockerfile para criar uma imagem de contêiner personalizada para o Ray na Vertex AI com o RayDP instalado.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

É possível usar o cluster do Ray mais recente na imagem pré-criada da Vertex AI para criar a imagem personalizada do RayDP. Você também pode instalar outros pacotes Python que você prevê que usará nos seus aplicativos. O pyarrow==14.0 é devido a uma restrição de dependência do Ray 2.9.3.

Criar e enviar a imagem de contêiner personalizada

Você precisa criar um repositório do Docker no Artifact Registry para criar sua imagem personalizada (consulte Trabalhar com imagens de contêiner para saber como criar e configurar seu repositório do Docker). Assim que tiver criado o repositório do Docker, crie e envie a imagem de contêiner personalizada usando o dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Em que:

  • LOCATION: o local do Cloud Storage (por exemplo, us-central1) que você criou no seu Artifact Registry.
  • PROJECT_ID pelo ID do projeto no Google Cloud.
  • DOCKER_REPOSITORY: o nome do repositório do Docker que você criou.
  • IMAGE_NAME: o nome das suas imagens de contêiner personalizadas.

Criar um cluster do Ray na Vertex AI

Use a imagem personalizada de contêiner criada na etapa anterior para criar um cluster do Ray na Vertex AI. O SDK da Vertex AI para Python pode ser usado para criar um cluster do Ray na Vertex AI.

Instale as bibliotecas necessárias do Python, caso ainda não tenha feito isso.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Configure nós de cabeçalho e de trabalho e crie o cluster usando o SDK da Vertex AI para Python. Por exemplo:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Em que:

  • CUSTOM_CONTAINER_IMAGE_URI: o URI da imagem de contêiner personalizada enviada para o Artifact Registry.
  • CLUSTER_NAME: o nome do cluster do Ray na Vertex AI.

Cluster do Spark no Ray na Vertex AI

Antes de executar seu aplicativo Spark, você precisa criar uma sessão do Spark usando a API RayDP. É possível usar o cliente Ray para fazer isso de maneira interativa ou usar a API de jobs do Ray. A API de job do Ray é recomendada, especialmente para aplicativos de produção e de longa duração. A API RayDP fornece parâmetros para configurar a sessão do Spark, além de oferecer suporte à configuração do Spark. Para saber mais sobre a API RayDP para criar uma sessão do Spark, consulte Afinidade do nó de atores mestres do Spark.

RayDP com cliente Ray

Você pode usar Ray Task ou Actor para criar um cluster e sessão do Spark no cluster do Ray na Vertex AI. O Ray Task ou Actor é necessário para usar um Cliente do Ray para criar uma sessão do Spark no cluster do Ray na Vertex AI. O seguinte código mostra como um Ray Actor pode ser usado para criar uma sessão do Spark, executando um aplicativo do Spark e interrompendo um cluster do Spark em um cluster do Ray na Vertex AI usando o RayDP.

Para saber como se conectar interativamente ao cluster do Ray na Vertex AI, consulte Conectar-se a um cluster do Ray pelo cliente do Ray

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP com a API Ray Job

O cliente do Ray é útil para pequenos experimentos que exigem conexão com o cluster do Ray. A API Ray Job é a maneira recomendada de executar jobs de longa duração e de produção em um cluster do Ray. Isso também se aplica à execução de aplicativos Spark no cluster do Ray na Vertex AI.

Criar um script Python que contenha o código do aplicativo Spark. Por exemplo:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Envie o job para executar o script do Python usando a API Ray Job. Por exemplo:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Em que:

  • SCRIPT_NAME: o nome do arquivo do script que você criou.

Como ler arquivos do Cloud Storage do aplicativo Spark

É uma prática comum armazenar arquivos de dados em um bucket do Google Cloud Storage. Há várias maneiras de ler esses arquivos de um aplicativo Spark em execução no cluster do Ray na Vertex AI. Esta seção explica duas técnicas de leitura de arquivos do Cloud Storage de aplicativos Spark em execução no cluster do Ray na Vertex AI.

Usar o conector do Google Cloud Storage

É possível usar o Google Cloud Connector para Hadoop para ler arquivos de um bucket do Cloud Storage do aplicativo Spark. Isso é feito usando alguns parâmetros de configuração quando uma sessão do Spark é criada usando o RayDP. O código a seguir mostra como um arquivo CSV armazenado em um bucket do Cloud Storage pode ser lido em um aplicativo Spark no cluster do Ray na Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP GCS Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Em que:

  • GCS_FILE_URI: o URI de um arquivo armazenado em um bucket do Cloud Storage. Por exemplo: gs://my-bucket/my-file.csv.

Usar dados do Ray

O Google Cloud Connector é uma maneira de ler arquivos de um bucket do Google Cloud, o que pode ser suficiente para a maioria dos casos de uso. Você pode querer usar dados do Ray para ler arquivos do bucket do Google Cloud quando precisar usar processamento distribuído do Ray para leitura de dados ou quando você tiver problemas de leitura com arquivos do Google Cloud com o conector do Google Cloud, que pode acontecer devido a conflitos de dependência do Java quando algumas outras dependências de aplicativo forem adicionadas ao caminho de classe Java do Spark usando spark.jars.packages ou spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP GCS Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the GCS connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDF do Pyspark Pandas no cluster do Ray na Vertex AI

As UDFs do Pyspark Pandas podem exigir códigos adicionais ao usá-las no aplicativo Spark em execução em um cluster do Ray na Vertex AI. Isso geralmente é necessário quando a UDF do Pandas usa uma biblioteca Python indisponível no cluster do Ray na Vertex AI. É possível empacotar as Dependências do Python de um aplicativo usando o Runtime Environment com a API de jobs do Ray e quando o job do Ray é enviado ao cluster, ele instala essas dependências no ambiente virtual Python criado para executar o job. As UDFs do Pandas, no entanto, não usam o mesmo ambiente virtual. Em vez disso, eles usam o ambiente de sistema Python padrão. Se essa dependência não estiver disponível no ambiente do sistema, talvez seja necessário instalá-lo na UDF do Pandas. No exemplo a seguir, a biblioteca statsmodels precisa ser instalada na UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()