Usar a metastore do BigQuery com o Spark no BigQuery Studio
Este documento explica como usar a metastore do BigQuery com o Spark no BigQuery Studio.
Você pode usar o Spark no BigQuery Studio para criar uma tabela Iceberg com o Apache Spark no BigQuery Studio. Depois de criar a tabela, você pode consultar os dados do Spark. Também é possível consultar os mesmos dados no console do BigQuery usando SQL.
Antes de começar
- Solicite acesso ao Spark no BigQuery Studio pelo formulário de inscrição abaixo.
- Ative o faturamento do seu Google Cloud projeto. Saiba como verificar se o faturamento está ativado em um projeto.
Ative as APIs BigQuery e Dataflow.
Opcional: entenda como a metastore do BigQuery funciona e por que você deve usá-la.
Funções exigidas
Para receber as permissões necessárias para usar os notebooks do Spark no BigQuery Studio, peça ao administrador para conceder a você os seguintes papéis do IAM:
-
Crie tabelas de metastore do BigQuery Studio no Spark:
Editor de dados do BigQuery (
roles/bigquery.dataEditor
) no projeto -
Crie uma sessão do Spark usando as tabelas do metastore do notebook no Spark:
Dataproc Worker (
roles/dataproc.serverlessEditor
) na conta de usuário
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.
Conectar com um notebook
O exemplo a seguir mostra como configurar um notebook do Spark para interagir com tabelas do Iceberg armazenadas na metastore do BigQuery.
Neste exemplo, você configura uma sessão do Spark, cria um namespace e uma tabela, adiciona alguns dados à tabela e consulta os dados no BigQuery Studio.
Crie um notebook do Spark no BigQuery Studio.
No notebook do Apache Spark, inclua as importações necessárias do Apache Spark:
from dataproc_spark_session.session.spark.connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session from pyspark.sql import SparkSession
Defina um catálogo, um namespace e um diretório de repositório.
catalog = "CATALOG_NAME" namespace = "NAMESPACE_NAME" warehouse_dir = "gs://WAREHOUSE_DIRECTORY"
Substitua:
CATALOG_NAME
: um nome de catálogo para referenciar sua tabela do Spark.NAMESPACE_NAME
: um rótulo de namespace para referenciar sua tabela do Spark.WAREHOUSE_DIRECTORY
: o URI da pasta do Cloud Storage em que o data warehouse está armazenado.
Inicialize uma sessão do Spark.
session.environment_config.execution_config.network_uri = NETWORK_NAME session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir spark = ( DataprocSparkSession.builder .appName("BigQuery metastore Iceberg table example") .dataprocConfig(session) .getOrCreate())
Substitua:
NETWORK_NAME
: o nome ou URI da rede que executa o código do Spark. Se não for especificado, a rededefault
será usada.PROJECT_ID
: o ID do projeto Google Cloud que está executando o código do Spark.LOCATION
: o local em que o job do Spark será executado.
Crie um catálogo e um namespace.
spark.sql(f"USE `CATALOG_NAME`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;") spark.sql(f"USE `NAMESPACE_NAME`;")
Crie uma tabela.
spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME ;")
Substitua:
TABLE_NAME
: um nome para a tabela Iceberg.
Execute uma linguagem de manipulação de dados (DML) no Spark.
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");") df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
Execute uma linguagem de definição de dados (DDL) no Spark.
spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);") spark.sql("DESCRIBE TABLE_NAME ;")
Inserir dados na tabela.
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"It's a sunny day!\", 83);")
Consulte a tabela no Spark.
df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
Faça uma consulta na tabela do console do Google Cloud em um novo conjunto de dados.
SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100
A seguir
- Configure os recursos opcionais da metastore do BigQuery.
- Consultar tabelas de metastore no BigQuery.