Ao executar um aplicativo do Ray na Vertex AI, é possível usar o BigQuery como banco de dados na nuvem. Esta seção aborda como ler e gravar em um banco de dados do BigQuery a partir do cluster do Ray na Vertex AI. As etapas desta seção pressupõem que você esteja usando o SDK da Vertex AI para Python.
Se você quiser ler informações de um conjunto de dados do BigQuery, crie um novo conjunto de dados do BigQuery ou use um atual.
Importar e inicializar o Ray no cliente da Vertex AI
Se você já estiver conectado ao cluster do Ray na Vertex AI, reinicie o
kernel e execute o código a seguir. A variável runtime_env
é necessária no momento da conexão, para executar comandos do BigQuery.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.33.0"] } ray.init(address=address, runtime_env=runtime_env)
Ler dados do BigQuery
Leia os dados do conjunto de dados do BigQuery. A leitura precisa ser feita em uma tarefa do Ray (em inglês).
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Em que:
PROJECT_ID: ID do projeto do Google Cloud Você encontra o ID do projeto na página de boas-vindas do console do Google Cloud.
LOCATION: o local onde o
Dataset
é armazenado. Por exemplo,us-central1
.DATASET: conjunto de dados do BigQuery. Precisa estar no formato
dataset.table
. Defina comoNone
se estiver fornecendo uma consulta.PARALLELISM: um número inteiro que influencia quantas tarefas de leitura são criadas em paralelo. Pode haver menos streams de leitura criados do que solicitado.
QUERY: uma string contendo uma consulta SQL para leitura do banco de dados do BigQuery. Defina como
None
se nenhuma consulta for necessária.
Transformar dados
Atualize e exclua linhas e colunas das tabelas do BigQuery usando pyarrow
ou pandas
. Se você quiser usar transformações pandas
, recomendamos manter o tipo de entrada como pyarrow e converter em pandas
na função definida pelo usuário (UDF). Assim, será possível capturar erros de tipo de conversão pandas
no UDF. A transformação precisa ser feita em uma tarefa do Ray.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Gravar dados no BigQuery
Insira dados no seu conjunto de dados do BigQuery. A gravação precisa ser feita em uma tarefa do Ray (em inglês).
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Em que:
- DATASET: conjunto de dados do BigQuery. Precisa estar no formato
dataset.table
.