Utiliser Ray sur Vertex AI avec BigQuery

Lorsque vous exécutez une application Ray sur Vertex AI, vous pouvez utiliser BigQuery comme base de données cloud. Cette section explique comment lire et écrire dans une base de données BigQuery à partir de votre cluster Ray sur Vertex AI. Les étapes de cette section supposent que vous utilisez le SDK Vertex AI pour Python.

Si vous souhaitez lire les données d'un ensemble de données BigQuery, vous devez créer un ensemble de données BigQuery ou utiliser un ensemble de données existant.

Importer et initialiser le client Ray sur Vertex AI

Si vous êtes déjà connecté à votre cluster Ray sur Vertex AI, redémarrez votre kernel et exécutez le code suivant. La variable runtime_env est nécessaire au moment de la connexion pour exécuter les commandes BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.33.0"]
  }

ray.init(address=address, runtime_env=runtime_env)

Lire des données de BigQuery

Lisez les données depuis votre ensemble de données BigQuery La lecture doit être effectuée dans une tâche Ray.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Où :

  • PROJECT_ID : ID de projet Google Cloud. Vous pouvez trouver l'ID du projet sur la page Accueil de la console Google Cloud.

  • LOCATION : emplacement de stockage de Dataset. Exemple : us-central1.

  • DATASET : ensemble de données BigQuery Doit être au format dataset.table. Défini sur None si vous fournissez une requête.

  • PARALLELISM : entier qui influe sur le nombre de tâches de lecture créées en parallèle. Il se peut que le nombre de flux de lecture créés soit inférieur à celui demandé.

  • QUERY : chaîne contenant une requête SQL à lire à partir de la base de données BigQuery. Définie sur None si aucune requête n'est nécessaire.

Transformer les données

Mettez à jour et supprimez des lignes et des colonnes de vos tables BigQuery à l'aide de pyarrow ou pandas. Si vous souhaitez utiliser des transformations pandas, il est recommandé de conserver le type d'entrée pyarrow et de le convertir en pandas dans la fonction définie par l'utilisateur (UDF) afin de pouvoir détecter les erreurs de type de conversion pandas dans l'UDF. La transformation doit être effectuée dans une tâche Ray.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Écrire des données dans BigQuery

Insérez des données dans votre ensemble de données BigQuery. L'écriture doit être effectuée dans une tâche Ray.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Où :

  • DATASET : ensemble de données BigQuery Doit être au format dataset.table.

Étapes suivantes