Use Ray on Vertex AI with BigQuery

When running a Ray application on Vertex AI, you can use BigQuery as your cloud database. This section covers how to read from and write to a BigQuery database from your Ray cluster on Vertex AI. The steps in this section assume you're using the Vertex AI SDK for Python.

If you want to read from a BigQuery dataset, you should create a new BigQuery dataset or use an existing dataset.

Import and initialize Ray on Vertex AI client

If you're already connected to your Ray cluster on Vertex AI, restart your kernel and run the following code. The runtime_env variable is necessary at connection time to run BigQuery commands.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.33.0"]
  }

ray.init(address=address, runtime_env=runtime_env)

Read data from BigQuery

Read data from your BigQuery dataset. The read must be done in a Ray Task.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Where:

  • PROJECT_ID: Google Cloud project ID. You can find the project ID in the Google Cloud console welcome page.

  • LOCATION: The location where the Dataset is stored. For example, us-central1.

  • DATASET: BigQuery dataset. Must be in the format dataset.table. Set to None if providing a query.

  • PARALLELISM: An integer that influences how many read tasks are created in parallel. There may be fewer read streams created than you requested.

  • QUERY: A string containing a SQL query to read from BigQuery database. Set to None if no query is required.

Transform data

Update and delete rows and columns from your BigQuery tables using pyarrow or pandas. If you want to use pandas transformations, it's recommended that you keep the input type as pyarrow and convert to pandas within the user-defined function (UDF) so you can catch any pandas conversion type errors within the UDF. The transformation must be done in a Ray Task.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Write data to BigQuery

Insert data to your BigQuery dataset. The write must be done in a Ray Task.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Where:

  • DATASET: BigQuery dataset. Must be in the format dataset.table.

What's next