本文档介绍了如何运行 Dataproc Serverless for Spark SQL 和 PySpark 批处理工作负载,以创建将元数据存储在 BigQuery Metastore 中的 Apache Iceberg 表。如需了解运行 Spark 代码的其他方法,请参阅在 BigQuery 记事本中运行 PySpark 代码和运行 Apache Spark 工作负载
准备工作
如果您尚未创建 Google Cloud 项目和 Cloud Storage 存储桶,请先创建这些资源。
设置项目
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
在您的项目中创建 Cloud Storage 存储桶。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
向 Compute Engine 默认服务账号
PROJECT_NUMBER-compute@developer.gserviceaccount.com
授予 BigQuery Data Editor (roles/bigquery.dataEditor
) 角色。如需查看相关说明,请参阅授予单个角色。Google Cloud CLI 示例:
gcloud projects add-iam-policy-binding PROJECT_ID \ --member PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role roles/bigquery.dataEditor
注意:
- PROJECT_ID 和 PROJECT_NUMBER 列在 Google Cloud 控制台信息中心的项目信息部分。
确保您将运行 Dataproc Serverless 批处理工作负载的区域 VPC 子网已启用专用 Google 访问通道。如需了解详情,请参阅创建 Iceberg 表。
OSS 资源与 BigQuery 资源的映射
请注意开源资源与 BigQuery 资源术语之间的以下对应关系:
OSS 资源 | BigQuery 资源 |
---|---|
命名空间、数据库 | 数据集 |
分区表或非分区表 | 表 |
查看 | 查看 |
创建 Iceberg 表
本部分介绍了如何使用 Dataproc Serverless Spark SQL 和 PySpark 批处理工作负载在 BigQuery Metastore 中创建包含元数据的 Iceberg 表。
Spark SQL
运行 Spark SQL 工作负载以创建 Iceberg 表
以下步骤介绍了如何运行 Dataproc Serverless Spark SQL 批处理工作负载,以创建将表元数据存储在 BigQuery Metastore 中的 Iceberg 表。
在本地或 Cloud Shell 中,将以下 Spark SQL 命令复制到
iceberg-table.sql
文件中。USE CATALOG_NAME; CREATE NAMESPACE IF NOT EXISTS example_namespace; DROP TABLE IF EXISTS example_table; CREATE TABLE example_table (id int, data string) USING ICEBERG LOCATION 'gs://BUCKET/WAREHOUSE_FOLDER'; INSERT INTO example_table VALUES (1, 'first row'); ALTER TABLE example_table ADD COLUMNS (newDoubleCol double); DESCRIBE TABLE example_table;
替换以下内容:
- CATALOG_NAME:Iceberg 目录名称。
- BUCKET 和 WAREHOUSE_FOLDER:用作 Iceberg 仓库目录的 Cloud Storage 存储桶和文件夹。
在本地或在 Cloud Shell 中,从包含
iceberg-table.sql
的目录运行以下命令,以提交 Spark SQL 工作负载。gcloud dataproc batches submit spark-sql iceberg-table.sql \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_NAME \ --version=2.2 \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER"
注意:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分。
- REGION:用于运行工作负载的可用 Compute Engine 区域。
- BUCKET_NAME:Cloud Storage 存储分区的名称。Spark 会先将工作负载依赖项上传到此存储桶中的
/dependencies
文件夹,然后再运行批处理工作负载。WAREHOUSE_FOLDER 位于此存储桶中。 - SUBNET_NAME:
REGION
中已启用专用 Google 访问通道且符合其他会话子网要求的 VPC 子网的名称。 - LOCATION:受支持的 BigQuery 位置。默认位置为“美国”。
--version
:Dataproc Serverless 运行时版本 2.2 或更高版本。--properties
目录媒体资源。
在 BigQuery 中查看表元数据
在 Google Cloud 控制台中,转到 BigQuery 页面。
查看 Iceberg 表元数据。
PySpark
以下步骤介绍了如何运行 Dataproc Serverless PySpark 批处理工作负载,以创建将表元数据存储在 BigQuery Metastore 中的 Iceberg 表。
- 在本地或 Cloud Shell 中,将以下 PySpark 代码复制到
iceberg-table.py
文件中。catalog = "CATALOG_NAME" namespace = "NAMESPACE" spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;") # Create table and display schema spark.sql("DROP TABLE IF EXISTS example_iceberg_table") spark.sql("CREATE TABLE example_iceberg_table (id int, data string) USING ICEBERG") spark.sql("DESCRIBE example_iceberg_table;") # Insert table data. spark.sql("INSERT INTO example_iceberg_table VALUES (1, 'first row');") # Alter table, then display schema. spark.sql("ALTER TABLE example_iceberg_table ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE example_iceberg_table;")
替换以下内容:
- CATALOG_NAME 和 NAMESPACE:Iceberg 目录名称和命名空间组合用于标识 Iceberg 表 (
catalog.namespace.table_name
)。
- CATALOG_NAME 和 NAMESPACE:Iceberg 目录名称和命名空间组合用于标识 Iceberg 表 (
-
在本地或在 Cloud Shell 中,从包含
iceberg-table.py
的目录运行以下命令,以提交 PySpark 工作负载。gcloud dataproc batches submit pyspark iceberg-table.py \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_NAME \ --version=2.2 \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER"
注意:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- REGION:用于运行工作负载的可用 Compute Engine 区域。
- BUCKET_NAME:Cloud Storage 存储分区的名称。Spark 会先将工作负载依赖项上传到此存储桶中的
/dependencies
文件夹,然后再运行批处理工作负载。 - SUBNET_NAME:
REGION
中已启用专用 Google 访问通道且符合其他会话子网要求的 VPC 子网的名称。 --version
:Dataproc Serverless 运行时版本 2.2 或更高版本。- LOCATION:受支持的 BigQuery 位置。 默认位置为“美国”。
- BUCKET 和 WAREHOUSE_FOLDER:用作 Iceberg 仓库目录的 Cloud Storage 存储桶和文件夹。
--properties
:目录属性。
- 在 BigQuery 中查看表架构。
- 在 Google Cloud 控制台中,前往 BigQuery 页面。 进入 BigQuery Studio
- 查看 Iceberg 表元数据。