Utilizzare Ray su Vertex AI con BigQuery

Quando esegui un'applicazione Ray su Vertex AI, puoi utilizzare BigQuery come database cloud. Questo in cui viene spiegato come leggere e scrivere in un database BigQuery del tuo cluster Ray su Vertex AI. I passaggi in questa sezione presuppongono che utilizzando l'SDK Vertex AI per Python.

Se vuoi leggere da un set di dati BigQuery, devi creare una nuova set di dati BigQuery o utilizzane uno esistente.

Importa e inizializza Ray sul client Vertex AI

Se sei già connesso al tuo cluster Ray su Vertex AI, riavvia il kernel ed esegui il seguente codice. La variabile runtime_env è necessaria al momento della connessione per eseguire i comandi BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.33.0"]
  }

ray.init(address=address, runtime_env=runtime_env)

Leggere i dati da BigQuery

Leggere i dati dal set di dati BigQuery. La lettura deve essere eseguita in un Ray Task.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Dove:

  • PROJECT_ID: ID progetto Google Cloud. Puoi trovare l'ID progetto nel benvenuto della console Google Cloud .

  • LOCATION: la posizione in cui è archiviato Dataset. Ad esempio, us-central1.

  • DATASET: set di dati BigQuery. Deve essere nel formato dataset.table. Imposta su None se fornisci una query.

  • PARALLELISM: un numero intero che influenza il numero di attività di lettura creati in parallelo. Potrebbero essere stati creati meno stream di lettura rispetto a quelli richiesti.

  • QUERY: una stringa contenente una query SQL da leggere dal database BigQuery. Imposta su None se non sono richieste query.

Trasformare i dati

Aggiorna ed elimina righe e colonne dalle tabelle BigQuery utilizzando pyarrow o pandas. Se vuoi utilizzare le trasformazioni pandas, ti consigliamo di mantenere il tipo di input come pyarrow e di convertirlo in pandas all'interno della funzione definita dall'utente, in modo da poter rilevare eventuali errori di tipo di conversione pandas all'interno della funzione definita dall'utente. La trasformazione deve essere eseguita in una task Ray.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Scrivere dati in BigQuery

Inserisci i dati nel tuo set di dati BigQuery. La scrittura deve essere eseguita in un Ray Task.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Dove:

  • DATASET: set di dati BigQuery. Deve essere nel formato dataset.table.

Passaggi successivi