在 BigQuery Studio 中将 BigQuery 元存储区与 Spark 搭配使用
本文档介绍了如何在 BigQuery Studio 中将 BigQuery 元存储库与 Spark 搭配使用。
您可以在 BigQuery 工作室中使用 Spark 通过 Apache Spark 创建 Iceberg 表。创建表后,您可以从 Spark 查询数据。您还可以使用 SQL 从 BigQuery 控制台中查询相同的数据。
准备工作
- 通过以下注册表单申请在 BigQuery Studio 中使用 Spark。
- 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery 和 Dataflow API。
可选:了解 BigQuery 元存储区的工作原理以及为何应使用它。
所需的角色
如需获得在 BigQuery Studio 中使用 Spark 笔记本所需的权限,请让您的管理员为您授予以下 IAM 角色:
-
在 Spark 中创建 BigQuery Studio 元存储表:
项目的 BigQuery Data Editor (
roles/bigquery.dataEditor
) -
通过 Spark 中的笔记本元数据存储表创建 Spark 会话:
用户账号中的 Dataproc Worker (
roles/dataproc.serverlessEditor
)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
连接到笔记本
以下示例展示了如何配置 Spark 笔记本以与存储在 BigQuery Metastore 中的 Iceberg 表进行交互。
在本例中,您将设置 Spark 会话、创建命名空间和表、向表中添加一些数据,然后在 BigQuery Studio 中查询数据。
在 Apache Spark 笔记本中,添加必要的 Apache Spark 导入:
from dataproc_spark_session.session.spark.connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session from pyspark.sql import SparkSession
定义目录、命名空间和仓库目录。
catalog = "CATALOG_NAME" namespace = "NAMESPACE_NAME" warehouse_dir = "gs://WAREHOUSE_DIRECTORY"
替换以下内容:
CATALOG_NAME
:用于引用 Spark 表的目录名称。NAMESPACE_NAME
:用于引用 Spark 表的命名空间标签。WAREHOUSE_DIRECTORY
:存储数据仓库的 Cloud Storage 文件夹的 URI。
初始化 Spark 会话。
session.environment_config.execution_config.network_uri = NETWORK_NAME session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir spark = ( DataprocSparkSession.builder .appName("BigQuery metastore Iceberg table example") .dataprocConfig(session) .getOrCreate())
替换以下内容:
创建目录和命名空间。
spark.sql(f"USE `CATALOG_NAME`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;") spark.sql(f"USE `NAMESPACE_NAME`;")
创建表。
spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME ;")
替换以下内容:
TABLE_NAME
:Iceberg 表的名称。
从 Spark 运行数据操纵语言 (DML)。
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");") df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
从 Spark 运行数据定义语言 (DDL)。
spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);") spark.sql("DESCRIBE TABLE_NAME ;")
将数据插入表中。
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"It's a sunny day!\", 83);")
从 Spark 查询表。
df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
在新的 Google Cloud 控制台中,查询新数据集中的表。
SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100