Quando esegui un'applicazione Ray su Vertex AI, puoi utilizzare BigQuery come database cloud. Questa sezione illustra come leggere e scrivere in un database BigQuery dal tuo cluster Ray su Vertex AI. I passaggi in questa sezione presuppongono che usi l'SDK Vertex AI per Python.
Se vuoi leggere da un set di dati BigQuery, devi creare un nuovo set di dati BigQuery o utilizzarne uno esistente.
Importa e inizializza Ray sul client Vertex AI
Se hai già una connessione al tuo cluster Ray su Vertex AI, riavvia il kernel ed esegui il codice seguente. La variabile runtime_env
è necessaria al momento
della connessione per eseguire i comandi BigQuery.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.9.3"] } ray.init(address=address, runtime_env=runtime_env)
Leggi i dati da BigQuery
Leggere i dati dal set di dati BigQuery. La lettura deve essere eseguita in un Ray Task.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Dove:
PROJECT_ID: ID progetto Google Cloud. Puoi trovare l'ID progetto nella pagina di benvenuto della console Google Cloud.
LOCATION: la posizione in cui è archiviato
Dataset
. Ad esempio,us-central1
.DATASET: set di dati BigQuery. Deve essere nel formato
dataset.table
. Imposta suNone
se fornisci una query.PARALLELISM: un numero intero che influenza il numero di attività di lettura create in parallelo. Potrebbero esserci meno flussi di lettura creati rispetto a quanto richiesto.
QUERY: una stringa contenente una query SQL da leggere dal database BigQuery. Imposta su
None
se non sono richieste query.
Trasformare i dati
Aggiorna ed elimina righe e colonne dalle tabelle BigQuery utilizzando pyarrow
o pandas
. Se vuoi utilizzare le trasformazioni pandas
, ti consigliamo di mantenere il tipo di input come pyarrow e di convertirlo in pandas
all'interno della funzione definita dall'utente dall'utente, in modo da poter rilevare eventuali errori di tipo di conversione pandas
all'interno della funzione definita dall'utente. La trasformazione deve essere eseguita in Ray Task.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Scrivi dati in BigQuery
Inserisci i dati nel tuo set di dati BigQuery. La scrittura deve essere eseguita in un Ray Task.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Dove:
- DATASET: set di dati BigQuery. Deve essere nel formato
dataset.table
.