将 BigQuery 元存储区与 BigQuery 中的表搭配使用

本文档介绍了如何将 BigQuery 元存储区与 BigQuery 表和 Spark 搭配使用。

借助 BigQuery 元存储,您可以创建和使用 标准(内置)表适用于 Apache Spark Iceberg 的 BigQuery 表,以及 BigQuery 中的 BigLake 外部表

准备工作

  1. 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能
  2. 启用 BigQuery 和 Dataproc API。

    启用 API

  3. 可选:了解 BigQuery 元存储区的工作原理以及为何应使用它。

所需的角色

如需获得将 BigQuery 元存储库用作元数据存储库时使用 Spark 和 Dataproc 所需的权限,请让您的管理员为您授予以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

连接到表

  1. 在 Google Cloud 控制台中创建数据集

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    替换以下内容:

    • PROJECT_ID:用于创建数据集的 Google Cloud 项目的 ID。
    • DATASET_NAME:数据集的名称。
  2. 创建 Cloud 资源连接

  3. 创建标准 BigQuery 表。

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    替换以下内容:

    • TABLE_NAME:表格的名称。
  4. 将数据插入标准 BigQuery 表中。

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. 创建适用于 Apache Iceberg 的 BigQuery 表

    例如,如需创建表,请运行以下 CREATE 语句。

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    替换以下内容:

    • ICEBERG_TABLE_NAME:Iceberg 表的名称。例如 iceberg_managed_table
    • CONNECTION_NAME:连接的名称。您在上一步中创建了此文件。例如 myproject.us.myconnection
    • STORAGE_URI:完全限定的 Cloud Storage URI。例如 gs://mybucket/table
  6. 将数据插入 Apache Iceberg 的 BigQuery 表中。

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. 创建只读 Iceberg 表

    例如,如需创建只读 Iceberg 表,请运行以下 CREATE 语句。

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    替换以下内容:

    • READONLY_ICEBERG_TABLE_NAME:只读表的名称。
    • BUCKET_PATH:包含外部表数据的 Cloud Storage 存储桶的路径,格式为 ['gs://bucket_name/[folder_name/]file_name']
  8. 在 PySpark 中,查询标准表、托管 Iceberg 表和只读 Iceberg 表。

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the bqms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query a standard BigQuery table
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Managed Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Readonly Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    替换以下内容:

    • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹的 URI。
    • CATALOG_NAME:您使用的目录的名称。
    • MATERIALIZATION_NAMESPACE:用于存储临时结果的命名空间。
  9. 使用无服务器 Spark 运行 PySpark 脚本。

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \
      --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar

    替换以下内容:

    • SCRIPT_PATH:批量作业使用的脚本的路径。
    • PROJECT_ID:要用于运行批量作业的 Google Cloud 项目的 ID。
    • REGION:工作负载运行的区域。
    • YOUR_BUCKET:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。存储桶的 gs:// URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如 mybucketname1

后续步骤