When running a Ray application on Vertex AI, you can use BigQuery as your cloud database. This section covers how to read from and write to a BigQuery database from your Ray cluster on Vertex AI. The steps in this section assume you're using the Vertex AI SDK for Python.
To read from a BigQuery dataset, create a new BigQuery dataset or use an existing one.
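If you need to create the dataset up front, one option is the google-cloud-bigquery client library. The following is a minimal sketch, not part of the Ray workflow itself; the dataset ID `my_dataset` and the location value are placeholders you should replace with your own.

```python
# Sketch: create a BigQuery dataset with the google-cloud-bigquery client.
# "my_dataset" and the location are placeholders for your own values.
from google.cloud import bigquery

bq_client = bigquery.Client(project=PROJECT_ID)
dataset = bigquery.Dataset(f"{PROJECT_ID}.my_dataset")
dataset.location = "us-central1"  # Match the location of your Ray cluster.
bq_client.create_dataset(dataset, exists_ok=True)  # No-op if it already exists.
```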
Import and initialize Ray on Vertex AI client
If you're already connected to your Ray cluster on Vertex AI, restart your kernel and run the following code. The runtime_env variable is necessary at connection time to run BigQuery commands.
```python
import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip": ["google-cloud-aiplatform[ray]", "ray==2.33.0"]
}

ray.init(address=address, runtime_env=runtime_env)
```
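If you don't have the cluster resource name at hand, you can look it up from the SDK instead of pasting it in. The snippet below is a sketch only; the import path and the list_ray_clusters() / cluster_resource_name names are assumptions about the Vertex AI SDK and may differ across versions.

```python
# Sketch: look up the resource name of an existing Ray cluster on Vertex AI.
# The import path and attribute names below are assumptions and may vary
# with your google-cloud-aiplatform version.
from google.cloud.aiplatform import vertex_ray

clusters = vertex_ray.list_ray_clusters()
CLUSTER_RESOURCE_NAME = clusters[0].cluster_resource_name  # Pick your cluster.
```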
Read data from BigQuery
Read data from your BigQuery dataset. The read must be done in a Ray Task.
```python
aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray

    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query,
    )
    ds.materialize()
```
Where:
- PROJECT_ID: Google Cloud project ID. You can find the project ID on the Google Cloud console welcome page.
- LOCATION: The location where the Dataset is stored. For example, us-central1.
- DATASET: BigQuery dataset. Must be in the format dataset.table. Set to None if providing a query.
- PARALLELISM: An integer that influences how many read tasks are created in parallel. There may be fewer read streams created than you requested.
- QUERY: A string containing a SQL query to read from the BigQuery database. Set to None if no query is required.
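The snippet above only defines the read task; nothing runs until you submit it to the cluster. A minimal way to do that is shown below. The sketch assumes you change run_remotely to end with return ds.count() (or another small, serializable value) so the driver has something to inspect.

```python
# Submit the read task to the Ray cluster and block until it completes.
# Assumes run_remotely() ends with `return ds.count()` so only a small
# summary value (not the whole dataset) is shipped back to the driver.
row_count = ray.get(run_remotely.remote())
print(f"Read {row_count} rows from BigQuery")
```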
Transform data
Update and delete rows and columns in your BigQuery tables using pyarrow or pandas. If you want to apply pandas transformations, it's recommended that you keep the input type as pyarrow and convert to pandas within the user-defined function (UDF) so that you can catch any pandas conversion type errors within the UDF. The transformation must be done in a Ray Task.
```python
@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks.
    ds = ds.repartition(4)
    ds.materialize()
```
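As an illustration, the PANDAS_TRANSFORMATIONS_HERE placeholder could be replaced with ordinary pandas operations. The sketch below is hypothetical: the column names label and feature stand in for columns from your own table.

```python
# Hypothetical filter_batch with concrete pandas transformations.
# The columns "label" and "feature" are placeholders for your own schema.
def filter_batch(table: pa.Table) -> pa.Table:
    df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
    df = df.dropna(subset=["label"])  # Drop rows with a missing label.
    df = df[df["feature"] >= 0]       # Keep only non-negative feature values.
    return pa.Table.from_pandas(df)
```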
Write data to BigQuery
Insert data into your BigQuery dataset. The write must be done in a Ray Task.
```python
@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset = DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset,
    )
```
Where:
- DATASET: BigQuery dataset. Must be in the format dataset.table.
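Putting the steps together, a single Ray task can read, transform, and write in one pass. The sketch below reuses the placeholders from the earlier sections (DATASET, PARALLELISM, QUERY); the identity map_batches call marks where your own transformation would go.

```python
@ray.remote
def read_transform_write():
    import vertex_ray

    # Read from BigQuery by query (dataset=None because a query is provided).
    ds = vertex_ray.data.read_bigquery(
        dataset=None,
        parallelism=PARALLELISM,
        query=QUERY,
    )

    # Placeholder transformation; replace with your own map_batches UDF.
    ds = ds.map_batches(lambda table: table, batch_format="pyarrow")

    # Write the (possibly transformed) dataset to the target table.
    vertex_ray.data.write_bigquery(ds, dataset=DATASET)

# Run the end-to-end task on the cluster and wait for it to finish.
ray.get(read_transform_write.remote())
```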