将 BigQuery Metastore 与 Dataproc Serverless 搭配使用
本文档介绍了如何将 BigQuery 元存储空间与 Dataproc Serverless 搭配使用。
准备工作
- 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery 和 Dataproc API。
可选:了解 BigQuery 元存储区的工作原理以及为何应使用它。
所需的角色
如需获得将 BigQuery 元存储库用作元数据存储库时使用 Spark 和 Dataproc Serverless 所需的权限,请让管理员为您授予以下 IAM 角色:
-
在 Spark 中创建 BigQuery Metastore 表:
-
项目中 Dataproc Serverless 服务账号的 Dataproc Worker (
roles/dataproc.worker
) -
项目中 Dataproc Serverless 服务账号的 BigQuery Data Editor (
roles/bigquery.dataEditor
) 角色 -
针对项目中的 Dataproc Serverless 服务账号的 Storage Object Admin (
roles/storage.objectAdmin
)
-
项目中 Dataproc Serverless 服务账号的 Dataproc Worker (
-
在 BigQuery 中查询 BigQuery 元存储表:
-
项目的 BigQuery Data Viewer (
roles/bigquery.dataViewer
) 角色 -
项目的 BigQuery User (
roles/bigquery.user
) -
项目的 Storage Object Viewer (
roles/storage.objectViewer
)
-
项目的 BigQuery Data Viewer (
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
常规工作流程
如需将 BigQuery 与 Dataproc Serverless 搭配使用,请按以下一般步骤操作:
- 创建一个文件,其中包含您要在 BigQuery Metastore 中运行的命令。
- 连接到您选择的开源软件引擎。
- 使用您选择的方法(例如 Spark SQL 或 PySpark)提交批量作业。
将 BigQuery Metastore 与 Spark 连接
以下说明介绍了如何将 Dataproc Serverless 连接到 BigQuery Metastore:
如需提交 Spark SQL 批量作业,请完成以下步骤。
使用您要在 BigQuery 元存储区中运行的 Spark SQL 命令创建一个 SQL 文件。例如,以下命令会创建一个命名空间和一个表。
CREATE NAMESPACE `
CATALOG_NAME `.NAMESPACE_NAME ; CREATE TABLE `CATALOG_NAME `.NAMESPACE_NAME .TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY ';替换以下内容:
CATALOG_NAME
:引用 Spark 表的目录名称。NAMESPACE_NAME
:引用 Spark 表的命名空间名称。TABLE_NAME
:Spark 表的表名称。WAREHOUSE_DIRECTORY
:存储数据仓库的 Cloud Storage 文件夹的 URI。
运行以下
gcloud dataproc batches submit spark-sql
gcloud CLI 命令,提交 Spark SQL 批量作业:gcloud dataproc batches submit spark-sql
SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID /regions/REGION /subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME =org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME .catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME .gcp_project=PROJECT_ID ,spark.sql.catalog.CATALOG_NAME .gcp_location=LOCATION ,spark.sql.catalog.CATALOG_NAME .warehouse=WAREHOUSE_DIRECTORY "替换以下内容:
SQL_SCRIPT_PATH
:批量作业使用的 SQL 文件的路径。PROJECT_ID
:要运行批量作业的 Google Cloud 项目的 ID。REGION
:工作负载运行所在的区域。SUBNET_NAME
(可选):REGION
中已启用专用 Google 访问通道且符合其他会话子网要求的 VPC 子网的名称。LOCATION
:批量作业的运行位置。BUCKET_PATH
:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。WAREHOUSE_FOLDER
位于此存储桶中。存储桶的gs://
URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如mybucketname1
。
如需详细了解如何提交 Spark 批处理作业,请参阅运行 Spark 批处理工作负载。
如需提交 PySpark 批量作业,请完成以下步骤。
创建一个 Python 文件,其中包含您要在 BigQuery 元存储区中运行的 PySpark 命令。
例如,以下命令会设置一个 Spark 环境,以与存储在 BigQuery Metastore 中的 Iceberg 表进行交互。然后,该命令会在该命名空间中创建一个新命名空间和一个 Iceberg 表。
from pyspark.sql import SparkSession spark = SparkSession.builder .appName("BigQuery Metastore Iceberg") \ .config("spark.sql.catalog.
CATALOG_NAME ", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME .catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME .gcp_project", "PROJECT_ID ") \ .config("spark.sql.catalog.CATALOG_NAME .gcp_location", "LOCATION ") \ .config("spark.sql.catalog.CATALOG_NAME .warehouse", "WAREHOUSE_DIRECTORY ") \ .getOrCreate() spark.sql("USE `CATALOG_NAME `;") spark.sql("CREATE NAMESPACE IF NOT EXISTSNAMESPACE_NAME ;") spark.sql("USENAMESPACE_NAME ;") spark.sql("CREATE TABLETABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY ';")替换以下内容:
PROJECT_ID
:要运行批量作业的 Google Cloud 项目的 ID。LOCATION
:BigQuery 资源所在的位置。CATALOG_NAME
:引用 Spark 表的目录名称。TABLE_NAME
:Spark 表的表名称。WAREHOUSE_DIRECTORY
:存储数据仓库的 Cloud Storage 文件夹的 URI。NAMESPACE_NAME
:引用 Spark 表的命名空间名称。
使用以下
gcloud dataproc batches submit pyspark
命令提交批量作业。gcloud dataproc batches submit pyspark
PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar --properties="spark.sql.catalog.CATALOG_NAME =org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME .catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME .gcp_project=PROJECT_ID ,spark.sql.catalog.CATALOG_NAME .gcp_location=LOCATION ,spark.sql.catalog.CATALOG_NAME .warehouse=WAREHOUSE_DIRECTORY "替换以下内容:
PYTHON_SCRIPT_PATH
:批量作业使用的 Python 脚本的路径。PROJECT_ID
:要运行批量作业的 Google Cloud 项目的 ID。REGION
:工作负载运行所在的区域。BUCKET_PATH
:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。存储桶的gs://
URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如mybucketname1
。
如需详细了解如何提交 PySpark 批处理作业,请参阅 PySpark gcloud 参考文档。