BigQuery と Ray on Vertex AI を使用する

Vertex AI で Ray アプリケーションを実行する場合は、クラウド データベースとして BigQuery を使用できます。このセクションでは、Vertex AI 上の Ray クラスタから BigQuery データベースに対して読み取り / 書き込みを行う方法について説明します。このセクションの手順では、Vertex AI SDK for Python を使用していることを前提としています。

BigQuery データセットから読み取る場合は、新しい BigQuery データセットを作成するか、既存のデータセットを使用する必要があります。

Ray on Vertex AI クライアントをインポートして初期化する

Vertex AI 上の Ray クラスタにすでに接続している場合は、カーネルを再起動して次のコードを実行します。runtime_env 変数は、接続時に BigQuery コマンドを実行するために必要です。

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.33.0"]
  }

ray.init(address=address, runtime_env=runtime_env)

BigQuery からデータを読み取る

BigQuery データセットからデータを読み取ります。読み取りは Ray タスクで行う必要があります。

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

各要素の意味は次のとおりです。

  • PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、Google Cloud コンソールの [ようこそ] ページで確認できます。

  • LOCATION: Dataset が保存されているロケーション。例: us-central1

  • DATASET: BigQuery データセット。dataset.table の形式で指定してください。クエリを指定する場合は None に設定します。

  • PARALLELISM: 並行して作成される読み取りタスクの数に影響を与える整数。作成された読み取りストリームがリクエストの数よりも少ない場合があります。

  • QUERY: BigQuery データベースから読み取る SQL クエリを含む文字列。クエリが不要な場合は、None に設定します。

データを変換する

pyarrow または pandas を使用して、BigQuery テーブルの行と列の更新および削除を行います。pandas 変換を使用する場合は、入力型を pyarrow として維持し、ユーザー定義関数(UDF)内で pandas に変換することをおすすめします。これにより、UDF 内で pandas 変換タイプエラーを検出できます。変換は Ray タスクで行う必要があります。

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

データを BigQuery に書き込む

BigQuery データセットにデータを挿入します。書き込みは Ray タスクで行う必要があります。

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

各要素の意味は次のとおりです。

  • DATASET: BigQuery データセット。dataset.table の形式で指定してください。

次のステップ