在 BigQuery 工作台中将 BigQuery 元数据存储与 Spark 搭配使用
本文档介绍如何在 BigQuery 数据洞察中将 BigQuery 元数据存储区与 Spark 搭配使用。
您可以使用 BigQuery 工作室中的 Spark 在 BigQuery 工作室中使用 Apache Spark 创建 Iceberg 表。创建表后,您可以从 Spark 查询数据。您还可以使用 SQL 从 BigQuery 控制台中查询相同的数据。
准备工作
- 通过以下注册表单在 BigQuery 工作室中申请访问 Spark 的权限。
- 为 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery API 和 Dataproc API。
可选:了解 BigQuery 元数据库的工作原理以及为什么要使用它。
所需的角色
如需获得在 BigQuery 工作室中使用 Spark 笔记本所需的权限,请让管理员向您授予以下 IAM 角色:
-
在 Spark 中创建 BigQuery 工作室元数据库表:
BigQuery Data Editor (
roles/bigquery.dataEditor
) 项目角色 -
通过 Spark 中的笔记本元数据存储表创建 Spark 会话:
Dataproc Worker (
roles/dataproc.serverlessEditor
) 在用户账号上
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
连接到笔记本
以下示例展示了如何配置 Spark 笔记本以与存储在 BigQuery 元数据库中的 Iceberg 表进行交互。
在此示例中,您将设置 Spark 会话、创建命名空间和表、向表中添加一些数据,然后在 BigQuery 工作室中查询数据。
在 Apache Spark 笔记本中,添加必要的 Apache Spark 导入:
from dataproc_spark_session.session.spark.connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session from pyspark.sql import SparkSession
定义目录、命名空间和仓库目录。
catalog = "CATALOG_NAME" namespace = "NAMESPACE_NAME" warehouse_dir = "gs://WAREHOUSE_DIRECTORY"
替换以下内容:
CATALOG_NAME
:用于引用 Spark 表的目录名称。NAMESPACE_NAME
:用于引用 Spark 表的命名空间标签。WAREHOUSE_DIRECTORY
:存储数据仓库的 Cloud Storage 文件夹的 URI。
初始化 Spark 会话。
session.environment_config.execution_config.network_uri = "NETWORK_NAME" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir spark = ( DataprocSparkSession.builder .appName("BigQuery metastore Iceberg table example") .dataprocConfig(session) .getOrCreate())
替换以下内容:
创建清单和命名空间。
spark.sql(f"USE `CATALOG_NAME`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;") spark.sql(f"USE `NAMESPACE_NAME`;")
创建表。
spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME ;")
替换以下内容:
TABLE_NAME
:Iceberg 表的名称。
通过 Spark 运行数据操纵语言 (DML)。
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");") df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
通过 Spark 运行数据定义语言 (DDL)。
spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);") spark.sql("DESCRIBE TABLE_NAME ;")
将数据插入到表中。
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"It's a sunny day!\", 83);")
从 Spark 查询该表。
df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
通过 Google Cloud 控制台在新数据集中查询该表。
SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100