Usar Ray en Vertex AI con BigQuery

Cuando ejecutes una aplicación de Ray en Vertex AI, usa BigQuery como base de datos en la nube. En esta sección se explica cómo leer y escribir en una base de datos de BigQuery desde tu clúster de Ray en Vertex AI. En los pasos de esta sección se da por hecho que usas el SDK de Vertex AI para Python.

Para leer datos de un conjunto de datos de BigQuery, crea un conjunto de datos de BigQuery o usa uno que ya tengas.

Importar e inicializar el cliente de Ray en Vertex AI

Si te has conectado a tu clúster de Ray en Vertex AI, reinicia el kernel y ejecuta el siguiente código. La variable runtime_env es necesaria en el momento de la conexión para ejecutar comandos de BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.47.1"]
  }

ray.init(address=address, runtime_env=runtime_env)

Leer datos de BigQuery

Lee datos de tu conjunto de datos de BigQuery. Una tarea de Ray debe realizar la operación de lectura.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Donde:

  • PROJECT_ID: Google Cloud ID de proyecto. Busca el ID de proyecto en la página Bienvenido de la consola. Google Cloud

  • LOCATION: la ubicación donde se almacena el Dataset. Por ejemplo, us-central1.

  • DATASET: conjunto de datos de BigQuery. Debe tener el formato dataset.table. Asigna el valor None si proporcionas una consulta.

  • PARALLELISM: un número entero que influye en el número de tareas de lectura que se crean en paralelo. Puede que se hayan creado menos flujos de lectura de los que has solicitado.

  • QUERY: cadena que contiene una consulta de SQL para leer datos de la base de datos de BigQuery. Asigna el valor None si no se requiere ninguna consulta.

Transformar datos

Actualiza y elimina filas y columnas de tus tablas de BigQuery con pyarrow o pandas. Si quieres usar transformaciones de pandas, mantén el tipo de entrada como pyarrow y conviértelo a pandas en la función definida por el usuario (FDU) para poder detectar errores de tipo de conversión de pandas en la FDU. Una tarea de Ray debe realizar la transformación.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Escribir datos en BigQuery

Inserta datos en tu conjunto de datos de BigQuery. Una tarea de Ray debe realizar la escritura.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Donde:

  • DATASET: conjunto de datos de BigQuery. El conjunto de datos debe tener el formato dataset.table.

Siguientes pasos