将 BigQuery Metastore 与 Spark 存储过程搭配使用
本文档介绍了如何将 Apache Spark 存储过程与 BigQuery Metastore 搭配使用。
准备工作
- 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery 和 Dataflow API。
可选:详细了解以下内容:
- 了解 BigQuery 元存储区的工作原理以及为何应使用它。
- 了解 BigQuery Spark 存储过程的运作方式,并在开始执行任务之前完成相应准备。
所需的角色
如需使用 Spark 存储过程,请查看存储过程所需的角色,并授予必要的角色。
如需获得使用 Spark 和存储过程(并将 BigQuery 元存储作为元数据存储)所需的权限,请让管理员为您授予以下 IAM 角色:
-
在 Spark 中创建 BigQuery Metastore 表:
-
项目中 Spark Connection 服务账号的 BigQuery Data Editor (
roles/bigquery.dataEditor
) 角色 -
项目中 Spark 连接服务账号的 Storage Object Admin (
roles/storage.objectAdmin
)
-
项目中 Spark Connection 服务账号的 BigQuery Data Editor (
-
在 BigQuery 中查询 BigQuery 元存储表:
-
项目的 BigQuery Data Viewer (
roles/bigquery.dataViewer
) 角色 -
项目的 BigQuery User (
roles/bigquery.user
) -
项目的 Storage Object Viewer (
roles/storage.objectViewer
)
-
项目的 BigQuery Data Viewer (
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
创建和运行存储过程
以下示例展示了如何使用 BigQuery Metastore 创建和运行存储过程。
转到 BigQuery 页面。
在查询编辑器中,为
CREATE PROCEDURE
语句添加以下示例代码。CREATE OR REPLACE PROCEDURE `
PROJECT_ID .BQ_DATASET_ID .PROCEDURE_NAME `() WITH CONNECTION `PROJECT_ID .REGION .SPARK_CONNECTION_ID ` OPTIONS (engine='SPARK', runtime_version='1.1', properties=[("spark.sql.catalog.CATALOG_NAME .warehouse", "WAREHOUSE_DIRECTORY "), ("spark.sql.catalog.CATALOG_NAME .gcp_location", "LOCATION "), ("spark.sql.catalog.CATALOG_NAME .gcp_project", "PROJECT_ID "), ("spark.sql.catalog.CATALOG_NAME ", "org.apache.iceberg.spark.SparkCatalog"), ("spark.sql.catalog.CATALOG_NAME .catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"), ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")], jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"]) LANGUAGE python AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("BigQuery metastore Iceberg") \ .getOrCreate() spark.sql("USECATALOG_NAME ;") spark.sql("CREATE NAMESPACE IF NOT EXISTSNAMESPACE_NAME ;") spark.sql("USENAMESPACE_NAME ;") spark.sql("CREATE TABLETABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY '") spark.sql("DESCRIBETABLE_NAME ;") spark.sql("INSERT INTOTABLE_NAME VALUES (1, \"first row\");") spark.sql("SELECT * fromTABLE_NAME ;") spark.sql("ALTER TABLETABLE_NAME ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBETABLE_NAME ;") """; CALL `PROJECT_ID .BQ_DATASET_ID .PROCEDURE_NAME `();替换以下内容:
PROJECT_ID
:您的 Google Cloud 项目的 ID。BQ_DATASET_ID
:BigQuery 中包含该过程的数据集的 ID。PROCEDURE_NAME
:您要创建或替换的过程的名称。REGION
:Spark 连接的位置。LOCATION
:BigQuery 资源的位置。SPARK_CONNECTION_ID
:您的 Spark 连接的 ID。CATALOG_NAME
:您使用的目录的名称。WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹的 URI。NAMESPACE_NAME
:您使用的命名空间。