Funciones adicionales de metastore de BigLake

Para personalizar la configuración de tu metastore de BigLake, puedes usar las siguientes funciones adicionales:

  • Procedimientos de Apache Spark Iceberg
  • Opción de filtro para tablas no admitidas
  • Sustituciones de conexiones de BigQuery
  • Políticas de control de acceso para tablas de Iceberg de metastore de BigLake

Usar procedimientos de Iceberg Spark

Para usar los procedimientos de Iceberg Spark, debes incluir las extensiones de Iceberg SQL en tu configuración de Spark. Por ejemplo, puedes crear un procedimiento para volver a un estado anterior.

Usar Spark-SQL interactivo para restaurar un estado anterior

Puedes usar un procedimiento de Iceberg Spark para crear, modificar y restaurar una tabla a su estado anterior. Por ejemplo:

  1. Crea una tabla de Spark:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Haz los cambios siguientes:

    • CATALOG_NAME: el nombre del catálogo que hace referencia a tu tabla de Spark.
    • PROJECT_ID: el ID del Google Cloud proyecto.

    • WAREHOUSE_DIRECTORY: el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Haz los cambios siguientes:

    • NAMESPACE_NAME: el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • TABLE_NAME: nombre de una tabla que hace referencia a tu tabla de Spark.

    La salida contiene detalles sobre la configuración de la tabla:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Vuelve a modificar la tabla y, a continuación, restaura la instantánea 1659239298328512231 que has creado anteriormente:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Haz los cambios siguientes:

    • SNAPSHOT_ID: el ID de la copia de seguridad a la que quieres volver.

    El resultado debería ser similar al siguiente:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Filtrar tablas no admitidas de funciones de lista de tablas

Cuando usas Spark SQL con el catálogo de metastore de BigLake, el comando SHOW TABLES muestra todas las tablas del espacio de nombres especificado, incluso las que no son compatibles con Spark.

Para mostrar solo las tablas compatibles, activa la opción filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Haz los cambios siguientes:

  • CATALOG_NAME: nombre del catálogo de Spark que se va a usar.
  • PROJECT_ID: el ID del Google Cloud proyectoPROJECT_ID que se va a usar.
  • LOCATION: la ubicación de los recursos de BigQuery.
  • WAREHOUSE_DIRECTORY: la carpeta de Cloud Storage que se usará como almacén de datos.

Definir una anulación de conexión de BigQuery

Puedes usar conexiones de BigQuery para acceder a datos almacenados fuera de BigQuery, como en Cloud Storage.

Para definir una sustitución de conexión de BigQuery que proporcione acceso a un segmento de Cloud Storage, sigue estos pasos:

  1. En tu proyecto de BigQuery, crea una conexión con tu recurso de Cloud Storage. Esta conexión define cómo accede BigQuery a tus datos.

  2. Asigna el rol roles/bigquery.connectionUser a la cuenta de usuario o de servicio que acceda a los datos de la conexión.

    Asegúrese de que el recurso de conexión comparta la misma ubicación que los recursos de destino en BigQuery. Para obtener más información, consulta Gestionar conexiones.

  3. Especifica la conexión en tu tabla de Iceberg con la propiedad bq_connection:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Haz los cambios siguientes:

    • TABLE_NAME: el nombre de la tabla de Spark.
    • WAREHOUSE_DIRECTORY: el URI del segmento de Cloud Storage que almacena tus datos.
    • PROJECT_ID: el ID del Google Cloud proyectoPROJECT_ID que se va a usar.
    • LOCATION: la ubicación de la conexión.
    • CONNECTION_ID: el ID de la conexión.

Definir políticas de control de acceso

Puedes habilitar el control de acceso pormenorizado (FGAC) en las tablas Iceberg de BigLake metastore configurando políticas de control de acceso. Solo puedes definir políticas de control de acceso en tablas que usen una anulación de conexión de BigQuery. Puedes definir estas políticas de las siguientes formas:

Después de configurar las políticas de FGAC, puedes consultar la tabla desde Spark con el siguiente ejemplo:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("BigLake Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Haz los cambios siguientes:

  • CATALOG_NAME: el nombre de tu catálogo.
  • PROJECT_ID: el ID del proyecto que contiene tus recursos de BigQuery.
  • LOCATION: la ubicación de los recursos de BigQuery.
  • WAREHOUSE_DIRECTORY: el URI de la carpeta de Cloud Storage que contiene tu almacén de datos.
  • MATERIALIZATION_NAMESPACE: el espacio de nombres en el que quieras almacenar los resultados temporales.
  • DATASET_NAME: el nombre del conjunto de datos que contiene la tabla que estás consultando.
  • ICEBERG_TABLE_NAME: el nombre de la tabla que estás consultando.

Siguientes pasos