搭配使用 BigQuery 和 Ray on Vertex AI

在 Vertex AI 上執行 Ray 應用程式時,請使用 BigQuery 做為雲端資料庫。本節說明如何從 Vertex AI 上的 Ray 叢集讀取及寫入 BigQuery 資料庫。本節中的步驟假設您使用 Vertex AI SDK for Python。

如要從 BigQuery 資料集讀取資料,請建立新的 BigQuery 資料集,或使用現有資料集。

匯入並初始化 Ray on Vertex AI 用戶端

如果您已連線至 Vertex AI 的 Ray 叢集,請重新啟動核心並執行下列程式碼。連線時必須使用 runtime_env 變數,才能執行 BigQuery 指令。

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.47.1"]
  }

ray.init(address=address, runtime_env=runtime_env)

從 BigQuery 讀取資料

從 BigQuery 資料集讀取資料。Ray 工作必須執行讀取作業。

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

其中:

  • PROJECT_ID: Google Cloud 專案 ID。在 Google Cloud 控制台歡迎頁面中找到專案 ID。

  • LOCATION:儲存 Dataset 的位置。例如:us-central1

  • DATASET:BigQuery 資料集。格式必須為 dataset.table。如果您提供查詢,請設為 None

  • PARALLELISM:整數,可影響平行建立的讀取工作數量。建立的讀取串流數量可能少於要求數量。

  • QUERY:包含從 BigQuery 資料庫讀取資料的 SQL 查詢字串。如果不需要查詢,請設為 None

轉換資料

使用 pyarrowpandas 更新及刪除 BigQuery 資料表中的資料列和資料欄。如要使用 pandas 轉換,請將輸入類型保留為 pyarrow,並在使用者定義函式 (UDF) 中轉換為 pandas,這樣就能在 UDF 中找出任何 pandas 轉換類型錯誤。Ray 工作必須執行轉換。

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

將資料寫入 BigQuery

將資料插入 BigQuery 資料集。Ray Task 必須執行寫入作業。

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

其中:

  • DATASET:BigQuery 資料集。資料集必須採用 dataset.table 格式。

後續步驟