Vertex AI에서 Ray 애플리케이션을 실행할 때 BigQuery를 클라우드 데이터베이스로 사용할 수 있습니다. 이 섹션에서는 Vertex AI의 Ray 클러스터에서 BigQuery 데이터베이스를 읽고 쓰는 방법을 설명합니다.
이 섹션의 단계에서는 Vertex AI SDK for Python을 사용한다고 가정합니다.
Vertex AI의 Ray 클러스터에 이미 연결되어 있으면 커널을 다시 시작하고 다음 코드를 실행합니다. runtime_env 변수는 연결 시 BigQuery 명령어를 실행하기 위해 필요합니다.
importrayfromgoogle.cloudimportaiplatform# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.address='vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)runtime_env={"pip":["google-cloud-aiplatform[ray]","ray==2.42.0"]}ray.init(address=address,runtime_env=runtime_env)
BigQuery에서 데이터 읽기
BigQuery 데이터 세트에서 데이터를 읽습니다. 읽기는 Ray Task에서 수행해야 합니다.
PARALLELISM: 동시에 생성되는 읽기 태스크 수에 영향을 미치는 정수. 읽기 스트림이 요청한 것보다 적게 생성될 수 있습니다.
QUERY: BigQuery 데이터베이스에서 읽을 SQL 쿼리가 포함된 문자열. 쿼리가 필요하지 않으면 None으로 설정합니다.
데이터 변환
pyarrow 또는 pandas를 사용하여 BigQuery 테이블에서 행과 열을 업데이트하고 삭제합니다. pandas 변환을 사용하려면 입력 유형을 pyarrow로 유지하고 사용자 정의 함수(UDF) 내에서 pandas 변환 유형 오류를 포착할 수 있도록 UDF 내에서 pandas로 변환하는 것이 좋습니다. 변환은 Ray Task에서 수행해야 합니다.
@ray.remotedefrun_remotely():# BigQuery Read firstimportpandasaspdimportpyarrowaspadeffilter_batch(table:pa.Table)->pa.Table:df=table.to_pandas(types_mapper={pa.int64():pd.Int64Dtype()}.get)# PANDAS_TRANSFORMATIONS_HEREreturnpa.Table.from_pandas(df)ds=ds.map_batches(filter_batch,batch_format="pyarrow").random_shuffle()ds.materialize()# You can repartition before writing to determine the number of write blocksds=ds.repartition(4)ds.materialize()
BigQuery에 데이터 쓰기
BigQuery 데이터 세트에 데이터를 삽입합니다. 쓰기는 Ray Task에서 수행해야 합니다.
@ray.remotedefrun_remotely():# BigQuery Read and optional data transformation firstdataset=DATASETvertex_ray.data.write_bigquery(ds,dataset=dataset)
각 항목의 의미는 다음과 같습니다.
DATASET: BigQuery 데이터 세트. dataset.table 형식이어야 합니다.
[[["이해하기 쉬움","easyToUnderstand","thumb-up"],["문제가 해결됨","solvedMyProblem","thumb-up"],["기타","otherUp","thumb-up"]],[["이해하기 어려움","hardToUnderstand","thumb-down"],["잘못된 정보 또는 샘플 코드","incorrectInformationOrSampleCode","thumb-down"],["필요한 정보/샘플이 없음","missingTheInformationSamplesINeed","thumb-down"],["번역 문제","translationIssue","thumb-down"],["기타","otherDown","thumb-down"]],["최종 업데이트: 2025-09-04(UTC)"],[],[],null,["# Use Ray on Vertex AI with BigQuery\n\nWhen you run a Ray application on Vertex AI, use\n[BigQuery](/bigquery/docs/introduction) as your cloud database. This\nsection covers how to read from and write to a BigQuery database from\nyour on Vertex AI.\nThe steps in this section assume that you use\nthe Vertex AI SDK for Python.\n\nTo read from a BigQuery dataset, [create a new\nBigQuery dataset](/bigquery/docs/datasets) or use an existing dataset.\n\nImport and initialize Ray on Vertex AI client\n---------------------------------------------\n\nIf you're connected to your Ray cluster on Vertex AI, restart your\nkernel and run the following code. The `runtime_env` variable is necessary at\nconnection time to run BigQuery commands. \n\n```python\nimport ray\nfrom google.cloud import aiplatform\n\n# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.\naddress = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)\n\nruntime_env = {\n \"pip\":\n [\"google-cloud-aiplatform[ray]\",\"ray==2.47.1\"]\n }\n\nray.init(address=address, runtime_env=runtime_env)\n```\n\nRead data from BigQuery\n-----------------------\n\nRead data from your BigQuery dataset. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform\nthe read operation.\n**Note:** The maximum query response size is 10 GB. \n\n```python\naiplatform.init(project=PROJECT_ID, location=LOCATION)\n\n@ray.remote\ndef run_remotely():\n import vertex_ray\n dataset = DATASET\n parallelism = PARALLELISM\n query = QUERY\n\n ds = vertex_ray.data.read_bigquery(\n dataset=dataset,\n parallelism=parallelism,\n query=query\n )\n ds.materialize()\n```\n\nWhere:\n\n- **\u003cvar translate=\"no\"\u003ePROJECT_ID\u003c/var\u003e** : Google Cloud project ID. Find the project ID\n in the Google Cloud console [welcome](https://console.cloud.google.com/welcome)\n page.\n\n- **\u003cvar translate=\"no\"\u003eLOCATION\u003c/var\u003e** : The location where the `Dataset` is stored. For example,\n `us-central1`.\n\n- **\u003cvar translate=\"no\"\u003eDATASET\u003c/var\u003e** : BigQuery dataset. It must be in the format `dataset.table`.\n Set to `None` if you provide a query.\n\n- **\u003cvar translate=\"no\"\u003ePARALLELISM\u003c/var\u003e**: An integer that influences how many read tasks are\n created in parallel. There may be fewer read streams created than you\n requested.\n\n- **\u003cvar translate=\"no\"\u003eQUERY\u003c/var\u003e** : A string containing a SQL query to read from BigQuery database. Set to `None` if no query is required.\n\nTransform data\n--------------\n\nUpdate and delete rows and columns from your BigQuery tables using\n`pyarrow` or `pandas`. If you want to use `pandas` transformations,\nkeep the input type as pyarrow and convert to `pandas`\nwithin the user-defined function (UDF) so you can catch any `pandas` conversion\ntype errors within the UDF. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the transformation. \n\n```python\n@ray.remote\ndef run_remotely():\n # BigQuery Read first\n import pandas as pd\n import pyarrow as pa\n\n def filter_batch(table: pa.Table) -\u003e pa.Table:\n df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)\n # PANDAS_TRANSFORMATIONS_HERE\n return pa.Table.from_pandas(df)\n\n ds = ds.map_batches(filter_batch, batch_format=\"pyarrow\").random_shuffle()\n ds.materialize()\n\n # You can repartition before writing to determine the number of write blocks\n ds = ds.repartition(4)\n ds.materialize()\n```\n\nWrite data to BigQuery\n----------------------\n\nInsert data to your BigQuery dataset. A\n[Ray Task](https://docs.ray.io/en/latest/ray-core/tasks.html) must perform the write. \n\n```python\n@ray.remote\ndef run_remotely():\n # BigQuery Read and optional data transformation first\n dataset=DATASET\n vertex_ray.data.write_bigquery(\n ds,\n dataset=dataset\n )\n```\n\nWhere:\n\n- **\u003cvar translate=\"no\"\u003eDATASET\u003c/var\u003e** : BigQuery dataset. The dataset must be in the format `dataset.table`.\n\nWhat's next\n-----------\n\n- [Deploy a model on Vertex AI\n and get predictions](/vertex-ai/docs/open-source/ray-on-vertex-ai/deploy-predict)\n\n- [View logs for your Ray cluster on Vertex AI](/vertex-ai/docs/open-source/ray-on-vertex-ai/view-logs)\n\n- [Delete a Ray cluster](/vertex-ai/docs/open-source/ray-on-vertex-ai/delete-cluster)"]]