本文档介绍了如何使用 Dataproc Jobs 服务、Spark SQL CLI 或在 Dataproc 集群上运行的 Zeppelin 网页界面,在 BigLake metastore 中创建包含元数据的 Apache Iceberg 表。
准备工作
如果您尚未创建,请创建 Google Cloud 项目、Cloud Storage 存储桶和 Dataproc 集群。
设置项目
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
-
Install the Google Cloud CLI.
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
-
Install the Google Cloud CLI.
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
在项目中创建 Cloud Storage 存储桶。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
-
In the Get started section, do the following:
- Enter a globally unique name that meets the bucket naming requirements.
- To add a
bucket label,
expand the Labels section ( ),
click add_box
Add label, and specify a
key
and avalue
for your label.
-
In the Choose where to store your data section, do the following:
- Select a Location type.
- Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
- If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
- To set up cross-bucket replication, select
Add cross-bucket replication via Storage Transfer Service and
follow these steps:
Set up cross-bucket replication
- In the Bucket menu, select a bucket.
In the Replication settings section, click Configure to configure settings for the replication job.
The Configure cross-bucket replication pane appears.
- To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
- To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
- Click Done.
-
In the Choose how to store your data section, do the following:
- Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
- To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
-
In the Choose how to protect object data section, do the
following:
- Select any of the options under Data protection that you
want to set for your bucket.
- To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
- To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
- To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
- To enable Object Retention Lock, click the Enable object retention checkbox.
- To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
- To choose how your object data will be encrypted, expand the Data encryption section (Data encryption method. ), and select a
- Select any of the options under Data protection that you
want to set for your bucket.
-
In the Get started section, do the following:
- Click Create.
创建 Dataproc 集群。 为了节省资源和费用,您可以创建单节点 Dataproc 集群来运行本文档中提供的示例。
在创建集群的区域内,子网必须启用专用 Google 访问通道 (PGA)。
如果您想运行本指南中的 Zeppelin 网页界面示例,则必须使用或创建已启用 Zeppelin 可选组件的 Dataproc 集群。
向自定义服务账号授予角色(如有需要):默认情况下,Dataproc 集群虚拟机使用 Compute Engine 默认服务账号与 Dataproc 进行交互。如果您要在创建集群时指定自定义服务账号,该服务账号必须具有 Dataproc Worker 角色 (
roles/dataproc.worker
) 或具有所需 Worker 角色权限的自定义角色。在本地终端窗口或 Cloud Shell 中,使用文本编辑器(如
vi
或nano
)将以下命令复制到iceberg-table.sql
文件中,然后将该文件保存在当前目录中。USE CATALOG_NAME; CREATE NAMESPACE IF NOT EXISTS example_namespace; USE example_namespace; DROP TABLE IF EXISTS example_table; CREATE TABLE example_table (id int, data string) USING ICEBERG LOCATION 'gs://BUCKET/WAREHOUSE_FOLDER'; INSERT INTO example_table VALUES (1, 'first row'); ALTER TABLE example_table ADD COLUMNS (newDoubleCol double); DESCRIBE TABLE example_table;
替换以下内容:
- CATALOG_NAME:Iceberg 目录名称。
- BUCKET 和 WAREHOUSE_FOLDER:用于 Iceberg 数据仓库的 Cloud Storage 存储桶和文件夹。
使用 gcloud CLI 将本地
iceberg-table.sql
复制到 Cloud Storage 的存储桶中。gcloud storage cp iceberg-table.sql gs://BUCKET/
在本地终端窗口或 Cloud Shell 中,运行以下
curl
命令,将iceberg-spark-runtime-3.5_2.12-1.6.1
JAR 文件下载到当前目录。curl -o iceberg-spark-runtime-3.5_2.12-1.6.1.jar https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar
使用 gcloud CLI 将本地
iceberg-spark-runtime-3.5_2.12-1.6.1
JAR 文件从当前目录复制到 Cloud Storage 的存储桶中。gcloud storage cp iceberg-spark-runtime-3.5_2.12-1.6.1.jar gs://BUCKET/
在本地终端窗口或 Cloud Shell 中运行以下 gcloud dataproc jobs submit spark-sql 命令,以提交 Spark SQL 作业来创建 Iceberg 表。
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars="gs://BUCKET/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar" \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER" \ -f="gs://BUCKETiceberg-table.sql"
注意:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- CLUSTER_NAME:Dataproc 集群的名称。
- REGION:您的集群所在的 Compute Engine 区域。
- CATALOG_NAME:Iceberg 目录名称。
- BUCKET 和 WAREHOUSE_FOLDER:用于 Iceberg 数据仓库的 Cloud Storage 存储桶和文件夹。
- LOCATION:受支持的 BigQuery 位置。默认位置为“US”。
--jars
:列出的 jar 是创建 BigQuery metastore中表元数据所必需的。--properties
:目录属性。-f
:您复制到 Cloud Storage 中存储桶的iceberg-table.sql
作业文件。
在作业完成后,查看终端输出中的表格说明。
Time taken: 2.194 seconds id int data string newDoubleCol double Time taken: 1.479 seconds, Fetched 3 row(s) Job JOB_ID finished successfully.
查看 BigQuery 中的表元数据
在 Google Cloud 控制台中,前往 BigQuery 页面。
查看 Iceberg 表元数据。
在 Google Cloud 控制台中,前往 Dataproc 提交作业。
前往“提交作业”页面,然后填写以下字段:
- 作业 ID:接受建议的 ID 或插入您自己的 ID。
- 区域:选择集群所在的区域。
- 集群:选择您的集群。
- 作业类型:选择
SparkSql
。 - 查询来源类型:选择
Query file
。 - 查询文件:插入
gs://BUCKET/iceberg-table.sql
- Jar 文件:插入以下内容:
gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
- 属性:点击
key
value
输入字段的列表,然后复制以下键和值对以定义五个属性。# 键 值 1. spark.sql.catalog.CATALOG_NAME
org.apache.iceberg.spark.SparkCatalog
2. spark.sql.catalog.CATALOG_NAME.catalog-impl
org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
3. spark.sql.catalog.CATALOG_NAME.gcp_project
PROJECT_ID
4. spark.sql.catalog.CATALOG_NAME.gcp_location
LOCATION
5. spark.sql.catalog.CATALOG_NAME.warehouse
gs://BUCKET/WAREHOUSE_FOLDER
添加属性 五次,以创建包含五个
注意:
- CATALOG_NAME:Iceberg 目录名称。
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。区域是集群所在的区域。
- LOCATION:受支持的 BigQuery 位置。默认位置为“US”。
- BUCKET 和 WAREHOUSE_FOLDER:用于 Iceberg 数据仓库的 Cloud Storage 存储桶和文件夹。
点击提交
如需监控作业进度和查看作业输出,请前往 Google Cloud 控制台中的 Dataproc 作业页面,然后点击
Job ID
以打开作业详细信息页面。查看 BigQuery 中的表元数据
在 Google Cloud 控制台中,前往 BigQuery 页面。
查看 Iceberg 表元数据。
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- CLUSTER_NAME:Dataproc 集群的名称。
- REGION:您的集群所在的 Compute Engine 区域。
- CATALOG_NAME:Iceberg 目录名称。
- BUCKET 和 WAREHOUSE_FOLDER:用于 Iceberg 数据仓库的 Cloud Storage 存储桶和文件夹。 LOCATION:受支持的 BigQuery 位置。 默认位置为“US”。
jarFileUris
:列出的 jar 文件是创建 BigQuery Metastore 中表元数据所必需的。properties
:目录属性。queryFileUri
:您复制到 Cloud Storage 中存储桶的iceberg-table.sql
作业文件。在 Google Cloud 控制台中,前往 BigQuery 页面。
查看 Iceberg 表元数据。
使用 SSH 连接到您的 Dataproc 集群主节点。
在 SSH 会话终端中,使用
vi
或nano
文本编辑器将以下命令复制到iceberg-table.sql
文件中。SET CATALOG_NAME = `CATALOG_NAME`; SET BUCKET = `BUCKET`; SET WAREHOUSE_FOLDER = `WAREHOUSE_FOLDER`; USE `${CATALOG_NAME}`; CREATE NAMESPACE IF NOT EXISTS `${CATALOG_NAME}`.example_namespace; DROP TABLE IF EXISTS `${CATALOG_NAME}`.example_namespace.example_table; CREATE TABLE `${CATALOG_NAME}`.example_namespace.example_table (id int, data string) USING ICEBERG LOCATION 'gs://${BUCKET}/${WAREHOUSE_FOLDER}'; INSERT INTO `${CATALOG_NAME}`.example_namespace.example_table VALUES (1, 'first row'); ALTER TABLE `${CATALOG_NAME}`.example_namespace.example_table ADD COLUMNS (newDoubleCol double); DESCRIBE TABLE `${CATALOG_NAME}`.example_namespace.example_table;
替换以下内容:
- CATALOG_NAME:Iceberg 目录名称。
- BUCKET 和 WAREHOUSE_FOLDER:用于 Iceberg 数据仓库的 Cloud Storage 存储桶和文件夹。
在 SSH 会话终端中,运行以下
spark-sql
命令以创建 Iceberg 表。spark-sql \ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1 \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER \ -f iceberg-table.sql
替换以下内容:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- LOCATION:受支持的 BigQuery 位置。默认位置为“US”。
查看 BigQuery 中的表元数据
在 Google Cloud 控制台中,前往 BigQuery 页面。
查看 Iceberg 表元数据。
在 Google Cloud 控制台中,前往 Dataproc 集群页面。
选择您的集群名称以打开集群详情页面。
点击网页界面标签页,显示指向集群上安装的默认组件和可选组件网页界面的组件网关链接列表。
点击 Zeppelin 链接以打开 Zeppelin 网页界面。
在 Zeppelin 网页界面中,点击匿名菜单,然后点击解释器以打开解释器页面。
将两个 jar 添加到 Zeppelin Spark 解释器,如下所示:
- 在
Search interpreters
框中输入“Spark”,滚动到 Spark 解释器部分。 - 点击 edit(修改)。
将以下内容粘贴到 spark.jars 字段中:
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
在 Spark 解释器部分底部点击保存,然后点击确定以更新解释器,并使用新设置重新启动 Spark 解释器。
- 在
在 Zeppelin 笔记本菜单中,点击创建新笔记。
在创建新笔记对话框中,输入笔记本的名称,并接受默认的 spark 解释器。点击创建以打开笔记本。
填充变量后,将以下 PySpark 代码复制到 Zeppelin 笔记本中。
%pyspark
from pyspark.sql import SparkSession
project_id = "PROJECT_ID" catalog = "CATALOG_NAME" namespace = "NAMESPACE" location = "LOCATION" warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \ .config(f"spark.sql.catalog.{catalog}.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config(f"spark.sql.catalog.{catalog}.gcp_project", f"{project_id}") \ .config(f"spark.sql.catalog.{catalog}.gcp_location", f"{location}") \ .config(f"spark.sql.catalog.{catalog}.warehouse", f"{warehouse_dir}") \ .getOrCreate()
spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;")
\# Create table and display schema (without LOCATION) spark.sql("DROP TABLE IF EXISTS example_iceberg_table") spark.sql("CREATE TABLE example_iceberg_table (id int, data string) USING ICEBERG") spark.sql("DESCRIBE example_iceberg_table;")
\# Insert table data. spark.sql("INSERT INTO example_iceberg_table VALUES (1, 'first row');")
\# Alter table, then display schema. spark.sql("ALTER TABLE example_iceberg_table ADD COLUMNS (newDoubleCol double);")
\# Select and display the contents of the table. spark.sql("SELECT * FROM example_iceberg_table").show()替换以下内容:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- CATALOG_NAME 和 NAMESPACE:Iceberg 目录名称和命名空间组合起来,用于标识 Iceberg 表 (
catalog.namespace.table_name
)。 - LOCATION:受支持的 BigQuery 位置。默认位置为“US”。
- BUCKET 和 WAREHOUSE_DIRECTORY:用作 Iceberg 数据仓库目录的 Cloud Storage 存储桶和文件夹。
点击运行图标或按
Shift-Enter
以运行代码。作业完成后,状态消息会显示“Spark Job Finished”,输出内容会显示表格内容:查看 BigQuery 中的表元数据
在 Google Cloud 控制台中,前往 BigQuery 页面。
查看 Iceberg 表元数据。
开源数据库到 BigQuery 数据集的映射
请注意以下开源数据库与 BigQuery 数据集术语之间的映射:
开源数据库 BigQuery 数据集 命名空间、数据库 数据集 分区表或未分区表 表格 查看 查看 创建 Iceberg 表
本部分介绍了如何通过向 Dataproc 服务、Spark SQL CLI 和 Zeppelin 组件网页界面(在 Dataproc 集群上运行)提交 Spark SQL 代码,在 BigLake metastore 中创建包含元数据的 Iceberg 表。
Dataproc 作业
您可以通过以下方式将作业提交到 Dataproc 服务:使用 Google Cloud 控制台或 Google Cloud CLI 将作业提交到 Dataproc 集群,或通过 HTTP REST 请求,或通过程序化的 gRPC Dataproc Cloud 客户端库调用 Dataproc Jobs API。
本部分中的示例介绍了如何向 Dataproc 服务提交 Dataproc Spark SQL 作业,以使用 gcloud CLI、 Google Cloud 控制台或 Dataproc REST API 在 BigQuery 中创建包含元数据的 Iceberg 表。
准备作业文件
请执行以下步骤来创建 Spark SQL 作业文件。该文件包含用于创建和更新 Iceberg 表的 Spark SQL 命令。
接下来,下载
iceberg-spark-runtime-3.5_2.12-1.6.1
JAR 文件并将其复制到 Cloud Storage。提交 Spark SQL 作业
选择一个标签页,按照说明使用 gcloud CLI、Google Cloud 控制台或 Dataproc REST API 将 Spark SQL 作业提交到 Dataproc 服务。
gcloud
控制台
请执行以下步骤,使用 Google Cloud 控制台将 Spark SQL 作业提交到 Dataproc 服务,以在 BigLake metastore 中创建包含元数据的 Iceberg 表。
REST
您可以使用 Dataproc jobs.submit API 将 Spark SQL 作业提交到 Dataproc 服务,以在 BigLake metastore 中创建包含元数据的 Iceberg 表。
在使用任何请求数据之前,请先进行以下替换:
HTTP 方法和网址:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
请求 JSON 正文:
{ "projectId": "PROJECT_ID", "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "statusHistory": [], "reference": { "jobId": "", "projectId": "PROJECT_ID" }, "sparkSqlJob": { "properties": { "spark.sql.catalog."CATALOG_NAME": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog."CATALOG_NAME".catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog", "spark.sql.catalog."CATALOG_NAME".gcp_project": "PROJECT_ID", "spark.sql.catalog."CATALOG_NAME".gcp_location": "LOCATION", "spark.sql.catalog."CATALOG_NAME".warehouse": "gs://BUCKET/WAREHOUSE_FOLDER" }, "jarFileUris": [ "gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" ], "scriptVariables": {}, "queryFileUri": "gs://BUCKET/iceberg-table.sql" } } }
如需发送您的请求,请展开以下选项之一:
您应该收到类似以下内容的 JSON 响应:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "..." }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "..." }, "status": { "state": "PENDING", "stateStartTime": "..." }, "submittedBy": "USER", "sparkSqlJob": { "queryFileUri": "gs://BUCKET/iceberg-table.sql", "properties": { "spark.sql.catalog.USER_catalog": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.USER_catalog.catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog", "spark.sql.catalog.USER_catalog.gcp_project": "PROJECT_ID", "spark.sql.catalog.USER_catalog.gcp_location": "LOCATION", "spark.sql.catalog.USER_catalog.warehouse": "gs://BUCKET/WAREHOUSE_FOLDER" }, "jarFileUris": [ "gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar", "gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" ] }, "driverControlFilesUri": "gs://dataproc-...", "driverOutputResourceUri": "gs://dataproc-.../driveroutput", "jobUuid": "...", "region": "REGION" }
如需监控作业进度和查看作业输出,请前往 Google Cloud 控制台中的 Dataproc 作业页面,然后点击
Job ID
以打开作业详细信息页面。查看 BigQuery 中的表元数据
Spark SQL CLI
以下步骤介绍了如何使用在 Dataproc 集群的主节点上运行的 Spark SQL CLI 创建包含存储在 BigLake metastore 中的表元数据的 Iceberg 表。
Zeppelin 网页界面
以下步骤介绍了如何使用在 Dataproc 集群的主节点上运行的 Zeppelin 网页界面,创建包含存储在 BigLake metastore 中的表元数据的 Iceberg 表。