Usar a metastore do BigQuery com o Spark no BigQuery Studio

Este documento explica como usar a metastore do BigQuery com o Spark no BigQuery Studio.

Você pode usar o Spark no BigQuery Studio para criar uma tabela Iceberg com o Apache Spark no BigQuery Studio. Depois de criar a tabela, você pode consultar os dados do Spark. Também é possível consultar os mesmos dados no console do BigQuery usando SQL.

Antes de começar

  1. Solicite acesso ao Spark no BigQuery Studio pelo formulário de inscrição abaixo.
  2. Ative o faturamento do seu Google Cloud projeto. Saiba como verificar se o faturamento está ativado em um projeto.
  3. Ative as APIs BigQuery e Dataflow.

    Ativar as APIs

  4. Opcional: entenda como a metastore do BigQuery funciona e por que você deve usá-la.

Funções exigidas

Para receber as permissões necessárias para usar os notebooks do Spark no BigQuery Studio, peça ao administrador para conceder a você os seguintes papéis do IAM:

  • Crie tabelas de metastore do BigQuery Studio no Spark: Editor de dados do BigQuery (roles/bigquery.dataEditor) no projeto
  • Crie uma sessão do Spark usando as tabelas do metastore do notebook no Spark: Dataproc Worker (roles/dataproc.serverlessEditor) na conta de usuário

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Conectar com um notebook

O exemplo a seguir mostra como configurar um notebook do Spark para interagir com tabelas do Iceberg armazenadas na metastore do BigQuery.

Neste exemplo, você configura uma sessão do Spark, cria um namespace e uma tabela, adiciona alguns dados à tabela e consulta os dados no BigQuery Studio.

  1. Crie um notebook do Spark no BigQuery Studio.

  2. No notebook do Apache Spark, inclua as importações necessárias do Apache Spark:

    from dataproc_spark_session.session.spark.connect import DataprocSparkSession
    from google.cloud.dataproc_v1 import Session
    from pyspark.sql import SparkSession
  3. Defina um catálogo, um namespace e um diretório de repositório.

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    Substitua:

    • CATALOG_NAME: um nome de catálogo para referenciar sua tabela do Spark.
    • NAMESPACE_NAME: um rótulo de namespace para referenciar sua tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que o data warehouse está armazenado.
  4. Inicialize uma sessão do Spark.

    session.environment_config.execution_config.network_uri = NETWORK_NAME
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir
    
    spark = (
     DataprocSparkSession.builder
     .appName("BigQuery metastore Iceberg table example")
     .dataprocConfig(session)
     .getOrCreate())

    Substitua:

    • NETWORK_NAME: o nome ou URI da rede que executa o código do Spark. Se não for especificado, a rede default será usada.
    • PROJECT_ID: o ID do projeto Google Cloud que está executando o código do Spark.
    • LOCATION: o local em que o job do Spark será executado.
  5. Crie um catálogo e um namespace.

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. Crie uma tabela.

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    Substitua:

    • TABLE_NAME: um nome para a tabela Iceberg.
  7. Execute uma linguagem de manipulação de dados (DML) no Spark.

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. Execute uma linguagem de definição de dados (DDL) no Spark.

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. Inserir dados na tabela.

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. Consulte a tabela no Spark.

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. Faça uma consulta na tabela do console do Google Cloud em um novo conjunto de dados.

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100

A seguir