将 BigQuery 元数据存储与 BigQuery 中的表搭配使用

本文档介绍如何将 BigQuery 元数据库与 BigQuery 表和 Spark 搭配使用。

借助 BigQuery Metastore,您可以创建和使用标准(内置)表适用于 Apache Iceberg 的 BigQuery 表以及来自 BigQuery 的 BigLake 外部表

准备工作

  1. 为 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能
  2. 启用 BigQuery API 和 Dataproc API。

    启用 API

  3. 可选:了解 BigQuery 元数据库的工作原理以及为什么要使用它。

  4. 可选:如果您使用的是 Iceberg,请使用以下命令将 BigQuery 元数据库 Iceberg 表与 BigQuery 中的其他类似表类型进行比较:

标准 BigQuery 表 BigLake 外部表 适用于 Apache Iceberg 的 BigLake 外部表(也称为 BigLake Iceberg 表) BigQuery metastore Iceberg 表预览版 适用于 Apache Iceberg 的 BigQuery 表(也称为 Iceberg 托管式表 / BigQuery Iceberg 表)(预览版
主要特性 全托管式体验 受管控(精细的访问权限控制)且跨开源和 BigQuery 引擎正常运行 BigLake 外部表功能 + 数据一致性、架构更新。无法通过 Spark 或其他开放引擎创建。 BigLake Iceberg 表功能 + 可通过外部引擎更改。无法使用 DDL 或 bq 命令行工具创建。 BigLake Iceberg 表功能 + 开放数据和元数据的管理开销较低
数据存储 BigQuery 代管式存储空间 托管在用户管理的存储桶中的开放格式数据
开放模型 BigQuery Storage Read API(通过连接器) 开放文件格式 (Parquet) 开放库 (Iceberg) 开源兼容(Iceberg metadata 快照)
治理 统一的 BigQuery 治理
写入(DML 和流式处理) 通过 BigQuery 连接器、API、高吞吐量 DML、CDC 仅通过外部引擎写入 通过 BigQuery 连接器、API、高吞吐量 DML、CDC

所需的角色

如需获得将 Spark 和 Dataproc 与 BigQuery Metastore 搭配使用作为元数据存储库所需的权限,请让管理员向您授予以下 IAM 角色:

  • 在 Spark 中创建 BigQuery 元数据表:
    • 针对项目中 Dataproc Serverless 服务账号的 Dataproc Worker (roles/dataproc.worker)
    • 针对项目中 Dataproc Serverless 服务账号的 BigQuery Data Editor (roles/bigquery.dataEditor)
    • 针对项目中 Dataproc Serverless 服务账号的 Storage Object Admin (roles/storage.objectAdmin)
  • 在 BigQuery 中查询 BigQuery 元数据存储表:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

连接到表

  1. 在 Google Cloud 控制台中创建数据集

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    替换以下内容:

    • PROJECT_ID:要创建数据集的 Google Cloud 项目的 ID。
    • DATASET_NAME:数据集的名称。
  2. 创建 Cloud 资源连接

  3. 创建标准 BigQuery 表。

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    替换以下内容:

    • TABLE_NAME:表的名称。
  4. 将数据插入到标准 BigQuery 表中。

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. 为 Apache Iceberg 创建 BigQuery 表

    例如,如需创建表,请运行以下 CREATE 语句。

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    替换以下内容:

    • ICEBERG_TABLE_NAME:Iceberg 表的名称,例如 iceberg_managed_table
    • CONNECTION_NAME:您的连接的名称。您在上一步中创建了此连接。例如 myproject.us.myconnection
    • STORAGE_URI:完全限定的 Cloud Storage URI。例如 gs://mybucket/table
  6. 将数据插入 Apache Iceberg 的 BigQuery 表中。

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. 创建只读 Iceberg 表

    例如,如需创建只读 Iceberg 表,请运行以下 CREATE 语句。

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    替换以下内容:

    • READONLY_ICEBERG_TABLE_NAME:只读表的名称。
    • BUCKET_PATH:包含外部表数据的 Cloud Storage 存储桶的路径,格式为 ['gs://bucket_name/[folder_name/]file_name']
  8. 通过 PySpark 查询标准表、受管 Iceberg 表和只读 Iceberg 表。

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the bqms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query a standard BigQuery table
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Managed Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Readonly Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    替换以下内容:

    • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹的 URI。
    • CATALOG_NAME:您要使用的目录的名称。
    • MATERIALIZATION_NAMESPACE:用于存储临时结果的命名空间。
  9. 使用 Serverless Spark 运行 PySpark 脚本。

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \
      --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar

    替换以下内容:

    • SCRIPT_PATH:批量作业使用的脚本的路径。
    • PROJECT_ID:要运行批量作业的 Google Cloud 项目的 ID。
    • REGION:工作负载运行所在的区域。
    • YOUR_BUCKET:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。存储桶的 gs:// URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如 mybucketname1

后续步骤