将 BigQuery 元存储区与 BigQuery 中的表搭配使用
本文档介绍了如何将 BigQuery 元存储区与 BigQuery 表和 Spark 搭配使用。
借助 BigQuery 元存储,您可以创建和使用 标准(内置)表、适用于 Apache Spark Iceberg 的 BigQuery 表,以及 BigQuery 中的 BigLake 外部表。
准备工作
- 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery 和 Dataproc API。
可选:了解 BigQuery 元存储区的工作原理以及为何应使用它。
所需的角色
如需获得将 BigQuery 元存储库用作元数据存储库时使用 Spark 和 Dataproc 所需的权限,请让您的管理员为您授予以下 IAM 角色:
-
在 Spark 中创建 BigQuery 元存储表:
-
项目中 Dataproc Serverless 服务账号上的 Dataproc Worker (
roles/dataproc.worker
) -
项目中 Dataproc Serverless 服务账号的 BigQuery Data Editor (
roles/bigquery.dataEditor
) -
对项目中 Dataproc Serverless 服务账号的 Storage Object Admin (
roles/storage.objectAdmin
) 角色
-
项目中 Dataproc Serverless 服务账号上的 Dataproc Worker (
-
在 BigQuery 中查询 BigQuery 元存储表:
-
针对项目的 BigQuery Data Viewer (
roles/bigquery.dataViewer
) 角色 -
针对项目的 BigQuery User (
roles/bigquery.user
) -
对项目使用 Storage Object Viewer (
roles/storage.objectViewer
)
-
针对项目的 BigQuery Data Viewer (
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
连接到表
在 Google Cloud 控制台中创建数据集。
CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;
替换以下内容:
PROJECT_ID
:用于创建数据集的 Google Cloud 项目的 ID。DATASET_NAME
:数据集的名称。
创建 Cloud 资源连接。
创建标准 BigQuery 表。
CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);
替换以下内容:
TABLE_NAME
:表格的名称。
将数据插入标准 BigQuery 表中。
INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
创建适用于 Apache Iceberg 的 BigQuery 表。
例如,如需创建表,请运行以下
CREATE
语句。CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME( name STRING,id INT64 ) WITH CONNECTION `CONNECTION_NAME` OPTIONS ( file_format = 'PARQUET', table_format = 'ICEBERG', storage_uri = 'STORAGE_URI');
替换以下内容:
ICEBERG_TABLE_NAME
:Iceberg 表的名称。例如iceberg_managed_table
。CONNECTION_NAME
:连接的名称。您在上一步中创建了此文件。例如myproject.us.myconnection
。STORAGE_URI
:完全限定的 Cloud Storage URI。例如gs://mybucket/table
。
将数据插入 Apache Iceberg 的 BigQuery 表中。
INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
创建只读 Iceberg 表。
例如,如需创建只读 Iceberg 表,请运行以下
CREATE
语句。CREATE OR REPLACE EXTERNAL TABLE `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME WITH CONNECTION `CONNECTION_NAME` OPTIONS ( format = 'ICEBERG', uris = ['BUCKET_PATH'], require_partition_filter = FALSE);
替换以下内容:
READONLY_ICEBERG_TABLE_NAME
:只读表的名称。BUCKET_PATH
:包含外部表数据的 Cloud Storage 存储桶的路径,格式为['gs://bucket_name/[folder_name/]file_name']
。
在 PySpark 中,查询标准表、托管 Iceberg 表和只读 Iceberg 表。
from pyspark.sql import SparkSession # Create a spark session spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.conf.set("viewsEnabled","true") # Use the bqms_catalog spark.sql("USE `CATALOG_NAME`;") spark.sql("USE NAMESPACE DATASET_NAME;") # Configure spark for temp results spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") # List the tables in the dataset df = spark.sql("SHOW TABLES;") df.show(); # Query a standard BigQuery table sql = """SELECT * FROM DATASET_NAME.TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() # Query a BigQuery Managed Apache Iceberg table sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() # Query a BigQuery Readonly Apache Iceberg table sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
替换以下内容:
WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹的 URI。CATALOG_NAME
:您使用的目录的名称。MATERIALIZATION_NAMESPACE
:用于存储临时结果的命名空间。
使用无服务器 Spark 运行 PySpark 脚本。
gcloud dataproc batches submit pyspark SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=YOUR_BUCKET \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
替换以下内容:
SCRIPT_PATH
:批量作业使用的脚本的路径。PROJECT_ID
:要用于运行批量作业的 Google Cloud 项目的 ID。REGION
:工作负载运行的区域。YOUR_BUCKET
:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。存储桶的gs://
URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如mybucketname1
。