在 BigQuery Metastore 中创建包含元数据的 Apache Iceberg 表

本文档介绍了如何使用 Dataproc Jobs 服务、Spark SQL CLI 或在 Dataproc 集群上运行的 Zeppelin 网页界面,在 BigQuery Metastore 中创建包含元数据的 Apache Iceberg 表。

准备工作

如果您尚未创建 Google Cloud 项目、Cloud Storage 存储桶和 Dataproc 集群,请先创建这些资源。

  1. 设置项目

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Make sure that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    5. Install the Google Cloud CLI.
    6. To initialize the gcloud CLI, run the following command:

      gcloud init
    7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    8. Make sure that billing is enabled for your Google Cloud project.

    9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    10. Install the Google Cloud CLI.
    11. To initialize the gcloud CLI, run the following command:

      gcloud init

  2. 在您的项目中创建 Cloud Storage 存储桶

    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a name that meets the bucket naming requirements.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select a storage class.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.

  3. 创建 Dataproc 集群。 为节省资源和费用,您可以创建单节点 Dataproc 集群来运行本文档中介绍的示例。

  4. 向自定义服务账号授予角色(如有必要):默认情况下,Dataproc 集群虚拟机使用 Compute Engine 默认服务账号与 Dataproc 交互。如果您想在创建集群时指定自定义服务账号,则该服务账号必须具有 Dataproc Worker 角色 (roles/dataproc.worker) 角色或具有所需 Worker 角色权限的自定义角色。

OSS 数据库与 BigQuery 数据集的映射

请注意开源数据库和 BigQuery 数据集术语之间的以下映射:

OSS 数据库 BigQuery 数据集
命名空间、数据库 数据集
分区表或非分区表
查看 查看

创建 Iceberg 表

本部分介绍了如何通过将 Spark SQL 代码提交到在 Dataproc 集群上运行的 Dataproc 服务Spark SQL CLIZeppelin 组件 网页界面,在 BigQuery Metastore 中创建包含元数据的 Iceberg 表。

Dataproc 作业

您可以使用 Google Cloud 控制台Google Cloud CLI 将作业提交到 Dataproc 集群,也可以通过 HTTP REST 请求或程序化 gRPC Dataproc Cloud 客户端库调用 Dataproc Jobs API 将作业提交到 Dataproc 服务。

本部分中的示例展示了如何使用 gcloud CLI、Google Cloud 控制台或 Dataproc REST API 向 Dataproc 服务提交 Dataproc Spark SQL 作业,以便在 BigQuery 中创建包含元数据的 Iceberg 表。

准备作业文件

请按以下步骤创建 Spark SQL 作业文件。该文件包含用于创建和更新 Iceberg 表的 Spark SQL 命令。

  1. 在本地终端窗口或 Cloud Shell 中,使用文本编辑器(例如 vinano)将以下命令复制到 iceberg-table.sql 文件中,然后将该文件保存在当前目录中。

    USE CATALOG_NAME;
    CREATE NAMESPACE IF NOT EXISTS example_namespace;
    USE example_namespace;
    DROP TABLE IF EXISTS example_table;
    CREATE TABLE example_table (id int, data string) USING ICEBERG LOCATION 'gs://BUCKET/WAREHOUSE_FOLDER';
    INSERT INTO example_table VALUES (1, 'first row');
    ALTER TABLE example_table ADD COLUMNS (newDoubleCol double);
    DESCRIBE TABLE example_table;
    

    替换以下内容:

    • CATALOG_NAME:Iceberg 目录名称。
    • BUCKETWAREHOUSE_FOLDER:用于 Iceberg 仓库的 Cloud Storage 存储桶和文件夹。
  2. 使用 gsutil 工具将本地 iceberg-table.sql 复制到 Cloud Storage 中的存储桶。

    gsutil cp iceberg-table.sql gs://BUCKET/
    

