将 BigQuery 元存储空间与 Spark 存储过程搭配使用

本文档介绍了如何将 Apache Spark 存储过程与 BigQuery Metastore 搭配使用。

准备工作

  1. 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能
  2. 启用 BigQuery 和 Dataflow API。

    启用 API

  3. 可选:详细了解以下内容:

所需的角色

如需使用 Spark 存储过程,请查看存储过程所需的角色,并授予必要的角色。

如需获得使用 Spark 和存储过程(并将 BigQuery 元存储作为元数据存储)所需的权限,请让您的管理员为您授予以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建和运行存储过程

以下示例展示了如何使用 BigQuery Metastore 创建和运行存储过程。

  1. 转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,为 CREATE PROCEDURE 语句添加以下示例代码。

    CREATE OR REPLACE PROCEDURE
    `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`()
    WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK',
    runtime_version='1.1',
    properties=[("spark.sql.catalog.CATALOG_NAME.warehouse",
    "WAREHOUSE_DIRECTORY"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_location",
    "LOCATION"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_project",
    "PROJECT_ID"),
    ("spark.sql.catalog.CATALOG_NAME",
    "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.CATALOG_NAME.catalog-impl",
    "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
    ("spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")],
    jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"])
    LANGUAGE python AS R"""
    from pyspark.sql import SparkSession
    spark = SparkSession \
    .builder \
    .appName("BigQuery metastore Iceberg") \
    .getOrCreate()
    spark.sql("USE CATALOG_NAME;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'")
    spark.sql("DESCRIBE TABLE_NAME;")
    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");")
    spark.sql("SELECT * from TABLE_NAME;")
    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);")
    spark.sql("DESCRIBE TABLE_NAME;")
    """;
    CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();

    替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目的 ID。
    • BQ_DATASET_ID:BigQuery 中包含该过程的数据集的 ID。
    • PROCEDURE_NAME:您要创建或替换的流程的名称。
    • REGION:Spark 连接的位置。
    • LOCATION:BigQuery 资源的位置。
    • SPARK_CONNECTION_ID:您的 Spark 连接的 ID。
    • CATALOG_NAME:您使用的目录的名称。
    • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹的 URI。
    • NAMESPACE_NAME:您使用的命名空间。

后续步骤