在 Apache Spark 中访问元数据

本页面介绍了如何创建 Dataproc 集群。

概览

在 Dataproc Metastore 服务实例与 Dataplex 数据湖相关联后,您需要创建集群,以确保集群可以依赖 Hive Metastore 端点来访问 Dataplex 元数据。

您可以使用标准 API 访问在 Dataplex 中管理的元数据 等接口(例如 Hive Metastore)来为 Spark 查询提供支持。查询在 Dataproc 集群上运行。

对于 Parquet 数据,请将 Spark 属性 spark.sql.hive.convertMetastoreParquet 设置为 false,以避免执行错误。更多详情

创建 Dataproc 集群

运行以下命令以创建 Dataproc 集群,并指定与 Dataplex 数据湖关联的 Dataproc Metastore 服务:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

探索元数据

运行 DQL 查询以探索元数据,并运行 Spark 查询以查询数据。

准备工作

  1. 在 Dataproc 集群的主节点上打开 SSH 会话。

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. 在主节点命令提示符处,打开一个新的 Python REPL。

    python3
    

列出数据库

数据湖中的每个 Dataplex 可用区都会映射到一个元数据存储库。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

列出表

列出某个可用区中的表。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

查询数据

查询其中一个表中的数据。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

在元数据中创建表和分区

运行 DDL 查询,以使用 Apache Spark 在 Dataplex 元数据中创建表和分区。

如需详细了解支持的数据类型、文件格式和行格式,请参阅支持的值

准备工作

在创建表之前,请创建一个映射到包含底层数据的 Cloud Storage 存储桶的 Dataplex 资产。如需了解详情,请参阅添加资产

创建表

支持 Parquet、ORC、AVRO、CSV 和 JSON 表。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

修改表

Dataplex 不允许您更改表的位置或修改表的分区列。更改表格不会自动设置 userManaged 发送至 true

在 Spark SQL 中,您可以重命名表添加列设置表的文件格式

重命名表

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

添加列

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

设置文件格式

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

删除表

从 Dataplex 的元数据 API 中删除表不会删除 Cloud Storage 中的底层数据。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

添加分区

创建分区后,Dataplex 不允许对其进行更改。但是,可以丢弃该分区。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

您可以添加具有相同分区键和不同分区值的多个分区,如上例所示。

删除分区

如需删除分区,请运行以下命令:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

查询 Iceberg 表

您可以使用 Apache Spark 查询 Iceberg 表。

准备工作

与 Iceberg 设置 Spark SQL 会话。

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

创建 Iceberg 表

如需创建 Iceberg 表,请运行以下命令:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

探索冰山概况和历史

您可以使用 Apache Spark 获取 Iceberg 表的快照和历史记录。

准备工作

设置支持 Iceberg 的 PySpark 会话。

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

获取 Iceberg 表的历史记录

如需获取 Iceberg 表的历史记录,请运行以下命令:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

获取 Iceberg 表的快照

如需获取 Iceberg 表的快照,请运行以下命令:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

支持的数据类型和文件格式

支持的数据类型定义如下:

数据类型
基元
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
数组 ARRAY < DATA_TYPE >
结构 STRUCT < COLUMN : DATA_TYPE >

支持的文件格式定义如下:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

如需详细了解文件格式,请参阅存储格式

支持的行格式定义如下:

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

后续步骤