Utilizzare il metastore BigQuery con le tabelle in BigQuery

Questo documento spiega come utilizzare il metastore BigQuery con le tabelle BigQuery e Spark.

Con il metastore BigQuery puoi creare e utilizzare tabelle standard (integrate), tabelle BigQuery per Apache Spark Iceberg e tabelle esterne BigLake da BigQuery.

Prima di iniziare

  1. Attiva la fatturazione per il tuo progetto Google Cloud . Scopri come controllare se la fatturazione è attivata in un progetto.
  2. Abilita le API BigQuery e Dataproc.

    Abilita le API

  3. (Facoltativo) Scopri come funziona il metastore BigQuery e perché dovresti utilizzarlo.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per utilizzare Spark e Dataproc con BigQuery Metastore come metastore, chiedi all'amministratore di concederti i seguenti ruoli IAM:

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Connettiti a una tabella

  1. Crea un set di dati nella console Google Cloud .

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud per creare il set di dati.
    • DATASET_NAME: un nome per il set di dati.
  2. Crea una connessione a una risorsa cloud.

  3. Crea una tabella BigQuery standard.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    Sostituisci quanto segue:

    • TABLE_NAME: un nome per la tabella.
  4. Inserisci i dati nella tabella BigQuery standard.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. Crea una tabella BigQuery per Apache Iceberg.

    Ad esempio, per creare una tabella, esegui il seguente statement CREATE.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    Sostituisci quanto segue:

    • ICEBERG_TABLE_NAME: un nome per la tabella Iceberg. Ad esempio, iceberg_managed_table.
    • CONNECTION_NAME: il nome della connessione. Lo hai creato nel passaggio precedente. Ad esempio, myproject.us.myconnection.
    • STORAGE_URI: un URI Cloud Storage completo. Ad esempio, gs://mybucket/table.
  6. Inserisci i dati nella tabella BigQuery per Apache Iceberg.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. Crea una tabella Iceberg di sola lettura.

    Ad esempio, per creare una tabella Iceberg di sola lettura, esegui la seguente istruzione CREATE.

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    Sostituisci quanto segue:

    • READONLY_ICEBERG_TABLE_NAME: un nome per la tabella di sola lettura.
    • BUCKET_PATH: il percorso del bucket Cloud Storage che contiene i dati per la tabella esterna, nel formato['gs://bucket_name/[folder_name/]file_name'].
  8. Da PySpark, esegui query sulla tabella standard, sulla tabella Iceberg gestita e sulla tabella Iceberg di sola lettura.

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the bqms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query a standard BigQuery table
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Managed Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Readonly Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    Sostituisci quanto segue:

    • WAREHOUSE_DIRECTORY: l'URI della cartella Cloud Storage contenente il data warehouse.
    • CATALOG_NAME: il nome del catalogo che stai utilizzando.
    • MATERIALIZATION_NAMESPACE: lo spazio dei nomi per memorizzare i risultati temporanei.
  9. Esegui lo script PySpark utilizzando Spark serverless.

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \
      --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar

    Sostituisci quanto segue:

    • SCRIPT_PATH: il percorso dello script utilizzato dal job batch.
    • PROJECT_ID: l'ID del progetto Google Cloud in cui eseguire il job batch.
    • REGION: la regione in cui viene eseguito il tuo carico di lavoro.
    • YOUR_BUCKET: la posizione del bucket Cloud Storage in cui caricare le dipendenze del carico di lavoro. Il prefisso URI gs:// del bucket non è obbligatorio. Puoi specificare il percorso o il nome del bucket, ad esempio mybucketname1.

Passaggi successivi