将 BigQuery 元存储空间与 Dataproc Serverless 搭配使用

本文档介绍了如何将 BigQuery 元存储空间与 Dataproc Serverless 搭配使用。

准备工作

  1. 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能
  2. 启用 BigQuery 和 Dataproc API。

    启用 API

  3. 可选:了解 BigQuery 元存储区的工作原理以及为何应使用它。

所需的角色

如需获得使用 Spark 和 Dataproc Serverless 并将 BigQuery 元存储库用作元数据存储所需的权限,请让您的管理员为您授予以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

一般工作流程

如需将 BigQuery 与 Dataproc Serverless 搭配使用,请按照以下常规步骤操作:

  1. 创建一个文件,其中包含您要在 BigQuery 元存储区中运行的命令。
  2. 连接到您选择的开源软件引擎。
  3. 使用您选择的方法(例如 Spark SQL 或 PySpark)提交批量作业。

将 BigQuery Metastore 与 Spark 相关联

以下说明介绍了如何将 Dataproc Serverless 连接到 BigQuery Metastore:

SparkSQL

如需提交 Spark SQL 批量作业,请完成以下步骤。

  1. 使用您要在 BigQuery 元存储区中运行的 Spark SQL 命令创建一个 SQL 文件。例如,以下命令会创建一个命名空间和一个表。

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    替换以下内容:

    • CATALOG_NAME:引用 Spark 表的目录名称。
    • NAMESPACE_NAME:引用 Spark 表的命名空间名称。
    • TABLE_NAME:Spark 表的表名称。
    • WAREHOUSE_DIRECTORY:存储数据仓库的 Cloud Storage 文件夹的 URI。
  2. 运行以下 gcloud dataproc batches submit spark-sql gcloud CLI 命令,提交 Spark SQL 批量作业:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
    --project=PROJECT_ID \
    --region=REGION \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
    --deps-bucket=BUCKET_PATH \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    替换以下内容:

    • SQL_SCRIPT_PATH:批量作业使用的 SQL 文件的路径。
    • PROJECT_ID:要用于运行批量作业的 Google Cloud 项目的 ID。
    • REGION:工作负载运行的区域。
    • SUBNET_NAME(可选):REGION 中已启用Google 专用访问且符合其他会话子网要求的 VPC 子网的名称。
    • LOCATION:运行批量作业的位置。
    • BUCKET_PATH:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。WAREHOUSE_FOLDER 位于此存储桶中。存储桶的 gs:// URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如 mybucketname1

    如需详细了解如何提交 Spark 批处理作业,请参阅运行 Spark 批处理工作负载

PySpark

如需提交 PySpark 批量作业,请完成以下步骤。

  1. 创建一个 Python 文件,其中包含您要在 BigQuery 元存储区中运行的 PySpark 命令。

    例如,以下命令会设置一个 Spark 环境,以与存储在 BigQuery Metastore 中的 Iceberg 表进行交互。然后,该命令会在该命名空间中创建一个新命名空间和一个 Iceberg 表。

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    替换以下内容:

    • PROJECT_ID:要用于运行批量作业的 Google Cloud 项目的 ID。
    • LOCATION:BigQuery 资源所在的位置
    • CATALOG_NAME:引用 Spark 表的目录名称。
    • TABLE_NAME:Spark 表的表名称。
    • WAREHOUSE_DIRECTORY:存储数据仓库的 Cloud Storage 文件夹的 URI。
    • NAMESPACE_NAME:引用 Spark 表的命名空间名称。
  2. 使用以下 gcloud dataproc batches submit pyspark 命令提交批量作业。

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
     --version=2.2 \
     --project=PROJECT_ID \
     --region=REGION \
     --deps-bucket=BUCKET_PATH \
     --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
     --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    替换以下内容:

    • PYTHON_SCRIPT_PATH:批量作业使用的 Python 脚本的路径。
    • PROJECT_ID:要用于运行批量作业的 Google Cloud 项目的 ID。
    • REGION:工作负载运行的区域。
    • BUCKET_PATH:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。存储桶的 gs:// URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如 mybucketname1

    如需详细了解如何提交 PySpark 批处理作业,请参阅 PySpark gcloud 参考文档

后续步骤