BigQuery-Metastore mit Spark in BigQuery Studio verwenden
In diesem Dokument wird erläutert, wie Sie den BigQuery-Metastore mit Spark in BigQuery Studio verwenden.
Sie können Spark in BigQuery Studio verwenden, um eine Iceberg-Tabelle mit Apache Spark in BigQuery Studio zu erstellen. Nachdem Sie die Tabelle erstellt haben, können Sie die Daten aus Spark abfragen. Sie können dieselben Daten auch über die BigQuery-Konsole mit SQL abfragen.
Hinweise
- Über das folgende Registrierungsformular können Sie Zugriff auf Spark in BigQuery Studio anfordern.
- Aktivieren Sie die Abrechnung für Ihr Google Cloud -Projekt. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist
Aktivieren Sie die BigQuery- und Dataflow APIs.
Optional: Informationen zur Funktionsweise des BigQuery-Metaspeichers und zu den Vorteilen seiner Verwendung
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Verwenden von Spark-Notebooks in BigQuery Studio benötigen:
-
BigQuery Studio-Metastore-Tabellen in Spark erstellen:
BigQuery-Dateneditor (
roles/bigquery.dataEditor
) für das Projekt -
Erstellen Sie eine Spark-Sitzung aus den Metastore-Tabellen des Notebooks in Spark:
Dataproc-Worker (
roles/dataproc.serverlessEditor
) im Nutzerkonto
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
Verbindung mit einem Notebook herstellen
Im folgenden Beispiel wird gezeigt, wie Sie ein Spark-Notebook für die Interaktion mit Iceberg-Tabellen konfigurieren, die im BigQuery-Metastore gespeichert sind.
In diesem Beispiel richten Sie eine Spark-Sitzung ein, erstellen einen Namensbereich und eine Tabelle, fügen der Tabelle einige Daten hinzu und fragen die Daten dann in BigQuery Studio ab.
Erstellen Sie ein Spark-Notebook in BigQuery Studio.
Fügen Sie dem Apache Spark-Notebook die erforderlichen Apache Spark-Importe hinzu:
from dataproc_spark_session.session.spark.connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session from pyspark.sql import SparkSession
Definieren Sie einen Katalog, einen Namespace und ein Warehouse-Verzeichnis.
catalog = "CATALOG_NAME" namespace = "NAMESPACE_NAME" warehouse_dir = "gs://WAREHOUSE_DIRECTORY"
Ersetzen Sie Folgendes:
CATALOG_NAME
: Katalogname, der auf Ihre Spark-Tabelle verweist.NAMESPACE_NAME
: ein Namespace-Label, um auf Ihre Spark-Tabelle zu verweisen.WAREHOUSE_DIRECTORY
: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.
Initialisieren Sie eine Spark-Sitzung.
session.environment_config.execution_config.network_uri = NETWORK_NAME session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir spark = ( DataprocSparkSession.builder .appName("BigQuery metastore Iceberg table example") .dataprocConfig(session) .getOrCreate())
Ersetzen Sie Folgendes:
NETWORK_NAME
: der Name oder URI des Netzwerks, in dem der Spark-Code ausgeführt wird. Wenn nichts angegeben ist, wird das Netzwerkdefault
verwendet.PROJECT_ID
: die ID des Google Cloud -Projekts, in dem der Spark-Code ausgeführt wird.LOCATION
: den Speicherort, an dem der Spark-Job ausgeführt werden soll.
Erstellen Sie einen Katalog und einen Namespace.
spark.sql(f"USE `CATALOG_NAME`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;") spark.sql(f"USE `NAMESPACE_NAME`;")
Eine Tabelle erstellen
spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME ;")
Ersetzen Sie Folgendes:
TABLE_NAME
: Name der Iceberg-Tabelle.
DML-Anweisungen (Data Manipulation Language) über Spark ausführen
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");") df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
Eine DDL-Anweisung (Data Definition Language, Datendefinitionssprache) aus Spark ausführen
spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);") spark.sql("DESCRIBE TABLE_NAME ;")
Fügen Sie Daten in die Tabelle ein.
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"It's a sunny day!\", 83);")
Tabelle in Spark abfragen
df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
Stellen Sie eine Abfrage für die Tabelle aus der Google Cloud Console in einem neuen Datensatz.
SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100