Outros recursos do metastore do BigLake

Para personalizar a configuração do BigLake Metastore, use estes recursos adicionais:

  • Procedimentos do Apache Spark Iceberg
  • A opção de filtro para tabelas sem suporte
  • Substituições de conexão do BigQuery
  • Políticas de controle de acesso para tabelas Iceberg do metastore do BigLake

Usar procedimentos do Iceberg Spark

Para usar procedimentos do Iceberg Spark, inclua extensões SQL do Iceberg na configuração do Spark. Por exemplo, você pode criar um procedimento para reverter a um estado anterior.

Usar o Spark SQL interativo para reverter a um estado anterior

É possível usar um procedimento do Iceberg Spark para criar, modificar e reverter uma tabela ao estado anterior. Exemplo:

  1. Crie uma tabela do Spark:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Substitua:

    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • PROJECT_ID: o ID do projeto Google Cloud .

    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que seu data warehouse está armazenado.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Substitua:

    • NAMESPACE_NAME: o nome do namespace que referencia sua tabela do Spark.
    • TABLE_NAME: um nome de tabela que faz referência à sua tabela do Spark.

    A saída contém detalhes sobre a configuração da tabela:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Altere a tabela novamente e reverta para o snapshot 1659239298328512231 criado anteriormente:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Substitua:

    • SNAPSHOT_ID: o ID do snapshot para o qual você está revertendo.

    O resultado será assim:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Filtrar tabelas não compatíveis das funções de listagem de tabelas

Ao usar o Spark SQL com o catálogo do BigLake Metastore, o comando SHOW TABLES mostra todas as tabelas no namespace especificado, mesmo aquelas que não são compatíveis com o Spark.

Para mostrar apenas as tabelas compatíveis, ative a opção filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Substitua:

  • CATALOG_NAME: o nome do catálogo do Spark a ser usado.
  • PROJECT_ID: o ID do projeto Google Cloud a ser usado.
  • LOCATION: o local dos recursos do BigQuery.
  • WAREHOUSE_DIRECTORY: a pasta do Cloud Storage a ser usada como data warehouse.

Definir uma substituição de conexão do BigQuery

É possível usar as conexões do BigQuery para acessar dados armazenados fora do BigQuery, como no Cloud Storage.

Para definir uma substituição de conexão do BigQuery que forneça acesso a um bucket do Cloud Storage, siga estas etapas:

  1. No projeto do BigQuery, crie uma conexão com o recurso do Cloud Storage. Essa conexão define como o BigQuery acessa seus dados.

  2. Conceda ao usuário ou à conta de serviço que acessa os dados a função roles/bigquery.connectionUser na conexão.

    Verifique se o recurso de conexão compartilha o mesmo local que os recursos de destino no BigQuery. Para mais informações, consulte Gerenciar conexões.

  3. Especifique a conexão na sua tabela do Iceberg com a propriedade bq_connection:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Substitua:

    • TABLE_NAME: um nome para sua tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI do bucket do Cloud Storage que armazena seus dados.
    • PROJECT_ID: o ID do projeto Google Cloud a ser usado.
    • LOCATION: o local da conexão.
    • CONNECTION_ID: o ID da conexão.

Definir políticas de controle de acesso

É possível ativar o controle de acesso refinado (FGAC) em tabelas do Iceberg da metastore do BigLake configurando políticas de controle de acesso. Só é possível definir políticas de controle de acesso em tabelas que usam uma substituição de conexão do BigQuery. É possível definir essas políticas das seguintes maneiras:

Depois de configurar as políticas de FGAC, é possível consultar a tabela do Spark usando o exemplo a seguir:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("BigLake Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Substitua:

  • CATALOG_NAME: o nome do catálogo.
  • PROJECT_ID: o ID do projeto que contém seus recursos do BigQuery.
  • LOCATION: o local dos recursos do BigQuery.
  • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage que contém seu data warehouse.
  • MATERIALIZATION_NAMESPACE: o namespace em que você quer armazenar resultados temporários.
  • DATASET_NAME: o nome do conjunto de dados que contém a tabela que você está consultando.
  • ICEBERG_TABLE_NAME: o nome da tabela que você está consultando.

A seguir