Saat menjalankan aplikasi Ray di Vertex AI, Anda dapat menggunakan BigQuery sebagai database cloud. Bagian ini membahas cara membaca dan menulis ke database BigQuery dari cluster Ray di Vertex AI. Langkah-langkah di bagian ini mengasumsikan bahwa Anda menggunakan Vertex AI SDK untuk Python.
Jika ingin membaca dari set data BigQuery, Anda harus membuat set data BigQuery baru atau menggunakan set data yang ada.
Mengimpor dan melakukan inisialisasi klien Ray di Vertex AI
Jika Anda telah terhubung ke cluster Ray di Vertex AI, mulai ulang
kernel dan jalankan kode berikut. Variabel runtime_env
diperlukan pada
waktu koneksi untuk menjalankan perintah BigQuery.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.9.3"] } ray.init(address=address, runtime_env=runtime_env)
Membaca data dari BigQuery
Membaca data dari set data BigQuery Anda. Pembacaan harus dilakukan di Ray Task.
aiplatform.init(project=project_id, location=location) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Dengan keterangan:
PROJECT_ID: ID project Google Cloud. Anda dapat menemukan project ID di halaman welcome Google Cloud Console.
DATASET: Set data BigQuery. Harus dalam format
dataset.table
. Tetapkan keNone
jika memberikan kueri.NUM_BLOCKS: Bilangan bulat yang memengaruhi jumlah tugas baca yang dibuat secara paralel. Mungkin terdapat lebih sedikit streaming baca yang dibuat daripada yang Anda minta.
SQL_QUERY: String yang berisi kueri SQL untuk dibaca dari database BigQuery. Tetapkan ke
None
jika tidak ada kueri yang diperlukan.
Mengubah data
Perbarui dan hapus baris serta kolom dari tabel BigQuery menggunakan pyarrow
atau pandas
. Jika ingin menggunakan transformasi pandas
, sebaiknya Anda mempertahankan jenis input sebagai pyarrow dan mengonversinya ke pandas
dalam fungsi yang ditentukan pengguna (UDF) sehingga Anda dapat menangkap error jenis konversi pandas
dalam UDF. Transformasi harus dilakukan di Ray Task.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Menulis data ke BigQuery
Menyisipkan data ke set data BigQuery. Penulisan harus dilakukan di Ray Task.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Dengan keterangan:
- DATASET: Set data BigQuery. Harus dalam format
dataset.table
.