Menggunakan Ray di Vertex AI dengan BigQuery

Saat menjalankan aplikasi Ray di Vertex AI, Anda dapat menggunakan BigQuery sebagai database cloud. Bagian ini membahas cara membaca dan menulis ke database BigQuery dari cluster Ray di Vertex AI. Langkah-langkah di bagian ini mengasumsikan bahwa Anda menggunakan Vertex AI SDK untuk Python.

Jika ingin membaca dari set data BigQuery, Anda harus membuat set data BigQuery baru atau menggunakan set data yang ada.

Mengimpor dan melakukan inisialisasi klien Ray di Vertex AI

Jika Anda telah terhubung ke cluster Ray di Vertex AI, mulai ulang kernel dan jalankan kode berikut. Variabel runtime_env diperlukan pada waktu koneksi untuk menjalankan perintah BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.9.3"]
  }

ray.init(address=address, runtime_env=runtime_env)

Membaca data dari BigQuery

Membaca data dari set data BigQuery Anda. Pembacaan harus dilakukan di Ray Task.

aiplatform.init(project=project_id, location=location)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Dengan keterangan:

  • PROJECT_ID: ID project Google Cloud. Anda dapat menemukan project ID di halaman welcome Google Cloud Console.

  • DATASET: Set data BigQuery. Harus dalam format dataset.table. Tetapkan ke None jika memberikan kueri.

  • NUM_BLOCKS: Bilangan bulat yang memengaruhi jumlah tugas baca yang dibuat secara paralel. Mungkin terdapat lebih sedikit streaming baca yang dibuat daripada yang Anda minta.

  • SQL_QUERY: String yang berisi kueri SQL untuk dibaca dari database BigQuery. Tetapkan ke None jika tidak ada kueri yang diperlukan.

Mengubah data

Perbarui dan hapus baris serta kolom dari tabel BigQuery menggunakan pyarrow atau pandas. Jika ingin menggunakan transformasi pandas, sebaiknya Anda mempertahankan jenis input sebagai pyarrow dan mengonversinya ke pandas dalam fungsi yang ditentukan pengguna (UDF) sehingga Anda dapat menangkap error jenis konversi pandas dalam UDF. Transformasi harus dilakukan di Ray Task.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Menulis data ke BigQuery

Menyisipkan data ke set data BigQuery. Penulisan harus dilakukan di Ray Task.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Dengan keterangan:

  • DATASET: Set data BigQuery. Harus dalam format dataset.table.

Langkah selanjutnya