Accede a los metadatos en Apache Spark

En esta página, se describe cómo crear un clúster de Dataproc que ejecute Spark.

Descripción general

Crea un clúster después de que la instancia del servicio de Dataproc Metastore se asocie con el lake de Dataplex para asegurarte de que el clúster pueda depender del extremo de Hive Metastore para obtener acceso a los metadatos de Dataplex.

Se puede acceder a los metadatos administrados en Dataplex mediante interfaces estándar, como Hive Metastore, para potenciar las consultas de Spark. Las consultas se ejecutan en el clúster de Dataproc.

Para los datos de Parquet, establece la propiedad spark.sql.hive.convertMetastoreParquet de Spark en false para evitar errores de ejecución. Más detalles.

Crea un clúster de Dataproc

Ejecuta los siguientes comandos para crear un clúster de Dataproc y especifica el servicio de Dataproc Metastore asociado con el lake de Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Explorar metadatos

Ejecutar consultas de DQL para explorar los metadatos y ejecutar consultas de Spark para consultar datos

Antes de comenzar

  1. Abre una sesión SSH en el nodo principal del clúster de Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. En el símbolo del sistema del nodo principal, abre un nuevo REPL de Python.

    python3
    

Enumerar bases de datos

Cada zona de Dataplex dentro del lake se asigna a una base de datos de almacén de metadatos.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Enumerar tablas

Enumera las tablas en una de las zonas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Consulta datos

Consulta los datos en una de las tablas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Crea tablas y particiones en metadatos

Ejecutar consultas de DDL para crear tablas y particiones en los metadatos de Dataplex con Apache Spark

Para obtener más información sobre los tipos de datos, formatos de archivo y formatos de fila admitidos, consulta Valores admitidos.

Antes de comenzar

Antes de crear una tabla, crea un recurso de Dataplex que se asigne al bucket de Cloud Storage que contiene los datos subyacentes. Para obtener más información, consulta Cómo agregar un activo.

Crea una tabla

Se admiten las tablas Parquet, ORC, AVRO, CSV y JSON.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Modificar una tabla

Dataplex no te permite modificar la ubicación de una tabla ni editar sus columnas de partición. La modificación de una tabla no establece automáticamente userManaged en true.

En Spark SQL, puedes cambiar el nombre de una tabla, agregar columnas y establecer el formato de archivo de una tabla.

Cambia el nombre de una tabla

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Agregar columnas

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Establece el formato de archivo

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Eliminar una tabla

Quitar una tabla de la API de metadatos de Dataplex no borra los datos subyacentes en Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Cómo agregar una partición

Dataplex no permite modificar una partición una vez creada. Sin embargo, la partición se puede descartar.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

Puedes agregar varias particiones con la misma clave de partición y valores de partición diferentes, como se muestra en el ejemplo anterior.

Cómo descartar una partición

Para descartar una partición, ejecuta el siguiente comando:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Cómo consultar tablas de Iceberg

Puedes consultar tablas de Iceberg con Apache Spark.

Antes de comenzar

Configurar una sesión de Spark SQL con Iceberg

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Crea una tabla de Iceberg

Para crear una tabla de Iceberg, ejecuta el siguiente comando:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Explora la historia y la instantánea de Iceberg

Puedes obtener instantáneas y el historial de las tablas de Iceberg con Apache Spark.

Antes de comenzar

Configurar una sesión de PySpark con la asistencia de Iceberg

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Obtén historia de las tablas de Iceberg

Para obtener el historial de una tabla de Iceberg, ejecuta el siguiente comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Obtén instantáneas de las tablas de Iceberg

Para obtener una instantánea de una tabla de Iceberg, ejecuta el siguiente comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Tipos de datos y formatos de archivo admitidos

Los tipos de datos admitidos se definen de la siguiente manera:

Tipo de datos Valores
Básico
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Array ARRAY < DATA_TYPE >
Estructuración STRUCT < COLUMN : DATA_TYPE >

Los formatos de archivo admitidos se definen de la siguiente manera:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Para obtener más información sobre los formatos de archivo, consulta Formatos de almacenamiento.

Los formatos de fila admitidos se definen de la siguiente manera:

  • DELIMITADO [CAMPOS RESCINDIDOS POR CHAR]
  • SERDE SERDE_NAME [CON SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

Próximos pasos