Wenn Sie eine Ray-Anwendung in Vertex AI ausführen, können Sie BigQuery als Cloud-Datenbank verwenden. In diesem Abschnitt wird beschrieben, wie Sie über Ihren Ray-Cluster in Vertex AI aus einer BigQuery-Datenbank lesen und in diese schreiben. Bei den Schritten in diesem Abschnitt wird davon ausgegangen, dass Sie Vertex AI SDK für Python verwenden.
Wenn Sie aus einem BigQuery-Dataset lesen möchten, sollten Sie ein neues BigQuery-Dataset erstellen oder ein vorhandenes Dataset verwenden.
Ray on Vertex AI-Client importieren und initialisieren
Wenn Sie bereits mit Ihrem Ray on Vertex AI-Cluster verbunden sind, starten Sie den Kernel neu und führen Sie den folgenden Code aus. Die Variable runtime_env
ist beim Herstellen der Verbindung erforderlich, um BigQuery-Befehle auszuführen.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.9.3"] } ray.init(address=address, runtime_env=runtime_env)
Daten aus BigQuery lesen
Lesen Sie Daten aus Ihrem BigQuery-Dataset. Der Lesevorgang muss in einer Ray-Aufgabe durchgeführt werden.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Wobei:
PROJECT_ID: Google Cloud-Projekt-ID. Sie finden die Projekt-ID auf der Begrüßungsseite der Google Cloud Console.
LOCATION: Der Standort, an dem das
Dataset
gespeichert ist. Beispiel:us-central1
.DATASET: BigQuery-Dataset. Muss im Format
dataset.table
vorliegen. AufNone
gesetzt, wenn eine Abfrage bereitgestellt wird.PARALLELISM: Eine Ganzzahl, die beeinflusst, wie viele Leseaufgaben parallel erstellt werden. Es werden möglicherweise weniger Lesestreams erstellt, als Sie anfordern.
QUERY: Ein String mit einer SQL-Abfrage, die aus der BigQuery-Datenbank lesen soll. Auf
None
gesetzt, wenn keine Abfrage erforderlich ist.
Daten transformieren
Aktualisieren und löschen Sie Zeilen und Spalten aus Ihren BigQuery-Tabellen mit pyarrow
oder pandas
. Wenn Sie pandas
-Transformationen verwenden möchten, empfehlen wir, den Eingabetyp als pyarrow zu beizubehalten und innerhalb der benutzerdefinierten Funktion (UDF) in pandas
zu konvertieren, damit Sie pandas
-Konvertierungstypfehler innerhalb der UDF erkennen. Die Transformation muss in einer Ray-Aufgabe durchgeführt werden.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Daten in BigQuery schreiben
Fügen Sie Daten in Ihr BigQuery-Dataset ein. Der Schreibvorgang muss in einer Ray-Aufgabe durchgeführt werden.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Wobei:
- DATASET: BigQuery-Dataset. Muss im Format
dataset.table
vorliegen.