Lorsque vous exécutez une application Ray sur Vertex AI, vous pouvez utiliser BigQuery comme base de données cloud. Cette section explique comment lire et écrire dans une base de données BigQuery à partir de votre cluster Ray sur Vertex AI. Les étapes de cette section supposent que vous utilisez le SDK Vertex AI pour Python.
Si vous souhaitez lire les données d'un ensemble de données BigQuery, vous devez créer un ensemble de données BigQuery ou utiliser un ensemble de données existant.
Importer et initialiser le client Ray sur Vertex AI
Si vous êtes déjà connecté à votre cluster Ray sur Vertex AI, redémarrez votre kernel et exécutez le code suivant. La variable runtime_env
est nécessaire au moment de la connexion pour exécuter les commandes BigQuery.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.9.3"] } ray.init(address=address, runtime_env=runtime_env)
Lire des données de BigQuery
Lisez les données depuis votre ensemble de données BigQuery La lecture doit être effectuée dans une tâche Ray.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Où :
PROJECT_ID : ID de projet Google Cloud. Vous pouvez trouver l'ID du projet sur la page Accueil de la console Google Cloud.
LOCATION : emplacement de stockage de
Dataset
. Exemple :us-central1
.DATASET : ensemble de données BigQuery Doit être au format
dataset.table
. Défini surNone
si vous fournissez une requête.PARALLELISM : entier qui influe sur le nombre de tâches de lecture créées en parallèle. Il se peut que le nombre de flux de lecture créés soit inférieur à celui demandé.
QUERY : chaîne contenant une requête SQL à lire à partir de la base de données BigQuery. Définie sur
None
si aucune requête n'est nécessaire.
Transformer les données
Mettez à jour et supprimez des lignes et des colonnes de vos tables BigQuery à l'aide de pyarrow
ou pandas
. Si vous souhaitez utiliser des transformations pandas
, il est recommandé de conserver le type d'entrée pyarrow et de le convertir en pandas
dans la fonction définie par l'utilisateur (UDF) afin de pouvoir détecter les erreurs de type de conversion pandas
dans l'UDF. La transformation doit être effectuée dans une tâche Ray.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Écrire des données dans BigQuery
Insérez des données dans votre ensemble de données BigQuery. L'écriture doit être effectuée dans une tâche Ray.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Où :
- DATASET : ensemble de données BigQuery Doit être au format
dataset.table
.