访问 Apache Spark 中的元数据

本页面介绍如何创建运行 Spark 的 Dataproc 集群。

概览

您可以在 Dataproc Metastore 服务实例与 Dataplex 数据湖关联之后创建集群,以确保集群可以依赖 Hive Metastore 端点来获取对 Dataplex 元数据的访问权限。

您可以使用标准接口(例如 Hive Metastore)访问 Dataplex 中管理的元数据,以便为 Spark 查询提供支持。这些查询在 Dataproc 集群上运行。

对于 Parquet 数据,请将 Spark 属性 spark.sql.hive.convertMetastoreParquet 设置为 false 以避免执行错误。更多详情

创建 Dataproc 集群

运行以下命令创建 Dataproc 集群,并指定与 Dataplex 数据湖关联的 Dataproc Metastore 服务:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

探索元数据

运行 DQL 查询来探索元数据,并运行 Spark 查询来查询数据。

准备工作

  1. 在 Dataproc 集群的主节点上打开 SSH 会话。

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. 在主节点命令提示符处,打开一个新的 Python REPL。

    python3
    

列出数据库

数据湖中的每个 Dataplex 可用区都会映射到一个 Metastore 数据库。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

列出表

列出其中一个可用区中的表。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

查询数据

查询其中一个表中的数据。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

在元数据中创建表和分区

运行 DDL 查询,使用 Apache Spark 在 Dataplex 元数据中创建表和分区。

如需详细了解支持的数据类型、文件格式和行格式,请参阅支持的值

准备工作

在创建表之前,请创建映射到包含底层数据的 Cloud Storage 存储桶的 Dataplex 资源。如需了解详情,请参阅添加素材资源

创建表

支持 Parquet、ORC、AVRO、CSV 和 JSON 表。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

修改表

Dataplex 不允许您更改表的位置或修改表的分区列。更改表不会自动将 userManaged 设置为 true

在 Spark SQL 中,您可以重命名表添加列设置表的文件格式

重命名表

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

添加列

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

设置文件格式

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

删除表

从 Dataplex 的 metadata API 中删除表不会删除 Cloud Storage 中的底层数据。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

添加分区

在创建分区后,Dataplex 不允许更改分区。不过,该分区可能会被丢弃。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

您可以添加包含同一分区键和不同分区值的多个分区,如上例所示。

删除分区

如需删除分区,请运行以下命令:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

查询 Iceberg 表

您可以使用 Apache Spark 查询 Iceberg 表。

准备工作

设置与 Iceberg 的 Spark SQL 会话。

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

创建 Iceberg 表

如需创建 Iceberg 表,请运行以下命令:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

探索 Iceberg 概况和历史

您可以使用 Apache Spark 获取 Iceberg 表的快照和历史记录。

准备工作

设置 Iceberg 支持的 PySpark 会话。

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

获取 Iceberg 表的历史记录

如需获取 Iceberg 表的历史记录,请运行以下命令:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

获取 Iceberg 表的快照

如需获取 Iceberg 表的快照,请运行以下命令:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

支持的数据类型和文件格式

支持的数据类型定义如下:

数据类型
原初
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
数组 ARRAY < DATA_TYPE >
结构 STRUCT < COLUMN : DATA_TYPE >

支持的文件格式定义如下:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

如需详细了解文件格式,请参阅存储格式

支持的行格式定义如下:

  • 受限 [字段由 CHAR 终止]
  • SERDE SERDE_NAME [包含 SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

后续步骤