在 BigQuery 工作台中将 BigQuery 元数据存储与 Spark 搭配使用
本文档介绍如何在 BigQuery 数据洞察中将 BigQuery 元数据存储区与 Spark 搭配使用。
您可以使用 BigQuery 工作室中的 Spark 在 BigQuery 工作室中使用 Apache Spark 创建 Iceberg 表。创建表后,您可以从 Spark 查询数据。您还可以使用 SQL 从 BigQuery 控制台中查询相同的数据。
准备工作
- 通过以下注册表单在 BigQuery 工作室中申请访问 Spark 的权限。
- 为 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery API 和 Dataproc API。
可选:了解 BigQuery 元数据库的工作原理以及为什么要使用它。
所需的角色
如需获得在 BigQuery 工作室中使用 Spark 笔记本所需的权限,请让管理员向您授予以下 IAM 角色:
-
在 Spark 中创建 BigQuery 工作室元数据库表:
BigQuery Data Editor (
roles/bigquery.dataEditor
) 项目角色 -
通过 Spark 中的笔记本元数据存储表创建 Spark 会话:
Dataproc Worker (
roles/dataproc.serverlessEditor
) 在用户账号上
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
连接到笔记本
以下示例展示了如何配置 Spark 笔记本以与存储在 BigQuery 元数据库中的 Iceberg 表进行交互。
在此示例中,您将设置 Spark 会话、创建命名空间和表、向表中添加一些数据,然后在 BigQuery 工作室中查询数据。
在 Apache Spark 笔记本中,添加必要的 Apache Spark 导入:
from dataproc_spark_session.session.spark.connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session from pyspark.sql import SparkSession
定义目录、命名空间和仓库目录。
catalog = "
CATALOG_NAME " namespace = "NAMESPACE_NAME " warehouse_dir = "gs://WAREHOUSE_DIRECTORY "替换以下内容:
CATALOG_NAME
:用于引用 Spark 表的目录名称。NAMESPACE_NAME
:用于引用 Spark 表的命名空间标签。WAREHOUSE_DIRECTORY
:存储数据仓库的 Cloud Storage 文件夹的 URI。
初始化 Spark 会话。
session.environment_config.execution_config.network_uri = "
NETWORK_NAME " session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME "] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME .catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME .gcp_project"] = "PROJECT_ID " session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME .gcp_location"] = "LOCATION " session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME .warehouse"] = warehouse_dir spark = ( DataprocSparkSession.builder .appName("BigQuery metastore Iceberg table example") .dataprocConfig(session) .getOrCreate())替换以下内容:
创建清单和命名空间。
spark.sql(f"USE `
CATALOG_NAME `;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME `;") spark.sql(f"USE `NAMESPACE_NAME `;")创建表。
spark.sql("CREATE OR REPLACE TABLE
TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBETABLE_NAME ;")替换以下内容:
TABLE_NAME
:Iceberg 表的名称。
通过 Spark 运行数据操纵语言 (DML)。
spark.sql("INSERT INTO
TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");") df = spark.sql("SELECT * fromTABLE_NAME ;") df.show()通过 Spark 运行数据定义语言 (DDL)。
spark.sql("ALTER TABLE
TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);") spark.sql("DESCRIBETABLE_NAME ;")将数据插入到表中。
spark.sql("INSERT INTO
TABLE_NAME VALUES (1, \"It's a sunny day!\", 83);")从 Spark 查询该表。
df = spark.sql("SELECT * from
TABLE_NAME ;") df.show()通过 Google Cloud 控制台在新数据集中查询该表。
SELECT * FROM `
PROJECT_ID .NAMESPACE_NAME .TABLE_NAME ` LIMIT 100