Usar a metastore do BigQuery com procedimentos armazenados do Spark
Este documento explica como usar os procedimentos armazenados do Apache Spark com a metastore do BigQuery.
Antes de começar
- Ative o faturamento do seu Google Cloud projeto. Saiba como verificar se o faturamento está ativado em um projeto.
Ative as APIs BigQuery e Dataflow.
Opcional: saiba mais sobre o seguinte:
- Entenda como a metastore do BigQuery funciona e por que você deve usá-la.
- Saiba como os procedimentos armazenados do BigQuery Spark funcionam e conclua antes de iniciar as tarefas.
Funções exigidas
Para usar os procedimentos armazenados do Spark, revise os papéis necessários para procedimentos armazenados e conceda os papéis necessários.
Para receber as permissões necessárias para usar o Spark e os procedimentos armazenados com a metastore do BigQuery como uma loja de metadados, peça ao administrador para conceder a você os seguintes papéis do IAM:
-
Crie tabelas do Metastore do BigQuery no Spark:
-
Editor de dados do BigQuery (
roles/bigquery.dataEditor
) na conta de serviço do Spark Connection no projeto -
Administrador de objetos do Storage (
roles/storage.objectAdmin
) na conta de serviço do Spark Connection no projeto
-
Editor de dados do BigQuery (
-
Consultar tabelas de metastore do BigQuery no BigQuery:
-
Leitor de dados do BigQuery (
roles/bigquery.dataViewer
) no projeto -
Usuário do BigQuery (
roles/bigquery.user
) no projeto -
Leitor de objetos do Storage (
roles/storage.objectViewer
) no projeto
-
Leitor de dados do BigQuery (
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.
Criar e executar um procedimento armazenado
O exemplo a seguir mostra como criar e executar um procedimento armazenado com a metastore do BigQuery.
Acessar a página do BigQuery.
No editor de consultas, adicione o seguinte código de exemplo para a instrução
CREATE PROCEDURE
.CREATE OR REPLACE PROCEDURE `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`() WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK', runtime_version='1.1', properties=[("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY"), ("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION"), ("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID"), ("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog"), ("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"), ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")], jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"]) LANGUAGE python AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("BigQuery metastore Iceberg") \ .getOrCreate() spark.sql("USE CATALOG_NAME;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'") spark.sql("DESCRIBE TABLE_NAME;") spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");") spark.sql("SELECT * from TABLE_NAME;") spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE TABLE_NAME;") """; CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();
Substitua:
PROJECT_ID
: o ID do seu projeto Google Cloud .BQ_DATASET_ID
: o ID do conjunto de dados no BigQuery que contém o procedimento.PROCEDURE_NAME
: o nome do procedimento que você está criando ou substituindo.REGION
: o local da sua conexão do Spark.LOCATION
: o local dos seus recursos do BigQuery.SPARK_CONNECTION_ID
: o ID da sua conexão do Spark.CATALOG_NAME
: o nome do catálogo que você está usando.WAREHOUSE_DIRECTORY
: o URI da pasta do Cloud Storage que contém o data warehouse.NAMESPACE_NAME
: o namespace que você está usando.
A seguir
- Configure os recursos opcionais da metastore do BigQuery.
- Acesse e consulte tabelas do Spark no console do BigQuery.