Usar a metastore do BigQuery com tabelas no BigQuery

Este documento explica como usar a metastore do BigQuery com tabelas do BigQuery e o Spark.

Com a metastore do BigQuery, é possível criar e usar tabelas padrão (integradas), tabelas do BigQuery para o Apache Iceberg e tabelas externas do BigLake no BigQuery.

Antes de começar

  1. Ative o faturamento para o projeto do Google Cloud. Saiba como verificar se o faturamento está ativado em um projeto.
  2. Ative as APIs BigQuery e Dataproc.

    Ativar as APIs

  3. Opcional: entenda como a metastore do BigQuery funciona e por que você deve usá-la.

  4. Opcional: se você estiver trabalhando com o Iceberg, use o seguinte para comparar as tabelas do Iceberg da metastore do BigQuery com outras variedades de tabelas semelhantes no BigQuery:

Tabelas padrão do BigQuery Tabelas externas do BigLake Tabelas externas do BigLake para o Apache Iceberg (também conhecidas como tabelas do BigLake Iceberg) Tabelas Iceberg da metastore do BigQuery (pré-lançamento) Tabelas do BigQuery para o Apache Iceberg (também conhecidas como tabelas gerenciadas do Iceberg / tabelas do BigQuery Iceberg) (pré-lançamento)
Principais recursos Experiência totalmente gerenciada Governado (controle de acesso refinado) e funcional em mecanismos do BigQuery e de código aberto Recursos de tabela externa do BigLake + consistência de dados, atualizações de esquema. Não é possível criar no Spark ou em outros mecanismos abertos. Recursos da tabela BigLake Iceberg + mutável de mecanismos externos. Não é possível criar com DDL ou a ferramenta de linha de comando bq. Recursos da tabela BigLake Iceberg + sobrecarga de gerenciamento baixa com dados e metadados abertos
Armazenamento de dados Armazenamento gerenciado do BigQuery Abrir dados formatados hospedados em buckets gerenciados pelo usuário
Modelo aberto API BigQuery Storage Read (por conectores) Formatos de arquivo abertos (Parquet) Bibliotecas abertas (Iceberg) Compatível com código aberto (snapshots de metadados do Iceberg)
Governança Governança unificada do BigQuery
Gravações (DML e streaming) Por meio de conectores do BigQuery, APIs, DML de alta taxa de transferência, CDC Gravações apenas em mecanismos externos Por meio de conectores do BigQuery, APIs, DML de alta taxa de transferência, CDC

Funções exigidas

Para receber as permissões necessárias para usar o Spark e o Dataproc com a metastore do BigQuery como uma loja de metadados, peça ao administrador para conceder a você os seguintes papéis do IAM:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Conectar-se a uma tabela

  1. Crie um conjunto de dados no console do Google Cloud.

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    Substitua:

    • PROJECT_ID: o ID do projeto Google Cloud para criar o conjunto de dados.
    • DATASET_NAME: um nome para o conjunto de dados.
  2. Crie uma Conexão de recursos do Cloud.

  3. Crie uma tabela padrão do BigQuery.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    Substitua:

    • TABLE_NAME: um nome para a tabela.
  4. Insira dados na tabela padrão do BigQuery.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. Crie uma tabela do BigQuery para o Apache Iceberg.

    Por exemplo, para criar uma tabela, execute a instrução CREATE a seguir.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    Substitua:

    • ICEBERG_TABLE_NAME: um nome para a tabela Iceberg. Por exemplo, iceberg_managed_table.
    • CONNECTION_NAME: o nome da conexão. Você criou isso na etapa anterior. Por exemplo, myproject.us.myconnection.
    • STORAGE_URI: um URI do Cloud Storage totalmente qualificado. Por exemplo, gs://mybucket/table.
  6. Insira dados na tabela do BigQuery para o Apache Iceberg.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. Crie uma tabela Iceberg somente leitura.

    Por exemplo, para criar uma tabela Iceberg somente leitura, execute a instrução CREATE a seguir.

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    Substitua:

    • READONLY_ICEBERG_TABLE_NAME: um nome para a tabela somente leitura.
    • BUCKET_PATH: o caminho para o bucket do Cloud Storage que contém os dados da tabela externa, no formato ['gs://bucket_name/[folder_name/]file_name'].
  8. No PySpark, consulte a tabela padrão, a tabela Iceberg gerenciada e a tabela Iceberg somente leitura.

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the bqms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query a standard BigQuery table
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Managed Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Readonly Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    Substitua:

    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage que contém seu data warehouse.
    • CATALOG_NAME: o nome do catálogo que você está usando.
    • MATERIALIZATION_NAMESPACE: o namespace para armazenar resultados temporários.
  9. Execute o script PySpark usando o Spark sem servidor.

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \
      --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar

    Substitua:

    • SCRIPT_PATH: o caminho para o script usado pelo job em lote.
    • PROJECT_ID: o ID do projeto do Google Cloud em que o job em lote será executado.
    • REGION: a região em que a carga de trabalho é executada.
    • YOUR_BUCKET: o local do bucket do Cloud Storage para fazer upload das dependências de carga de trabalho. O prefixo de URI gs:// do bucket não é obrigatório. É possível especificar o caminho ou o nome do bucket, por exemplo, mybucketname1.

A seguir