# Use Ray on Vertex AI with BigQuery
When you run a Ray application on Vertex AI, use [BigQuery](/bigquery/docs/introduction) as your cloud database. This section covers how to read from and write to a BigQuery database from your Ray cluster on Vertex AI.
The steps in this section assume that you use the Vertex AI SDK for Python.

To read from a BigQuery dataset, [create a new BigQuery dataset](/bigquery/docs/datasets) or use an existing dataset.

Import and initialize Ray on Vertex AI client
---------------------------------------------
If you're connected to your Ray cluster on Vertex AI, restart your kernel and run the following code. The `runtime_env` variable is necessary at connection time to run BigQuery commands.
```python
import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip": ["google-cloud-aiplatform[ray]", "ray==2.47.1"]
}

ray.init(address=address, runtime_env=runtime_env)
```
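If the cluster was created in an earlier session, you can pass its existing resource name instead of creating a new cluster. A minimal sketch, assuming the standard persistent-resource name format; the project number, region, and cluster ID are hypothetical placeholders:

```python
# Assumption: a Ray on Vertex AI cluster is addressed by its persistent-resource
# name. Substitute your own project number, region, and cluster ID.
CLUSTER_RESOURCE_NAME = "projects/123456789/locations/us-central1/persistentResources/my-ray-cluster"
```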
Read data from BigQuery
-----------------------
Read data from your BigQuery dataset. A [Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the read operation.

**Note:** The maximum query response size is 10 GB.
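```python
aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()
```

Where: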
- **PROJECT_ID**: Your Google Cloud project ID. Find the project ID on the Google Cloud console [welcome](https://console.cloud.google.com/welcome) page.
- **LOCATION**: The location where the `Dataset` is stored. For example, `us-central1`.
- **DATASET**: The BigQuery dataset. It must be in the format `dataset.table`. Set to `None` if you provide a query.
- **PARALLELISM**: An integer that influences how many read tasks are created in parallel. Fewer read streams might be created than you requested.
- **QUERY**: A string containing a SQL query to read from the BigQuery database. Set to `None` if no query is required.
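For example, a query-based read might look like the following sketch. The query string and parallelism value are hypothetical placeholders; note that `dataset` is set to `None` because a query is provided:

```python
@ray.remote
def read_with_query():
    import vertex_ray

    # dataset=None because a query is supplied (see the parameter notes above).
    ds = vertex_ray.data.read_bigquery(
        dataset=None,
        parallelism=4,
        query="SELECT * FROM `my_dataset.my_table` LIMIT 1000",
    )
    ds.materialize()
    return ds.count()
```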
Transform data
--------------
Update and delete rows and columns in your BigQuery tables using `pyarrow` or `pandas`. If you want to use `pandas` transformations, keep the input type as pyarrow and convert to `pandas` within the user-defined function (UDF) so that you can catch any `pandas` conversion type errors within the UDF. A [Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the transformation.
```python
@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()
```
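As a concrete illustration, the `# PANDAS_TRANSFORMATIONS_HERE` placeholder above could be replaced with ordinary `pandas` operations. A minimal sketch, assuming hypothetical columns named `id` and `score`:

```python
import pandas as pd
import pyarrow as pa

# Hypothetical pandas transformations inside the UDF: the column names
# "id" and "score" are placeholders for columns in your own table.
def filter_batch(table: pa.Table) -> pa.Table:
    df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
    df = df.dropna(subset=["id"])             # delete rows missing an id
    df = df[df["score"] > 0]                  # keep only rows with a positive score
    df["score"] = df["score"].astype(float)   # update a column's type
    return pa.Table.from_pandas(df)
```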
Write data to BigQuery
----------------------

Insert data into your BigQuery dataset. A [Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the write.
```python
@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset = DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )
```
Where:

- **DATASET**: BigQuery dataset. The dataset must be in the format `dataset.table`.
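Putting the steps together, a single Ray Task can read, transform, and write in one pass. A minimal end-to-end sketch; the table names here are hypothetical placeholders:

```python
@ray.remote
def pipeline():
    import vertex_ray

    # Read from a source table (hypothetical name), shuffle, and repartition
    # to control the number of write blocks, then write the result back.
    ds = vertex_ray.data.read_bigquery(
        dataset="my_dataset.source_table",
        parallelism=4,
        query=None,
    )
    ds = ds.random_shuffle()
    ds = ds.repartition(4)
    vertex_ray.data.write_bigquery(ds, dataset="my_dataset.destination_table")

ray.get(pipeline.remote())
```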
[[["Facile da capire","easyToUnderstand","thumb-up"],["Il problema è stato risolto","solvedMyProblem","thumb-up"],["Altra","otherUp","thumb-up"]],[["Difficile da capire","hardToUnderstand","thumb-down"],["Informazioni o codice di esempio errati","incorrectInformationOrSampleCode","thumb-down"],["Mancano le informazioni o gli esempi di cui ho bisogno","missingTheInformationSamplesINeed","thumb-down"],["Problema di traduzione","translationIssue","thumb-down"],["Altra","otherDown","thumb-down"]],["Ultimo aggiornamento 2025-09-04 UTC."],[],[],null,["# Use Ray on Vertex AI with BigQuery\n\nWhen you run a Ray application on Vertex AI, use\n[BigQuery](/bigquery/docs/introduction) as your cloud database. This\nsection covers how to read from and write to a BigQuery database from\nyour on Vertex AI.\nThe steps in this section assume that you use\nthe Vertex AI SDK for Python.\n\nTo read from a BigQuery dataset, [create a new\nBigQuery dataset](/bigquery/docs/datasets) or use an existing dataset.\n\nImport and initialize Ray on Vertex AI client\n---------------------------------------------\n\nIf you're connected to your Ray cluster on Vertex AI, restart your\nkernel and run the following code. The `runtime_env` variable is necessary at\nconnection time to run BigQuery commands. \n\n```python\nimport ray\nfrom google.cloud import aiplatform\n\n# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.\naddress = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)\n\nruntime_env = {\n \"pip\":\n [\"google-cloud-aiplatform[ray]\",\"ray==2.47.1\"]\n }\n\nray.init(address=address, runtime_env=runtime_env)\n```\n\nRead data from BigQuery\n-----------------------\n\nRead data from your BigQuery dataset. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform\nthe read operation.\n**Note:** The maximum query response size is 10 GB. \n\n```python\naiplatform.init(project=PROJECT_ID, location=LOCATION)\n\n@ray.remote\ndef run_remotely():\n import vertex_ray\n dataset = DATASET\n parallelism = PARALLELISM\n query = QUERY\n\n ds = vertex_ray.data.read_bigquery(\n dataset=dataset,\n parallelism=parallelism,\n query=query\n )\n ds.materialize()\n```\n\nWhere:\n\n- **\u003cvar translate=\"no\"\u003ePROJECT_ID\u003c/var\u003e** : Google Cloud project ID. Find the project ID\n in the Google Cloud console [welcome](https://console.cloud.google.com/welcome)\n page.\n\n- **\u003cvar translate=\"no\"\u003eLOCATION\u003c/var\u003e** : The location where the `Dataset` is stored. For example,\n `us-central1`.\n\n- **\u003cvar translate=\"no\"\u003eDATASET\u003c/var\u003e** : BigQuery dataset. It must be in the format `dataset.table`.\n Set to `None` if you provide a query.\n\n- **\u003cvar translate=\"no\"\u003ePARALLELISM\u003c/var\u003e**: An integer that influences how many read tasks are\n created in parallel. There may be fewer read streams created than you\n requested.\n\n- **\u003cvar translate=\"no\"\u003eQUERY\u003c/var\u003e** : A string containing a SQL query to read from BigQuery database. Set to `None` if no query is required.\n\nTransform data\n--------------\n\nUpdate and delete rows and columns from your BigQuery tables using\n`pyarrow` or `pandas`. If you want to use `pandas` transformations,\nkeep the input type as pyarrow and convert to `pandas`\nwithin the user-defined function (UDF) so you can catch any `pandas` conversion\ntype errors within the UDF. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the transformation. 
\n\n```python\n@ray.remote\ndef run_remotely():\n # BigQuery Read first\n import pandas as pd\n import pyarrow as pa\n\n def filter_batch(table: pa.Table) -\u003e pa.Table:\n df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)\n # PANDAS_TRANSFORMATIONS_HERE\n return pa.Table.from_pandas(df)\n\n ds = ds.map_batches(filter_batch, batch_format=\"pyarrow\").random_shuffle()\n ds.materialize()\n\n # You can repartition before writing to determine the number of write blocks\n ds = ds.repartition(4)\n ds.materialize()\n```\n\nWrite data to BigQuery\n----------------------\n\nInsert data to your BigQuery dataset. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the write. \n\n```python\n@ray.remote\ndef run_remotely():\n # BigQuery Read and optional data transformation first\n dataset=DATASET\n vertex_ray.data.write_bigquery(\n ds,\n dataset=dataset\n )\n```\n\nWhere:\n\n- **\u003cvar translate=\"no\"\u003eDATASET\u003c/var\u003e** : BigQuery dataset. The dataset must be in the format `dataset.table`.\n\nWhat's next\n-----------\n\n- [Deploy a model on Vertex AI\n and get predictions](/vertex-ai/docs/open-source/ray-on-vertex-ai/deploy-predict)\n\n- [View logs for your Ray cluster on Vertex AI](/vertex-ai/docs/open-source/ray-on-vertex-ai/view-logs)\n\n- [Delete a Ray cluster](/vertex-ai/docs/open-source/ray-on-vertex-ai/delete-cluster)"]]