本文档介绍了如何运行 Serverless for Apache Spark SQL 和 PySpark 批处理工作负载,以创建包含存储在 BigLake metastore 中的元数据的 Apache Iceberg 表。如需了解运行 Spark 代码的其他方式,请参阅在 BigQuery 笔记本中运行 PySpark 代码和运行 Apache Spark 工作负载
准备工作
如果您尚未创建项目和 Cloud Storage 存储桶,请先创建这些资源。 Google Cloud
设置项目
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
在项目中创建 Cloud Storage 存储桶。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
-
In the Get started section, do the following:
- Enter a globally unique name that meets the bucket naming requirements.
- To add a
bucket label,
expand the Labels section ( ),
click add_box
Add label, and specify a
key
and avalue
for your label.
-
In the Choose where to store your data section, do the following:
- Select a Location type.
- Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
- If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
- To set up cross-bucket replication, select
Add cross-bucket replication via Storage Transfer Service and
follow these steps:
Set up cross-bucket replication
- In the Bucket menu, select a bucket.
In the Replication settings section, click Configure to configure settings for the replication job.
The Configure cross-bucket replication pane appears.
- To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
- To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
- Click Done.
-
In the Choose how to store your data section, do the following:
- Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
- To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
-
In the Choose how to protect object data section, do the
following:
- Select any of the options under Data protection that you
want to set for your bucket.
- To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
- To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
- To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
- To enable Object Retention Lock, click the Enable object retention checkbox.
- To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
- To choose how your object data will be encrypted, expand the Data encryption section (Data encryption method. ), and select a
- Select any of the options under Data protection that you
want to set for your bucket.
-
In the Get started section, do the following:
- Click Create.
向 Compute Engine 默认服务账号
PROJECT_NUMBER-compute@developer.gserviceaccount.com
授予 BigQuery Data Editor (roles/bigquery.dataEditor
) 角色。 如需查看相关说明,请参阅授予单个角色。Google Cloud CLI 示例:
gcloud projects add-iam-policy-binding PROJECT_ID \ --member PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role roles/bigquery.dataEditor
注意:
- PROJECT_ID 和 PROJECT_NUMBER 列在 Google Cloud 控制台信息中心的项目信息部分中。
在本地或 Cloud Shell 中将以下 Spark SQL 命令复制到
iceberg-table.sql
文件中。USE CATALOG_NAME; CREATE NAMESPACE IF NOT EXISTS example_namespace; DROP TABLE IF EXISTS example_table; CREATE TABLE example_table (id int, data string) USING ICEBERG LOCATION 'gs://BUCKET/WAREHOUSE_FOLDER'; INSERT INTO example_table VALUES (1, 'first row'); ALTER TABLE example_table ADD COLUMNS (newDoubleCol double); DESCRIBE TABLE example_table;
替换以下内容:
- CATALOG_NAME:Iceberg 目录名称。
- BUCKET 和 WAREHOUSE_FOLDER:用作 Iceberg 数据仓库目录的 Cloud Storage 存储桶和文件夹。
在本地或在 Cloud Shell 中从包含
iceberg-table.sql
的目录运行以下命令,以提交 Spark SQL 工作负载。gcloud dataproc batches submit spark-sql iceberg-table.sql \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_NAME \ --version=2.2 \ --subnet=SUBNET_NAME \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER"
注意:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- REGION:用于运行工作负载的可用 Compute Engine 区域。
- BUCKET_NAME:Cloud Storage 存储分区的名称。在运行批处理工作负载之前,Spark 会将工作负载依赖项上传到此存储桶中的
/dependencies
文件夹。WAREHOUSE_FOLDER 位于此存储桶中。 --version
:无服务器 Apache Spark 运行时版本 2.2 或更高版本。- SUBNET_NAME:
REGION
中的 VPC 子网的名称。如果您省略此标志,Serverless for Apache Spark 会选择会话区域中的default
子网。Serverless for Apache Spark 会在子网中启用专用 Google 访问通道 (PGA)。如需了解网络连接要求,请参阅 Google Cloud Serverless for Apache Spark 网络配置。 - LOCATION:受支持的 BigQuery 位置。默认位置为“US”。
--properties
目录属性。
查看 BigQuery 中的表元数据
在 Google Cloud 控制台中,前往 BigQuery 页面。
查看 Iceberg 表元数据。
- 在本地或在 Cloud Shell 中将以下 PySpark 代码复制到
iceberg-table.py
文件中。from pyspark.sql import SparkSession spark = SparkSession.builder.appName("iceberg-table-example").getOrCreate() catalog = "CATALOG_NAME" namespace = "NAMESPACE" spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;") # Create table and display schema spark.sql("DROP TABLE IF EXISTS example_iceberg_table") spark.sql("CREATE TABLE example_iceberg_table (id int, data string) USING ICEBERG") spark.sql("DESCRIBE example_iceberg_table;") # Insert table data. spark.sql("INSERT INTO example_iceberg_table VALUES (1, 'first row');") # Alter table, then display schema. spark.sql("ALTER TABLE example_iceberg_table ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE example_iceberg_table;")
替换以下内容:
- CATALOG_NAME 和 NAMESPACE:Iceberg 目录名称和命名空间组合起来,用于标识 Iceberg 表 (
catalog.namespace.table_name
)。
- CATALOG_NAME 和 NAMESPACE:Iceberg 目录名称和命名空间组合起来,用于标识 Iceberg 表 (
-
在本地或在 Cloud Shell 中从包含
iceberg-table.py
的目录运行以下命令,以提交 PySpark 工作负载。gcloud dataproc batches submit pyspark iceberg-table.py \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_NAME \ --version=2.2 \ --subnet=SUBNET_NAME \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER"
注意:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- REGION:用于运行工作负载的可用 Compute Engine 区域。
- BUCKET_NAME:Cloud Storage 存储分区的名称。Spark 会在运行批处理工作负载之前,将工作负载依赖项上传到此存储桶中的
/dependencies
文件夹。 --version
:无服务器 Apache Spark 运行时版本 2.2 或更高版本。- SUBNET_NAME:
REGION
中的 VPC 子网的名称。如果您省略此标志,Serverless for Apache Spark 会选择会话区域中的default
子网。Serverless for Apache Spark 会在子网上启用专用 Google 访问通道 (PGA)。如需了解网络连接要求,请参阅 Google Cloud Serverless for Apache Spark 网络配置。 - LOCATION:受支持的 BigQuery 位置。 默认位置为“US”。
- BUCKET 和 WAREHOUSE_FOLDER:用作 Iceberg 数据仓库目录的 Cloud Storage 存储桶和文件夹。
--properties
:目录属性。
- 在 BigQuery 中查看表架构。
- 在 Google Cloud 控制台中,前往 BigQuery 页面。 进入 BigQuery Studio
- 查看 Iceberg 表元数据。
从 OSS 资源到 BigQuery 资源的映射
请注意以下开源资源与 BigQuery 资源术语之间的映射:
OSS 资源 BigQuery 资源 命名空间、数据库 数据集 分区表或未分区表 表格 视图 视图 创建 Iceberg 表
本部分介绍了如何使用 Serverless for Apache Spark Spark SQL 和 PySpark 批处理工作负载,在 BigLake metastore 中创建包含元数据的 Iceberg 表。
Spark SQL
运行 Spark SQL 工作负载以创建 Iceberg 表
以下步骤介绍了如何运行 Serverless for Apache Spark Spark SQL 批处理工作负载,以创建包含存储在 BigLake metastore 中的表元数据的 Iceberg 表。
PySpark
以下步骤介绍了如何运行 Serverless for Apache Spark PySpark 批处理工作负载,以创建包含存储在 BigLake metastore 中的表元数据的 Iceberg 表。