En esta página se describe cómo crear un clúster de Dataproc que ejecute Spark. Puedes usar este clúster para trabajar con metadatos de Dataplex Universal Catalog de lagos, zonas y recursos.
Información general
Crea un clúster después de que la instancia del servicio Dataproc Metastore se asocie al lago de Dataplex Universal Catalog para asegurarte de que el clúster pueda usar el endpoint de Hive Metastore para acceder a los metadatos de Dataplex Universal Catalog.
Se puede acceder a los metadatos gestionados en Dataplex Universal Catalog mediante interfaces estándar, como Hive Metastore, para potenciar las consultas de Spark. Las consultas se ejecutan en el clúster de Dataproc.
En el caso de los datos Parquet, asigna el valor spark.sql.hive.convertMetastoreParquet
a la propiedad de Spark false
para evitar errores de ejecución. Más detalles
Crear una agrupación Dataproc
Ejecuta los siguientes comandos para crear un clúster de Dataproc y especificar el servicio Dataproc Metastore asociado al lago de Universal Catalog de Dataplex:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Consultar metadatos
Ejecuta consultas de DQL para explorar los metadatos y consultas de Spark para consultar los datos.
Antes de empezar
Abre una sesión SSH en el nodo principal del clúster de Dataproc.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
En la petición de comandos del nodo principal, abre un nuevo REPL de Python.
python3
Mostrar bases de datos
Cada zona de Dataplex Universal Catalog de un lago se asigna a una base de datos de metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Mostrar lista de tablas
Muestra las tablas de una de las zonas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Consultar datos
Consulta los datos de una de las tablas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Crear tablas y particiones en los metadatos
Ejecuta consultas DDL para crear tablas y particiones en los metadatos de Dataplex Universal Catalog con Apache Spark.
Para obtener más información sobre los tipos de datos, los formatos de archivo y los formatos de fila admitidos, consulta Valores admitidos.
Antes de empezar
Antes de crear una tabla, crea un recurso de Universal Catalog de Dataplex que se asigne al segmento de Cloud Storage que contiene los datos subyacentes. Para obtener más información, consulta el artículo Añadir un recurso.
Crear una tabla
Se admiten tablas Parquet, ORC, AVRO, CSV y JSON.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Modificar una tabla
Dataplex Universal Catalog no te permite cambiar la ubicación de una tabla ni editar las columnas de partición de una tabla. Si modificas una tabla, no se asigna automáticamente el valor true
a userManaged.
En Spark SQL, puede cambiar el nombre de una tabla, añadir columnas y definir el formato de archivo de una tabla.
Cambiar el nombre de una tabla
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Añade las columnas
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Definir el formato del archivo
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Eliminar una tabla
Si eliminas una tabla de la API de metadatos de Dataplex Universal Catalog, no se eliminarán los datos subyacentes de Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Añadir una partición
Dataplex Universal Catalog no permite modificar una partición una vez creada. Sin embargo, la partición se puede eliminar.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
Puede añadir varias particiones de la misma clave de partición y diferentes valores de partición, como se muestra en el ejemplo anterior.
Eliminar una partición
Para eliminar una partición, ejecuta el siguiente comando:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Consultar tablas Iceberg
Puedes consultar tablas de Iceberg con Apache Spark.
Antes de empezar
Configura una sesión de Spark SQL con Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Crear una tabla Iceberg
Para crear una tabla Iceberg, ejecuta el siguiente comando:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Consultar el historial y la vista general de Iceberg
Puedes obtener las versiones y el historial de las tablas de Iceberg con Apache Spark.
Antes de empezar
Configura una sesión de PySpark con compatibilidad con Iceberg:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Obtener el historial de tablas de Iceberg
Para obtener el historial de una tabla Iceberg, ejecuta el siguiente comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Obtener capturas de tablas Iceberg
Para obtener una instantánea de una tabla Iceberg, ejecuta el siguiente comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Tipos de datos y formatos de archivo admitidos
Los tipos de datos admitidos se definen de la siguiente manera:
Tipo de datos | Valores |
---|---|
Primitive |
|
Matriz | ARRAY < DATA_TYPE > |
Estructura | STRUCT < COLUMN : DATA_TYPE > |
Estos son los formatos de archivo admitidos:
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
Para obtener más información sobre los formatos de archivo, consulta Formatos de almacenamiento.
Estos son los formatos de fila admitidos:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]