将 BigQuery 元存储空间与 Spark 存储过程搭配使用
本文档介绍了如何将 Apache Spark 存储过程与 BigQuery Metastore 搭配使用。
准备工作
- 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery 和 Dataflow API。
可选:详细了解以下内容:
- 了解 BigQuery Metastore 的运作方式以及为何应使用它。
- 了解 BigQuery Spark 存储过程的运作方式,并在开始执行任务之前完成相应操作。
所需的角色
如需使用 Spark 存储过程,请查看存储过程所需的角色,并授予必要的角色。
如需获得使用 Spark 和存储过程(并将 BigQuery 元存储作为元数据存储)所需的权限,请让您的管理员为您授予以下 IAM 角色:
-
在 Spark 中创建 BigQuery 元存储表:
-
项目中 Spark Connection 服务账号的 BigQuery Data Editor (
roles/bigquery.dataEditor
) -
对项目中的 Spark Connection 服务账号的 Storage Object Admin (
roles/storage.objectAdmin
)
-
项目中 Spark Connection 服务账号的 BigQuery Data Editor (
-
在 BigQuery 中查询 BigQuery 元存储表:
-
针对项目的 BigQuery Data Viewer (
roles/bigquery.dataViewer
) 角色 -
针对项目的 BigQuery User (
roles/bigquery.user
) -
对项目使用 Storage Object Viewer (
roles/storage.objectViewer
)
-
针对项目的 BigQuery Data Viewer (
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
创建和运行存储过程
以下示例展示了如何使用 BigQuery Metastore 创建和运行存储过程。
转到 BigQuery 页面。
在查询编辑器中,为
CREATE PROCEDURE
语句添加以下示例代码。CREATE OR REPLACE PROCEDURE `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`() WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK', runtime_version='1.1', properties=[("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY"), ("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION"), ("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID"), ("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog"), ("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"), ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")], jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"]) LANGUAGE python AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("BigQuery metastore Iceberg") \ .getOrCreate() spark.sql("USE CATALOG_NAME;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'") spark.sql("DESCRIBE TABLE_NAME;") spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");") spark.sql("SELECT * from TABLE_NAME;") spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE TABLE_NAME;") """; CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();
替换以下内容:
PROJECT_ID
:您的 Google Cloud 项目的 ID。BQ_DATASET_ID
:BigQuery 中包含该过程的数据集的 ID。PROCEDURE_NAME
:您要创建或替换的流程的名称。REGION
:Spark 连接的位置。LOCATION
:BigQuery 资源的位置。SPARK_CONNECTION_ID
:您的 Spark 连接的 ID。CATALOG_NAME
:您使用的目录的名称。WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹的 URI。NAMESPACE_NAME
:您使用的命名空间。