接下来,下载并将 iceberg-spark-runtime-3.5_2.12-1.5.2 JAR 文件复制到 Cloud Storage。

  1. 在本地终端窗口或 Cloud Shell 中,运行以下 curl 命令将 iceberg-spark-runtime-3.5_2.12-1.5.2 JAR 文件下载到当前目录。

    curl -o iceberg-spark-runtime-3.5_2.12-1.5.2.jar https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar
    
  2. 使用 gsutil 工具将本地 iceberg-spark-runtime-3.5_2.12-1.5.2 JAR 文件从当前目录复制到 Cloud Storage 中的存储桶。

    gsutil cp iceberg-spark-runtime-3.5_2.12-1.5.2.jar gs://BUCKET/
    

提交 Spark SQL 作业

选择一个标签页,然后按照说明使用 gcloud CLI、Google Cloud 控制台或 Dataproc REST API 将 Spark SQL 作业提交到 Dataproc 服务。

gcloud

  1. 在本地终端窗口或 Cloud Shell 中,以本地运行方式运行以下 gcloud dataproc jobs submit spark-sql 命令,以提交 Spark SQL 作业来创建 Iceberg 表。

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars="gs://BUCKET/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar" \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER" \
        -f="gs://BUCKETiceberg-table.sql"
    

    注意:

    • PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分。
    • CLUSTER_NAME:您的 Dataproc 集群的名称。
    • REGION:集群所在的 Compute Engine 区域
    • CATALOG_NAME:Iceberg 目录名称。
    • BUCKETWAREHOUSE_FOLDER:用于 Iceberg 仓库的 Cloud Storage 存储桶和文件夹。
    • LOCATION:受支持的 BigQuery 位置。默认位置为“美国”。
    • --jars:列出的 jar 是创建 BigQuery Metastore 中表元数据的必要条件。
    • --properties目录属性
    • -f:您复制到 Cloud Storage 存储桶中的 iceberg-table.sql 作业文件。
  2. 作业完成后,在终端输出中查看表格说明。

    Time taken: 2.194 seconds
    id                      int
    data                    string
    newDoubleCol            double
    Time taken: 1.479 seconds, Fetched 3 row(s)
    Job JOB_ID finished successfully.
    
  3. 如需在 BigQuery 中查看表元数据,请执行以下操作:

    1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

      进入 BigQuery Studio

    2. 查看 Iceberg 表元数据。

控制台

请执行以下步骤,使用 Google Cloud 控制台将 Spark SQL 作业提交到 Dataproc 服务,以便在 BigQuery Metastore 中创建包含元数据的 Iceberg 表。

  1. 在 Google Cloud 控制台中,前往 Dataproc 的提交作业部分。

    前往“提交作业”页面,然后填写以下字段:

    • 作业 ID:接受建议的 ID 或插入您自己的 ID。
    • 区域:选择集群所在的区域。
    • 集群:选择您的集群。
    • 作业类型:选择 SparkSql
    • 查询来源类型:选择 Query file
    • 查询文件:插入 gs://BUCKET/iceberg-table.sql
    • Jar 文件:插入以下内容:
      gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
      
    • 属性:点击 添加属性 五次,以创建包含五个 key value 输入字段的列表,然后复制以下对以定义五个属性。
      #
      1.
      spark.sql.catalog.CATALOG_NAME
      
      org.apache.iceberg.spark.SparkCatalog
      
      2.
      spark.sql.catalog.CATALOG_NAME.catalog-impl
      
      org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
      
      3.
      spark.sql.catalog.CATALOG_NAME.gcp_project
      
      PROJECT_ID
      
      4.
      spark.sql.catalog.CATALOG_NAME.gcp_location
      
      LOCATION
      
      5.
      spark.sql.catalog.CATALOG_NAME.warehouse
      
      gs://BUCKET/WAREHOUSE_FOLDER
      

    注意:

    • CATALOG_NAME:Iceberg 目录名称。
    • PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。您集群所在的区域
    • LOCATION:受支持的 BigQuery 位置。默认位置为“美国”。
    • BUCKETWAREHOUSE_FOLDER:用于 Iceberg 仓库的 Cloud Storage 存储桶和文件夹。
  2. 点击提交

  3. 如需监控作业进度和查看作业输出,请前往 Google Cloud 控制台中的 Dataproc 作业页面,然后点击 Job ID 以打开作业详情页面。

  4. 如需在 BigQuery 中查看表元数据,请执行以下操作:

    1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

      进入 BigQuery Studio

    2. 查看 Iceberg 表元数据。

