Usar o Ray na Vertex AI com o BigQuery

Ao executar um aplicativo do Ray na Vertex AI, é possível usar o BigQuery como banco de dados na nuvem. Esta seção aborda como ler e gravar em um banco de dados do BigQuery a partir do cluster do Ray na Vertex AI. As etapas desta seção pressupõem que você esteja usando o SDK da Vertex AI para Python.

Se você quiser ler informações de um conjunto de dados do BigQuery, crie um novo conjunto de dados do BigQuery ou use um atual.

Importar e inicializar o Ray no cliente da Vertex AI

Se você já estiver conectado ao cluster do Ray na Vertex AI, reinicie o kernel e execute o código a seguir. A variável runtime_env é necessária no momento da conexão, para executar comandos do BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.33.0"]
  }

ray.init(address=address, runtime_env=runtime_env)

Ler dados do BigQuery

Leia os dados do conjunto de dados do BigQuery. A leitura precisa ser feita em uma tarefa do Ray (em inglês).

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Em que:

  • PROJECT_ID: ID do projeto do Google Cloud Você encontra o ID do projeto na página de boas-vindas do console do Google Cloud.

  • LOCATION: o local onde o Dataset é armazenado. Por exemplo, us-central1.

  • DATASET: conjunto de dados do BigQuery. Precisa estar no formato dataset.table. Defina como None se estiver fornecendo uma consulta.

  • PARALLELISM: um número inteiro que influencia quantas tarefas de leitura são criadas em paralelo. Pode haver menos streams de leitura criados do que solicitado.

  • QUERY: uma string contendo uma consulta SQL para leitura do banco de dados do BigQuery. Defina como None se nenhuma consulta for necessária.

Transformar dados

Atualize e exclua linhas e colunas das tabelas do BigQuery usando pyarrow ou pandas. Se você quiser usar transformações pandas, recomendamos manter o tipo de entrada como pyarrow e converter em pandas na função definida pelo usuário (UDF). Assim, será possível capturar erros de tipo de conversão pandas no UDF. A transformação precisa ser feita em uma tarefa do Ray.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Gravar dados no BigQuery

Insira dados no seu conjunto de dados do BigQuery. A gravação precisa ser feita em uma tarefa do Ray (em inglês).

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Em que:

  • DATASET: conjunto de dados do BigQuery. Precisa estar no formato dataset.table.

A seguir