将 BigQuery 元数据存储与 BigQuery 中的表搭配使用
本文档介绍如何将 BigQuery 元数据库与 BigQuery 表和 Spark 搭配使用。
借助 BigQuery Metastore,您可以创建和使用标准(内置)表、适用于 Apache Iceberg 的 BigQuery 表以及来自 BigQuery 的 BigLake 外部表。
准备工作
- 为 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery API 和 Dataproc API。
可选:了解 BigQuery 元数据库的工作原理以及为什么要使用它。
可选:如果您使用的是 Iceberg,请使用以下命令将 BigQuery 元数据库 Iceberg 表与 BigQuery 中的其他类似表类型进行比较:
标准 BigQuery 表 | BigLake 外部表 | 适用于 Apache Iceberg 的 BigLake 外部表(也称为 BigLake Iceberg 表) | BigQuery metastore Iceberg 表(预览版) | 适用于 Apache Iceberg 的 BigQuery 表(也称为 Iceberg 托管式表 / BigQuery Iceberg 表)(预览版) | |
---|---|---|---|---|---|
主要特性 | 全托管式体验 | 受管控(精细的访问权限控制)且跨开源和 BigQuery 引擎正常运行 | BigLake 外部表功能 + 数据一致性、架构更新。无法通过 Spark 或其他开放引擎创建。 | BigLake Iceberg 表功能 + 可通过外部引擎更改。无法使用 DDL 或 bq 命令行工具创建。 | BigLake Iceberg 表功能 + 开放数据和元数据的管理开销较低 |
数据存储 | BigQuery 代管式存储空间 | 托管在用户管理的存储桶中的开放格式数据 | |||
开放模型 | BigQuery Storage Read API(通过连接器) | 开放文件格式 (Parquet) | 开放库 (Iceberg) | 开源兼容(Iceberg metadata 快照) | |
治理 | 统一的 BigQuery 治理 | ||||
写入(DML 和流式处理) | 通过 BigQuery 连接器、API、高吞吐量 DML、CDC | 仅通过外部引擎写入 | 通过 BigQuery 连接器、API、高吞吐量 DML、CDC |
所需的角色
如需获得将 Spark 和 Dataproc 与 BigQuery Metastore 搭配使用作为元数据存储库所需的权限,请让管理员向您授予以下 IAM 角色:
-
在 Spark 中创建 BigQuery 元数据表:
-
针对项目中 Dataproc Serverless 服务账号的 Dataproc Worker (
roles/dataproc.worker
) -
针对项目中 Dataproc Serverless 服务账号的 BigQuery Data Editor (
roles/bigquery.dataEditor
) -
针对项目中 Dataproc Serverless 服务账号的 Storage Object Admin (
roles/storage.objectAdmin
)
-
针对项目中 Dataproc Serverless 服务账号的 Dataproc Worker (
-
在 BigQuery 中查询 BigQuery 元数据存储表:
-
针对项目的 BigQuery Data Viewer (
roles/bigquery.dataViewer
) -
针对项目的 BigQuery User (
roles/bigquery.user
) -
针对项目的 Storage Object Viewer (
roles/storage.objectViewer
)
-
针对项目的 BigQuery Data Viewer (
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
连接到表
在 Google Cloud 控制台中创建数据集。
CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;
替换以下内容:
PROJECT_ID
:要创建数据集的 Google Cloud 项目的 ID。DATASET_NAME
:数据集的名称。
创建 Cloud 资源连接。
创建标准 BigQuery 表。
CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);
替换以下内容:
TABLE_NAME
:表的名称。
将数据插入到标准 BigQuery 表中。
INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
为 Apache Iceberg 创建 BigQuery 表。
例如,如需创建表,请运行以下
CREATE
语句。CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME( name STRING,id INT64 ) WITH CONNECTION `CONNECTION_NAME` OPTIONS ( file_format = 'PARQUET', table_format = 'ICEBERG', storage_uri = 'STORAGE_URI');
替换以下内容:
ICEBERG_TABLE_NAME
:Iceberg 表的名称,例如iceberg_managed_table
。CONNECTION_NAME
:您的连接的名称。您在上一步中创建了此连接。例如myproject.us.myconnection
。STORAGE_URI
:完全限定的 Cloud Storage URI。例如gs://mybucket/table
。
将数据插入 Apache Iceberg 的 BigQuery 表中。
INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
创建只读 Iceberg 表。
例如,如需创建只读 Iceberg 表,请运行以下
CREATE
语句。CREATE OR REPLACE EXTERNAL TABLE `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME WITH CONNECTION `CONNECTION_NAME` OPTIONS ( format = 'ICEBERG', uris = ['BUCKET_PATH'], require_partition_filter = FALSE);
替换以下内容:
READONLY_ICEBERG_TABLE_NAME
:只读表的名称。BUCKET_PATH
:包含外部表数据的 Cloud Storage 存储桶的路径,格式为['gs://bucket_name/[folder_name/]file_name']
。
通过 PySpark 查询标准表、受管 Iceberg 表和只读 Iceberg 表。
from pyspark.sql import SparkSession # Create a spark session spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.conf.set("viewsEnabled","true") # Use the bqms_catalog spark.sql("USE `CATALOG_NAME`;") spark.sql("USE NAMESPACE DATASET_NAME;") # Configure spark for temp results spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") # List the tables in the dataset df = spark.sql("SHOW TABLES;") df.show(); # Query a standard BigQuery table sql = """SELECT * FROM DATASET_NAME.TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() # Query a BigQuery Managed Apache Iceberg table sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() # Query a BigQuery Readonly Apache Iceberg table sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
替换以下内容:
WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹的 URI。CATALOG_NAME
:您要使用的目录的名称。MATERIALIZATION_NAMESPACE
:用于存储临时结果的命名空间。
使用 Serverless Spark 运行 PySpark 脚本。
gcloud dataproc batches submit pyspark SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=YOUR_BUCKET \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar
替换以下内容:
SCRIPT_PATH
:批量作业使用的脚本的路径。PROJECT_ID
:要运行批量作业的 Google Cloud 项目的 ID。REGION
:工作负载运行所在的区域。YOUR_BUCKET
:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。存储桶的gs://
URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如mybucketname1
。