REST

您可以使用 Dataproc jobs.submit API 将 Spark SQL 作业提交到 Dataproc 服务,以便在 BigQuery Metastore 中创建包含元数据的 Iceberg 表。

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分。
  • CLUSTER_NAME:您的 Dataproc 集群的名称。
  • REGION:集群所在的 Compute Engine 区域
  • CATALOG_NAME:Iceberg 目录名称。
  • BUCKETWAREHOUSE_FOLDER:用于 Iceberg 仓库的 Cloud Storage 存储桶和文件夹。
  • LOCATION:受支持的 BigQuery 位置。 默认位置为“美国”。
  • jarFileUris:列出的 jar 是创建 BigQuery Metastore 中表元数据的必要条件。
  • properties目录属性
  • queryFileUri:您复制到 Cloud Storage 存储桶中的 iceberg-table.sql 作业文件。

HTTP 方法和网址:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

请求 JSON 正文:

{
  "projectId": "PROJECT_ID",
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "statusHistory": [],
    "reference": {
      "jobId": "",
      "projectId": "PROJECT_ID"
    },
    "sparkSqlJob": {
      "properties": {
        "spark.sql.catalog."CATALOG_NAME": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog."CATALOG_NAME".catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
        "spark.sql.catalog."CATALOG_NAME".gcp_project": "PROJECT_ID",
        "spark.sql.catalog."CATALOG_NAME".gcp_location": "LOCATION",
        "spark.sql.catalog."CATALOG_NAME".warehouse": "gs://BUCKET/WAREHOUSE_FOLDER"
      },
      "jarFileUris": [
        "gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"
      ],
      "scriptVariables": {},
      "queryFileUri": "gs://BUCKET/iceberg-table.sql"
    }
  }
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "..."
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "..."
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "..."
  },
  "submittedBy": "USER",
  "sparkSqlJob": {
    "queryFileUri": "gs://BUCKET/iceberg-table.sql",
    "properties": {
      "spark.sql.catalog.USER_catalog": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.USER_catalog.catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
      "spark.sql.catalog.USER_catalog.gcp_project": "PROJECT_ID",
      "spark.sql.catalog.USER_catalog.gcp_location": "LOCATION",
      "spark.sql.catalog.USER_catalog.warehouse": "gs://BUCKET/WAREHOUSE_FOLDER"
    },
    "jarFileUris": [
      "gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar",
      "gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"
    ]
  },
  "driverControlFilesUri": "gs://dataproc-...",
  "driverOutputResourceUri": "gs://dataproc-.../driveroutput",
  "jobUuid": "...",
  "region": "REGION"
}

如需监控作业进度和查看作业输出,请前往 Google Cloud 控制台中的 Dataproc 作业页面,然后点击 Job ID 以打开作业详情页面。

如需在 BigQuery 中查看表元数据,请执行以下操作:

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    进入 BigQuery Studio

  2. 查看 Iceberg 表元数据。

Spark SQL CLI

以下步骤介绍了如何使用在 Dataproc 集群的主节点上运行的 Spark SQL CLI 创建将表元数据存储在 BigQuery Metastore 中的 Iceberg 表。

  1. 使用 SSH 连接到 Dataproc 集群的主节点。

  2. 在 SSH 会话终端中,使用 vinano 文本编辑器将以下命令复制到 iceberg-table.sql 文件中。

    SET CATALOG_NAME = `CATALOG_NAME`;
    SET BUCKET = `BUCKET`;
    SET WAREHOUSE_FOLDER = `WAREHOUSE_FOLDER`;
    USE `${CATALOG_NAME}`;
    CREATE NAMESPACE IF NOT EXISTS `${CATALOG_NAME}`.example_namespace;
    DROP TABLE IF EXISTS `${CATALOG_NAME}`.example_namespace.example_table;
    CREATE TABLE `${CATALOG_NAME}`.example_namespace.example_table (id int, data string) USING ICEBERG LOCATION 'gs://${BUCKET}/${WAREHOUSE_FOLDER}';
    INSERT INTO `${CATALOG_NAME}`.example_namespace.example_table VALUES (1, 'first row');
    ALTER TABLE `${CATALOG_NAME}`.example_namespace.example_table ADD COLUMNS (newDoubleCol double);
    DESCRIBE TABLE `${CATALOG_NAME}`.example_namespace.example_table;
    

    替换以下内容:

    • CATALOG_NAME:Iceberg 目录名称。
    • BUCKETWAREHOUSE_FOLDER:用于 Iceberg 仓库的 Cloud Storage 存储桶和文件夹。
  3. 在 SSH 会话终端中,运行以下 spark-sql 命令以创建冰山表。

    spark-sql \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER \
    -f iceberg-table.sql 
    

    替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分。
    • LOCATION:受支持的 BigQuery 位置。默认位置为“美国”。
  4. 在 BigQuery 中查看表元数据

    1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

      进入 BigQuery Studio

    2. 查看 Iceberg 表元数据。

