将 Ray on Vertex AI 与 BigQuery 搭配使用

在 Vertex AI 上运行 Ray 应用时,您可以将 BigQuery 用作云数据库。本部分介绍如何从 Vertex AI 上的 Ray 集群对 BigQuery 数据库进行读取和写入操作。本部分中的步骤假定您使用的是 Vertex AI SDK for Python。

如果要从 BigQuery 数据集读取数据,您应该新建 BigQuery 数据集或使用现有数据集。

导入并初始化 Ray on Vertex AI 客户端

如果您已连接到 Vertex AI 上的 Ray 集群,请重启内核并运行以下代码。连接时必须采用 runtime_env 变量才能运行 BigQuery 命令。

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.33.0"]
  }

ray.init(address=address, runtime_env=runtime_env)

从 BigQuery 中读取数据

从 BigQuery 数据集读取数据。读取必须在 Ray 任务中进行。

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

其中:

  • PROJECT_ID:Google Cloud 项目 ID。 您可以在 Google Cloud 控制台欢迎页面中找到项目 ID。

  • LOCATIONDataset 的存储位置。 例如 us-central1

  • DATASET:BigQuery 数据集。必须采用 dataset.table 格式。如果提供查询,则设置为 None

  • PARALLELISM:一个整数,将影响并行创建的读取任务数。创建的读取流数可能少于您请求的数量。

  • QUERY:包含要从 BigQuery 数据库读取的 SQL 查询的字符串。如果不需要查询,则设置为 None

转换数据

使用 pyarrowpandas 更新和删除 BigQuery 表中的行和列。如果您想使用 pandas 转换,建议将输入类型保留为 pyrow 类型,并转换为用户定义的函数 (UDF) 中的 pandas 类型,以便您可以捕获 UDF 中的任何 pandas 转换类型错误。转换必须在 Ray 任务中进行。

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

将数据写入 BigQuery

将数据插入 BigQuery 数据集。 写入必须在 Ray 任务中进行。

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

其中:

  • DATASET:BigQuery 数据集。必须采用 dataset.table 格式。

后续步骤