Vertex AI で Ray アプリケーションを実行する場合は、クラウド データベースとして BigQuery を使用できます。このセクションでは、Vertex AI 上の Ray クラスタから BigQuery データベースに対して読み取り / 書き込みを行う方法について説明します。このセクションの手順では、Vertex AI SDK for Python を使用していることを前提としています。
BigQuery データセットから読み取る場合は、新しい BigQuery データセットを作成するか、既存のデータセットを使用する必要があります。
Ray on Vertex AI クライアントをインポートして初期化する
Vertex AI 上の Ray クラスタにすでに接続している場合は、カーネルを再起動して次のコードを実行します。runtime_env
変数は、接続時に BigQuery コマンドを実行するために必要です。
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.33.0"] } ray.init(address=address, runtime_env=runtime_env)
BigQuery からデータを読み取る
BigQuery データセットからデータを読み取ります。読み取りは Ray タスクで行う必要があります。
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
各要素の意味は次のとおりです。
PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、Google Cloud コンソールの [ようこそ] ページで確認できます。
LOCATION:
Dataset
が保存されているロケーション。例:us-central1
DATASET: BigQuery データセット。
dataset.table
の形式で指定してください。クエリを指定する場合はNone
に設定します。PARALLELISM: 並行して作成される読み取りタスクの数に影響を与える整数。作成された読み取りストリームがリクエストの数よりも少ない場合があります。
QUERY: BigQuery データベースから読み取る SQL クエリを含む文字列。クエリが不要な場合は、
None
に設定します。
データを変換する
pyarrow
または pandas
を使用して、BigQuery テーブルの行と列の更新および削除を行います。pandas
変換を使用する場合は、入力型を pyarrow として維持し、ユーザー定義関数(UDF)内で pandas
に変換することをおすすめします。これにより、UDF 内で pandas
変換タイプエラーを検出できます。変換は Ray タスクで行う必要があります。
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
データを BigQuery に書き込む
BigQuery データセットにデータを挿入します。書き込みは Ray タスクで行う必要があります。
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
各要素の意味は次のとおりです。
- DATASET: BigQuery データセット。
dataset.table
の形式で指定してください。