Utilizzare il metastore BigQuery con Spark in BigQuery Studio

Questo documento spiega come utilizzare il metastore BigQuery con Spark in BigQuery Studio.

Puoi utilizzare Spark in BigQuery Studio per creare una tabella Iceberg con Apache Spark in BigQuery Studio. Dopo aver creato la tabella, puoi eseguire query sui dati da Spark. Puoi anche eseguire query sugli stessi dati dalla console BigQuery utilizzando SQL.

Prima di iniziare

  1. Richiedi l'accesso a Spark in BigQuery Studio tramite il seguente modulo di registrazione.
  2. Attiva la fatturazione per il tuo progetto Google Cloud . Scopri come controllare se la fatturazione è attivata in un progetto.
  3. Abilita le API BigQuery e Dataflow.

    Abilita le API

  4. (Facoltativo) Scopri come funziona il metastore BigQuery e perché dovresti utilizzarlo.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per utilizzare i notebook Spark in BigQuery Studio, chiedi all'amministratore di concederti i seguenti ruoli IAM:

  • Crea le tabelle del metastore di BigQuery Studio in Spark: Editor dati BigQuery (roles/bigquery.dataEditor) nel progetto
  • Crea una sessione Spark dalle tabelle del metastore del notebook in Spark: Dataproc Worker (roles/dataproc.serverlessEditor) nell'account utente

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Connettiti a un notebook

L'esempio seguente mostra come configurare un notebook Spark per interagire con le tabelle Iceberg archiviate nel metastore BigQuery.

In questo esempio, configuri una sessione Spark, crei un ambito e una tabella, aggiungi alcuni dati alla tabella e poi esegui una query sui dati in BigQuery Studio.

  1. Crea un notebook Spark in BigQuery Studio.

  2. Nel notebook Apache Spark, includi le importazioni di Apache Spark necessarie:

    from dataproc_spark_session.session.spark.connect import DataprocSparkSession
    from google.cloud.dataproc_v1 import Session
    from pyspark.sql import SparkSession
  3. Definisci un catalogo, uno spazio dei nomi e una directory del magazzino.

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    Sostituisci quanto segue:

    • CATALOG_NAME: un nome di catalogo a cui fare riferimento per la tabella Spark.
    • NAMESPACE_NAME: un'etichetta dello spazio dei nomi per fare riferimento alla tabella Spark.
    • WAREHOUSE_DIRECTORY: l'URI della cartella Cloud Storage in cui è archiviato il data warehouse.
  4. Inizializza una sessione Spark.

    session.environment_config.execution_config.network_uri = NETWORK_NAME
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir
    
    spark = (
     DataprocSparkSession.builder
     .appName("BigQuery metastore Iceberg table example")
     .dataprocConfig(session)
     .getOrCreate())

    Sostituisci quanto segue:

    • NETWORK_NAME: il nome o l'URI della rete che esegue il codice Spark. Se non specificato, viene utilizzata la rete default.
    • PROJECT_ID: l'ID del progetto Google Cloud in cui viene eseguito il codice Spark.
    • LOCATION: la posizione in cui eseguire il job Spark.
  5. Crea un catalogo e uno spazio dei nomi.

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. Creare una tabella.

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    Sostituisci quanto segue:

    • TABLE_NAME: un nome per la tabella Iceberg.
  7. Esegui un linguaggio di manipolazione dei dati (DML) da Spark.

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. Esegui un Data Definition Language (DDL) da Spark.

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. Inserisci i dati nella tabella.

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. Esegui una query sulla tabella da Spark.

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. Esegui una query sulla tabella dalla console Google Cloud in un nuovo set di dati.

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100

Passaggi successivi