Utiliser le métastore BigQuery avec Spark dans BigQuery Studio

.

Ce document explique comment utiliser le métastore BigQuery avec Spark dans BigQuery Studio.

Vous pouvez utiliser Spark dans BigQuery Studio pour créer une table Iceberg avec Apache Spark dans BigQuery Studio. Une fois la table créée, vous pouvez interroger les données depuis Spark. Vous pouvez également interroger les mêmes données à partir de la console BigQuery à l'aide de SQL.

Avant de commencer

  1. Demandez l'accès à Spark dans BigQuery Studio via le formulaire d'inscription suivant.
  2. Activez la facturation pour votre projet Google Cloud . Découvrez comment vérifier si la facturation est activée sur un projet.
  3. Activez les API BigQuery et Dataflow.

    Activer les API

  4. Facultatif: Découvrez le fonctionnement du métastore BigQuery et pourquoi vous devriez l'utiliser.

Rôles requis

Pour obtenir les autorisations nécessaires pour utiliser des notebooks Spark dans BigQuery Studio, demandez à votre administrateur de vous accorder les rôles IAM suivants:

  • Créez des tables de métastore BigQuery Studio dans Spark : Éditeur de données BigQuery (roles/bigquery.dataEditor) sur le projet
  • Créez une session Spark à partir des tables Metastore du notebook dans Spark : Dataproc Worker (roles/dataproc.serverlessEditor) sur le compte utilisateur.

Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Se connecter à un notebook

L'exemple suivant montre comment configurer un notebook Spark pour interagir avec les tables Iceberg stockées dans le métastore BigQuery.

Dans cet exemple, vous allez configurer une session Spark, créer un espace de noms et une table, ajouter des données à la table, puis interroger les données dans BigQuery Studio.

  1. Créez un notebook Spark dans BigQuery Studio.

  2. Dans le notebook Apache Spark, incluez les importations Apache Spark nécessaires:

    from dataproc_spark_session.session.spark.connect import DataprocSparkSession
    from google.cloud.dataproc_v1 import Session
    from pyspark.sql import SparkSession
  3. Définissez un catalogue, un espace de noms et un répertoire d'entrepôt.

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    Remplacez les éléments suivants :

    • CATALOG_NAME: nom de catalogue pour faire référence à votre table Spark.
    • NAMESPACE_NAME: libellé d'espace de noms pour faire référence à votre table Spark.
    • WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
  4. Initialisez une session Spark.

    session.environment_config.execution_config.network_uri = NETWORK_NAME
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir
    
    spark = (
     DataprocSparkSession.builder
     .appName("BigQuery metastore Iceberg table example")
     .dataprocConfig(session)
     .getOrCreate())

    Remplacez les éléments suivants :

    • NETWORK_NAME: nom ou URI du réseau exécutant le code Spark. Si aucune valeur n'est spécifiée, le réseau default est utilisé.
    • PROJECT_ID: ID du projet Google Cloud qui exécute le code Spark.
    • LOCATION: emplacement dans lequel exécuter la tâche Spark.
  5. Créez un catalogue et un espace de noms.

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. Créer une table.

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    Remplacez les éléments suivants :

    • TABLE_NAME: nom de votre table Iceberg.
  7. Exécutez un langage de manipulation de données (LMD) à partir de Spark.

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. Exécutez un langage de définition de données (LDD) à partir de Spark.

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. Insérer des données dans une table.

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. Interrogez la table à partir de Spark.

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. Interrogez le tableau à partir de la console Google Cloud dans un nouvel ensemble de données.

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100

Étape suivante