Acceder a metadatos en Apache Spark

En esta página se describe cómo crear un clúster de Dataproc que ejecute Spark. Puedes usar este clúster para trabajar con metadatos de Dataplex Universal Catalog de lagos, zonas y recursos.

Información general

Crea un clúster después de que la instancia del servicio Dataproc Metastore se asocie al lago de Dataplex Universal Catalog para asegurarte de que el clúster pueda usar el endpoint de Hive Metastore para acceder a los metadatos de Dataplex Universal Catalog.

Se puede acceder a los metadatos gestionados en Dataplex Universal Catalog mediante interfaces estándar, como Hive Metastore, para potenciar las consultas de Spark. Las consultas se ejecutan en el clúster de Dataproc.

En el caso de los datos Parquet, asigna el valor spark.sql.hive.convertMetastoreParquet a la propiedad de Spark false para evitar errores de ejecución. Más detalles

Crear una agrupación Dataproc

Ejecuta los siguientes comandos para crear un clúster de Dataproc y especificar el servicio Dataproc Metastore asociado al lago de Universal Catalog de Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Consultar metadatos

Ejecuta consultas de DQL para explorar los metadatos y consultas de Spark para consultar los datos.

Antes de empezar

  1. Abre una sesión SSH en el nodo principal del clúster de Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. En la petición de comandos del nodo principal, abre un nuevo REPL de Python.

    python3
    

Mostrar bases de datos

Cada zona de Dataplex Universal Catalog de un lago se asigna a una base de datos de metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Mostrar lista de tablas

Muestra las tablas de una de las zonas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Consultar datos

Consulta los datos de una de las tablas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Crear tablas y particiones en los metadatos

Ejecuta consultas DDL para crear tablas y particiones en los metadatos de Dataplex Universal Catalog con Apache Spark.

Para obtener más información sobre los tipos de datos, los formatos de archivo y los formatos de fila admitidos, consulta Valores admitidos.

Antes de empezar

Antes de crear una tabla, crea un recurso de Universal Catalog de Dataplex que se asigne al segmento de Cloud Storage que contiene los datos subyacentes. Para obtener más información, consulta el artículo Añadir un recurso.

Crear una tabla

Se admiten tablas Parquet, ORC, AVRO, CSV y JSON.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Modificar una tabla

Dataplex Universal Catalog no te permite cambiar la ubicación de una tabla ni editar las columnas de partición de una tabla. Si modificas una tabla, no se asigna automáticamente el valor true a userManaged.

En Spark SQL, puede cambiar el nombre de una tabla, añadir columnas y definir el formato de archivo de una tabla.

Cambiar el nombre de una tabla

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Añade las columnas

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Definir el formato del archivo

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Eliminar una tabla

Si eliminas una tabla de la API de metadatos de Dataplex Universal Catalog, no se eliminarán los datos subyacentes de Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Añadir una partición

Dataplex Universal Catalog no permite modificar una partición una vez creada. Sin embargo, la partición se puede eliminar.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

Puede añadir varias particiones de la misma clave de partición y diferentes valores de partición, como se muestra en el ejemplo anterior.

Eliminar una partición

Para eliminar una partición, ejecuta el siguiente comando:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Consultar tablas Iceberg

Puedes consultar tablas de Iceberg con Apache Spark.

Antes de empezar

Configura una sesión de Spark SQL con Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Crear una tabla Iceberg

Para crear una tabla Iceberg, ejecuta el siguiente comando:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Consultar el historial y la vista general de Iceberg

Puedes obtener las versiones y el historial de las tablas de Iceberg con Apache Spark.

Antes de empezar

Configura una sesión de PySpark con compatibilidad con Iceberg:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Obtener el historial de tablas de Iceberg

Para obtener el historial de una tabla Iceberg, ejecuta el siguiente comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Obtener capturas de tablas Iceberg

Para obtener una instantánea de una tabla Iceberg, ejecuta el siguiente comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Tipos de datos y formatos de archivo admitidos

Los tipos de datos admitidos se definen de la siguiente manera:

Tipo de datos Valores
Primitive
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Matriz ARRAY < DATA_TYPE >
Estructura STRUCT < COLUMN : DATA_TYPE >

Estos son los formatos de archivo admitidos:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Para obtener más información sobre los formatos de archivo, consulta Formatos de almacenamiento.

Estos son los formatos de fila admitidos:

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

Siguientes pasos