Zeppelin 网页界面

以下步骤介绍了如何使用在 Dataproc 集群的主节点上运行的 Zeppelin Web 界面创建将表元数据存储在 BigQuery Metastore 中的 Iceberg 表。

  1. 在 Google Cloud 控制台中,前往 Dataproc 集群页面。

    前往“Dataproc 集群”页面

  2. 选择集群名称以打开集群详情页面。

  3. 点击网页界面标签页以显示组件网关链接列表,这些链接指向安装在集群上的默认组件和可选组件的网页界面。

  4. 点击 Zeppelin 链接以打开 Zeppelin 网页界面。

  5. 在 Zeppelin 网页界面中,点击匿名菜单,然后点击解释器以打开解释器页面。

  6. 将两个 jar 添加到 Zeppelin Spark 解释器,如下所示:

    1. Search interpreters 框中输入“Spark”,滚动到 Spark 解释器部分。
    2. 点击 edit(修改)。
    3. 将以下内容粘贴到 spark.jars 字段中:

      https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar

    4. 点击“Spark 解释器”部分底部的保存,然后点击确定以更新解释器并使用新设置重启 Spark 解释器。

  7. 在 Zeppelin 记事本菜单中,点击创建新记事

  8. 创建新记事对话框中,输入笔记本的名称,并接受默认的 spark 解释器。点击创建以打开该记事本。

  9. 填充变量后,将以下 PySpark 代码复制到 Zeppelin 笔记本中。

    %pyspark
    from pyspark.sql import SparkSession
    project_id = "PROJECT_ID" catalog = "CATALOG_NAME" namespace = "NAMESPACE" location = "LOCATION" warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
    spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \ .config(f"spark.sql.catalog.{catalog}.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config(f"spark.sql.catalog.{catalog}.gcp_project", f"{project_id}") \ .config(f"spark.sql.catalog.{catalog}.gcp_location", f"{location}") \ .config(f"spark.sql.catalog.{catalog}.warehouse", f"{warehouse_dir}") \ .getOrCreate()
    spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;")
    \# Create table and display schema (without LOCATION) spark.sql("DROP TABLE IF EXISTS example_iceberg_table") spark.sql("CREATE TABLE example_iceberg_table (id int, data string) USING ICEBERG") spark.sql("DESCRIBE example_iceberg_table;")
    \# Insert table data. spark.sql("INSERT INTO example_iceberg_table VALUES (1, 'first row');")
    \# Alter table, then display schema. spark.sql("ALTER TABLE example_iceberg_table ADD COLUMNS (newDoubleCol double);")
    \# Select and display the contents of the table. spark.sql("SELECT * FROM example_iceberg_table").show()

    替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分。
    • CATALOG_NAMENAMESPACE:Iceberg 目录名称和命名空间组合用于标识 Iceberg 表 (catalog.namespace.table_name)。
    • LOCATION:受支持的 BigQuery 位置。默认位置为“美国”。
    • BUCKETWAREHOUSE_DIRECTORY:用作 Iceberg 仓库目录的 Cloud Storage 存储桶和文件夹。
  10. 点击“运行”图标或按 Shift-Enter 运行代码。作业完成后,状态消息会显示“Spark Job Finished”,并且输出会显示表内容:

  11. 在 BigQuery 中查看表元数据

    1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

      进入 BigQuery Studio

    2. 查看 Iceberg 表元数据。