Utiliser le métastore BigQuery avec Spark dans BigQuery Studio
.Ce document explique comment utiliser le métastore BigQuery avec Spark dans BigQuery Studio.
Vous pouvez utiliser Spark dans BigQuery Studio pour créer une table Iceberg avec Apache Spark dans BigQuery Studio. Une fois la table créée, vous pouvez interroger les données à partir de Spark. Vous pouvez également interroger les mêmes données à partir de la console BigQuery à l'aide de SQL.
Avant de commencer
- Demandez l'accès à Spark dans BigQuery Studio via le formulaire d'inscription suivant.
- Activer la facturation pour votre projet Google Cloud. Découvrez comment vérifier si la facturation est activée sur un projet.
Activez les API BigQuery et Dataflow.
Facultatif: Découvrez le fonctionnement du métastore BigQuery et pourquoi vous devriez l'utiliser.
Rôles requis
Pour obtenir les autorisations nécessaires pour utiliser les notebooks Spark dans BigQuery Studio, demandez à votre administrateur de vous accorder les rôles IAM suivants:
-
Créer des tables Metastore BigQuery Studio dans Spark :
Éditeur de données BigQuery (
roles/bigquery.dataEditor
) sur le projet -
Créez une session Spark à partir des tables de métastore de notebook dans Spark :
Dataproc Worker (
roles/dataproc.serverlessEditor
) sur le compte utilisateur.
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.
Se connecter avec un notebook
L'exemple suivant montre comment configurer un notebook Spark pour interagir avec les tables Iceberg stockées dans le métastore BigQuery.
Dans cet exemple, vous configurez une session Spark, créez un espace de noms et une table, ajoutez des données à la table, puis interrogez les données dans BigQuery Studio.
Créez un notebook Spark dans BigQuery Studio.
Dans le notebook Apache Spark, incluez les importations Apache Spark nécessaires:
from dataproc_spark_session.session.spark.connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session from pyspark.sql import SparkSession
Définissez un catalogue, un espace de noms et un répertoire d'entrepôt.
catalog = "CATALOG_NAME" namespace = "NAMESPACE_NAME" warehouse_dir = "gs://WAREHOUSE_DIRECTORY"
Remplacez les éléments suivants :
CATALOG_NAME
: nom de catalogue permettant de référencer votre table Spark.NAMESPACE_NAME
: libellé d'espace de noms permettant de faire référence à votre table Spark.WAREHOUSE_DIRECTORY
: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
Initialisez une session Spark.
session.environment_config.execution_config.network_uri = NETWORK_NAME session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir spark = ( DataprocSparkSession.builder .appName("BigQuery metastore Iceberg table example") .dataprocConfig(session) .getOrCreate())
Remplacez les éléments suivants :
NETWORK_NAME
: nom ou URI du réseau exécutant le code Spark. S'il n'est pas spécifié, le réseaudefault
est utilisé.PROJECT_ID
: ID du projet Google Cloud qui exécute le code Spark.LOCATION
: emplacement dans lequel exécuter la tâche Spark.
Créez un catalogue et un espace de noms.
spark.sql(f"USE `CATALOG_NAME`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;") spark.sql(f"USE `NAMESPACE_NAME`;")
Créer une table.
spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME ;")
Remplacez les éléments suivants :
TABLE_NAME
: nom de votre table Iceberg.
Exécutez un langage de manipulation de données (LMD) à partir de Spark.
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");") df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
Exécuter un langage de définition de données (LDD) à partir de Spark
spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);") spark.sql("DESCRIBE TABLE_NAME ;")
Insérez des données dans la table.
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"It's a sunny day!\", 83);")
Interrogez la table à partir de Spark.
df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
Interrogez la table depuis la console Google Cloud dans un nouvel ensemble de données.
SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100
Étape suivante
- Configurez les fonctionnalités facultatives de BigQuery Metastore.
- Interrogez les tables du métastore dans BigQuery.