Fonctionnalités supplémentaires du métastore BigQuery

.

Pour personnaliser la configuration de votre métastore BigQuery, vous pouvez utiliser les fonctionnalités supplémentaires suivantes:

  • Procédures Apache Spark Iceberg
  • Option de filtre pour les tableaux non compatibles
  • Forcer une connexion BigQuery

Utiliser des procédures Spark Iceberg

Pour utiliser les procédures Spark Iceberg, vous devez inclure les extensions SQL Iceberg dans votre configuration Spark. Par exemple, vous pouvez créer une procédure pour revenir à un état précédent.

Utiliser Spark-SQL interactif pour revenir à un état précédent

Vous pouvez utiliser une procédure Iceberg Spark pour créer, modifier et rétablir l'état précédent d'une table. Exemple :

  1. Créez une table Spark:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Remplacez les éléments suivants :

    • CATALOG_NAME: nom du catalogue qui fait référence à votre table Spark.
    • PROJECT_ID: ID du projet Google Cloud .

    • WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Remplacez les éléments suivants :

    • NAMESPACE_NAME: nom de l'espace de noms qui fait référence à votre table Spark.
    • TABLE_NAME: nom de table qui fait référence à votre table Spark.

    La sortie contient des informations sur la configuration de la table:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Modifiez à nouveau la table, puis restaurez-la à l'instantané 1659239298328512231 créé précédemment:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot(NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Remplacez les éléments suivants :

    • SNAPSHOT_ID: ID de l'instantané auquel vous effectuez la restauration.

    Le résultat ressemble à ce qui suit :

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Filtrer les tables non compatibles à partir des fonctions de liste de tables

Lorsque vous utilisez Spark SQL avec le catalogue de métadonnées BigQuery, la commande SHOW TABLES affiche toutes les tables de l'espace de noms spécifié, même celles qui ne sont pas compatibles avec Spark.

Pour n'afficher que les tables compatibles, activez l'option filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Remplacez les éléments suivants :

  • CATALOG_NAME: nom du catalogue Spark à utiliser.
  • PROJECT_ID: ID du projet Google Cloud à utiliser.
  • LOCATION: emplacement des ressources BigQuery.
  • WAREHOUSE_DIRECTORY: dossier Cloud Storage à utiliser comme entrepôt de données.

Définir un forçage de connexion BigQuery

Vous pouvez utiliser des connexions BigQuery pour accéder aux données stockées en dehors de BigQuery, par exemple dans Cloud Storage.

Pour définir un forçage de connexion BigQuery qui permet d'accéder à un bucket Cloud Storage, procédez comme suit:

  1. Dans votre projet BigQuery, créez une connexion à votre ressource Cloud Storage. Cette connexion définit la façon dont BigQuery accède à vos données.

  2. Attribuez au compte utilisateur ou au compte de service accédant aux données le rôle roles/bigquery.connectionUser sur la connexion.

    Assurez-vous que la ressource de connexion partage le même emplacement que les ressources cibles dans BigQuery. Pour en savoir plus, consultez la page Gérer les connexions.

  3. Spécifiez la connexion dans votre table Iceberg avec la propriété bq_connection:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Remplacez les éléments suivants :

    • TABLE_NAME: nom de la table Spark.
    • WAREHOUSE_DIRECTORY: URI du bucket Cloud Storage qui stocke vos données.
    • PROJECT_ID: ID du projet Google Cloud à utiliser.
    • LOCATION: emplacement de la connexion.
    • CONNECTION_ID : ID de la connexion.

Étape suivante