在 BigQuery Studio 中将 BigQuery 元存储区与 Spark 搭配使用

本文档介绍了如何在 BigQuery Studio 中将 BigQuery 元存储库与 Spark 搭配使用。

您可以在 BigQuery 工作室中使用 Spark 通过 Apache Spark 创建 Iceberg 表。创建表后,您可以从 Spark 查询数据。您还可以使用 SQL 从 BigQuery 控制台中查询相同的数据。

准备工作

  1. 通过以下注册表单申请在 BigQuery Studio 中使用 Spark。
  2. 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能
  3. 启用 BigQuery 和 Dataflow API。

    启用 API

  4. 可选:了解 BigQuery 元存储区的工作原理以及为何应使用它。

所需的角色

如需获得在 BigQuery Studio 中使用 Spark 笔记本所需的权限,请让您的管理员为您授予以下 IAM 角色:

  • 在 Spark 中创建 BigQuery Studio 元存储表: 项目的 BigQuery Data Editor (roles/bigquery.dataEditor)
  • 通过 Spark 中的笔记本元数据存储表创建 Spark 会话: 用户账号中的 Dataproc Worker (roles/dataproc.serverlessEditor)

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

连接到笔记本

以下示例展示了如何配置 Spark 笔记本以与存储在 BigQuery Metastore 中的 Iceberg 表进行交互。

在本例中,您将设置 Spark 会话、创建命名空间和表、向表中添加一些数据,然后在 BigQuery Studio 中查询数据。

  1. 在 BigQuery Studio 中创建 Spark 笔记本

  2. 在 Apache Spark 笔记本中,添加必要的 Apache Spark 导入:

    from dataproc_spark_session.session.spark.connect import DataprocSparkSession
    from google.cloud.dataproc_v1 import Session
    from pyspark.sql import SparkSession
  3. 定义目录、命名空间和仓库目录。

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    替换以下内容:

    • CATALOG_NAME:用于引用 Spark 表的目录名称。
    • NAMESPACE_NAME:用于引用 Spark 表的命名空间标签。
    • WAREHOUSE_DIRECTORY:存储数据仓库的 Cloud Storage 文件夹的 URI。
  4. 初始化 Spark 会话。

    session.environment_config.execution_config.network_uri = NETWORK_NAME
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir
    
    spark = (
     DataprocSparkSession.builder
     .appName("BigQuery metastore Iceberg table example")
     .dataprocConfig(session)
     .getOrCreate())

    替换以下内容:

    • NETWORK_NAME:运行 Spark 代码的网络的名称或 URI。如果未指定,则使用 default 网络。
    • PROJECT_ID:运行 Spark 代码的 Google Cloud 项目的 ID。
    • LOCATION:运行 Spark 作业的位置
  5. 创建目录和命名空间。

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. 创建表。

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    替换以下内容:

    • TABLE_NAME:Iceberg 表的名称。
  7. 从 Spark 运行数据操纵语言 (DML)。

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. 从 Spark 运行数据定义语言 (DDL)。

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. 将数据插入表中。

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. 从 Spark 查询表。

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. 在新的 Google Cloud 控制台中,查询新数据集中的表。

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100

后续步骤