Utiliser le métastore BigQuery avec des tables dans BigQuery

.

Ce document explique comment utiliser le métastore BigQuery avec les tables BigQuery et Spark.

Avec le métastore BigQuery, vous pouvez créer et utiliser des tables standards (intégrées), des tables BigQuery pour Apache Spark Iceberg et des tables externes BigLake à partir de BigQuery.

Avant de commencer

  1. Activez la facturation pour votre projet Google Cloud . Découvrez comment vérifier si la facturation est activée sur un projet.
  2. Activez les API BigQuery et Dataproc.

    Activer les API

  3. Facultatif: Découvrez le fonctionnement du métastore BigQuery et pourquoi vous devriez l'utiliser.

Rôles requis

Pour obtenir les autorisations nécessaires pour utiliser Spark et Dataproc avec le métastore BigQuery en tant que magasin de métadonnées, demandez à votre administrateur de vous accorder les rôles IAM suivants:

Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Connexion à une table

  1. Créez un ensemble de données dans la console Google Cloud .

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    Remplacez les éléments suivants :

    • PROJECT_ID: ID du projet Google Cloud à partir duquel créer l'ensemble de données.
    • DATASET_NAME: nom de votre ensemble de données.
  2. Créez une connexion de ressource cloud.

  3. Créez une table BigQuery standard.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    Remplacez les éléments suivants :

    • TABLE_NAME: nom de votre table.
  4. Insérer des données dans la table BigQuery standard

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. Créez une table BigQuery pour Apache Iceberg.

    Par exemple, pour créer une table, exécutez l'instruction CREATE suivante.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    Remplacez les éléments suivants :

    • ICEBERG_TABLE_NAME: nom de votre table Iceberg. Exemple :iceberg_managed_table
    • CONNECTION_NAME: nom de votre connexion. Vous l'avez créé à l'étape précédente. Exemple :myproject.us.myconnection
    • STORAGE_URI: URI Cloud Storage complet. Exemple :gs://mybucket/table
  6. Insérer des données dans la table BigQuery pour Apache Iceberg

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. Créez une table Iceberg en lecture seule.

    Par exemple, pour créer une table Iceberg en lecture seule, exécutez l'instruction CREATE suivante.

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    Remplacez les éléments suivants :

    • READONLY_ICEBERG_TABLE_NAME: nom de votre table en lecture seule.
    • BUCKET_PATH: chemin d'accès au bucket Cloud Storage contenant les données de la table externe, au format ['gs://bucket_name/[folder_name/]file_name'].
  8. Dans PySpark, interrogez la table standard, la table Iceberg gérée et la table Iceberg en lecture seule.

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the bqms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query a standard BigQuery table
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Managed Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Readonly Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    Remplacez les éléments suivants :

    • WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage contenant votre entrepôt de données.
    • CATALOG_NAME: nom du catalogue que vous utilisez.
    • MATERIALIZATION_NAMESPACE: espace de noms pour le stockage des résultats temporaires.
  9. Exécutez le script PySpark à l'aide de Spark sans serveur.

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \
      --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar

    Remplacez les éléments suivants :

    • SCRIPT_PATH: chemin d'accès au script utilisé par la job par lot.
    • PROJECT_ID: ID du projet Google Cloud dans lequel exécuter la job par lot.
    • REGION: région dans laquelle votre charge de travail s'exécute.
    • YOUR_BUCKET: emplacement du bucket Cloud Storage pour importer les dépendances de la charge de travail. Le préfixe d'URI gs:// du compartiment n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès ou le nom du bucket, par exemple mybucketname1.

Étape suivante