将 BigQuery 元存储空间与 Dataproc 搭配使用
本文档介绍了如何将 BigQuery Metastore 与 Dataproc on Compute Engine 搭配使用。此连接可为您提供一个可跨开源软件引擎(例如 Apache Spark)使用的单个共享元存储。
准备工作
- 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery 和 Dataproc API。
可选:了解 BigQuery 元存储区的工作原理以及为何应使用它。
所需的角色
如需获得将 BigQuery 元存储库用作元数据存储库时使用 Spark 和 Dataproc 所需的权限,请让您的管理员为您授予以下 IAM 角色:
-
创建 Dataproc 集群:
项目中 Compute Engine 默认服务账号上的 Dataproc Worker (
roles/dataproc.worker
) -
在 Spark 中创建 BigQuery 元存储表:
-
项目中 Dataproc VM 服务账号上的 Dataproc Worker (
roles/dataproc.worker
) -
项目中 Dataproc VM 服务账号的 BigQuery Data Editor (
roles/bigquery.dataEditor
) -
对项目中 Dataproc 虚拟机服务账号的 Storage Object Admin (
roles/storage.objectAdmin
)
-
项目中 Dataproc VM 服务账号上的 Dataproc Worker (
-
在 BigQuery 中查询 BigQuery 元存储表:
-
针对项目的 BigQuery Data Viewer (
roles/bigquery.dataViewer
) 角色 -
针对项目的 BigQuery User (
roles/bigquery.user
) -
对项目使用 Storage Object Viewer (
roles/storage.objectViewer
)
-
针对项目的 BigQuery Data Viewer (
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
一般工作流程
如需将 Dataproc on Compute Engine 与 BigQuery 元存储库搭配使用,请按照以下常规步骤操作:
- 创建 Dataproc 集群或配置现有集群。
- 连接到您的首选开源软件引擎,例如 Spark。
- 使用 JAR 文件在集群上安装 Apache Iceberg 目录插件。
- 根据您使用的开源软件引擎,根据需要创建和管理 BigQuery 元存储资源。
- 在 BigQuery 中,访问和使用您的 BigQuery Metastore 资源。
将 BigQuery Metastore 连接到 Spark
以下说明介绍了如何使用 Interactive Spark SQL 将 Dataproc 连接到 BigQuery Metastore。
下载 Iceberg 目录插件
如需将 BigQuery Metastore 与 Dataproc 和 Spark 连接,您必须使用 BigQuery Metastore Iceberg 目录插件 jar 文件。
此文件默认包含在 Dataproc 映像版本 2.2 中。如果您的 Dataproc 集群无法直接访问互联网,您必须下载该插件,并将其上传到您的 Dataproc 集群可以访问的 Cloud Storage 存储桶。
下载 BigQuery 元存储空间 Apache Iceberg 目录插件。
配置 Dataproc 集群
在连接到 BigQuery 元存储之前,您必须设置 Dataproc 集群。
为此,您可以创建新的集群或使用现有集群。然后,您可以使用此集群运行交互式 Spark SQL 并管理 BigQuery 元存储资源。
创建集群所在区域中的子网必须启用了专用 Google 访问通道 (PGA)。默认情况下,使用 2.2(默认)或更高映像版本创建的 Dataproc 集群虚拟机仅具有内部 IP 地址。如需允许集群虚拟机与 Google API 通信,请在创建集群所在区域的
default
(或用户指定的网络名称,如果适用)网络子网上启用专用 Google 访问通道。如果您想运行本指南中的 Zeppelin 网页界面示例,则必须使用或创建启用了 Zeppelin 可选组件的 Dataproc 集群。
新集群
如需创建新的 Dataproc 集群,请运行以下 gcloud
dataproc clusters create
命令。此配置包含使用 BigQuery Metastore 所需的设置。
gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --optional-components=ZEPPELIN \ --enable-component-gateway \ --single-node
替换以下内容:
CLUSTER_NAME
:Dataproc 集群的名称。PROJECT_ID
:您要创建集群的 Google Cloud 项目的 ID。LOCATION
:您要创建集群的 Google Cloud 区域。
现有集群
如需配置现有集群,请将以下 Iceberg Spark 运行时添加到集群中。
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2
您可以使用以下任一选项添加运行时:
初始化脚本。将运行时依赖项添加到创建时运行的自定义初始化脚本。
将运行时依赖项添加到脚本后,请按照说明创建、重新创建和更新集群。
手动安装。手动添加 Iceberg 目录插件 JAR 文件,并配置 Spark 属性以在集群中添加运行时。
提交 Spark 作业
如需提交 Spark 作业,请使用以下方法之一:
gcloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region==REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
替换以下内容:
PROJECT_ID
:包含 Dataproc 集群的 Google Cloud 项目的 ID。CLUSTER_NAME
:您用于运行 Spark SQL 作业的 Dataproc 集群的名称。REGION
:集群所在的 Compute Engine 区域。LOCATION
:BigQuery 资源的位置。CATALOG_NAME
:您要与 SQL 作业搭配使用的 Spark 目录的名称。WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹。此值以gs://
开头。SPARK_SQL_COMMAND
:您要运行的 Spark SQL 查询。此查询包含用于创建资源的命令。例如,创建命名空间和表。
Interactive Spark
连接到 Spark 并安装目录插件
如需为 BigQuery Metastore 安装目录插件,请使用 SSH 连接到 Dataproc 集群。
- 在 Google Cloud 控制台中,前往虚拟机实例页面。
如需连接到 Dataproc 虚拟机实例,请点击虚拟机实例列表中的 SSH。输出类似于以下内容:
Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
在终端中,运行以下 BigQuery 元存储空间初始化命令:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
替换以下内容:
CATALOG_NAME
:您要与 SQL 作业搭配使用的 Spark 目录的名称。PROJECT_ID
:您的 Spark 目录关联的 BigQuery 元存储目录的 Google Cloud 项目 ID。LOCATION
:BigQuery 元存储空间的 Google Cloud 位置。WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹。此值以gs://
开头。
成功连接到集群后,Spark 终端会显示
spark-sql
提示。spark-sql (default)>
管理 BigQuery 元存储空间资源
您现在已连接到 BigQuery 元存储区。您可以根据存储在 BigQuery 元存储区中的元数据查看现有资源或创建新资源。
例如,请尝试在交互式 Spark SQL 会话中运行以下命令,以创建 Iceberg 命名空间和表。
使用自定义 Iceberg 目录:
USE `CATALOG_NAME`;
创建命名空间:
CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
使用创建的命名空间:
USE NAMESPACE_NAME;
创建 Iceberg 表:
CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
插入表格行:
INSERT INTO TABLE_NAME VALUES (1, "first row");
添加表格列:
ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
查看表元数据:
DESCRIBE EXTENDED TABLE_NAME;
列出命名空间中的表:
SHOW TABLES;
Zeppelin 笔记本
在 Google Cloud 控制台中,前往 Dataproc 集群页面。
点击您要使用的集群的名称。
系统随即会打开集群详情页面。
在导航菜单中,点击网站界面。
在组件网关下,点击 Zeppelin。系统随即会打开 Zeppelin 笔记本页面。
在导航菜单中,点击记事本,然后点击 + 创建新记事。
在对话框中,输入一个记事本名称。将 Spark 保留为默认解释器。
点击创建。系统会创建一个新笔记本。
在记事本中,点击“设置”菜单,然后点击解释器。
在搜索解释器字段中,搜索 Spark。
点击修改。
在 Spark.jars 字段中,输入 Spark jar 的 URI。
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
点击保存。
点击确定。
将以下 PySpark 代码复制到您的 Zeppelin 笔记中。
%pyspark from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("select version()").show() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME;").show()
替换以下内容:
CATALOG_NAME
:要用于 SQL 作业的 Spark 目录的名称。PROJECT_ID
:包含 Dataproc 集群的 Google Cloud 项目的 ID。WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹。此值以gs://
开头。NAMESPACE_NAME
:引用 Spark 表的命名空间名称。WAREHOUSE_DIRECTORY
:存储数据仓库的 Cloud Storage 文件夹的 URI。TABLE_NAME
:Spark 表的表名称。
点击“运行”图标或按
Shift-Enter
运行代码。作业完成后,状态消息会显示“Spark Job Finished”,并且输出会显示表内容: