Accedere ai metadati in Apache Spark

Questa pagina descrive come creare un cluster Dataproc su cui è in esecuzione Spark.

Panoramica

Crea un cluster dopo che l'istanza del servizio Dataproc Metastore è stata associata al lake Dataplex per assicurarti che il cluster possa fare affidamento sull'endpoint Hive Metastore per accedere ai metadati di Dataplex.

È possibile accedere ai metadati gestiti in Dataplex utilizzando interfacce standard, come Hive Metastore, per eseguire query Spark. Le query vengono eseguite sul cluster Dataproc.

Per i dati Parquet, imposta la proprietà Spark spark.sql.hive.convertMetastoreParquet su false per evitare errori di esecuzione. Ulteriori dettagli.

Crea un cluster Dataproc

Esegui i seguenti comandi per creare un cluster Dataproc, specificando il servizio Dataproc Metastore associato al lake Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Esplorare i metadati

Esegui query DQL per esplorare i metadati ed eseguire query Spark per eseguire query sui dati.

Prima di iniziare

  1. Apri una sessione SSH sul nodo principale del cluster Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. Al prompt dei comandi del nodo principale, apri una nuova REPL Python.

    python3
    

Elenco database

Ogni zona Dataplex all'interno del lake viene mappata a un database Metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Elenca tabelle

Elenca le tabelle in una delle zone.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Query sui dati

Esegui una query sui dati di una delle tabelle.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Creare tabelle e partizioni nei metadati

Esegui query DDL per creare tabelle e partizioni nei metadati di Dataplex utilizzando Apache Spark.

Per ulteriori informazioni sui tipi di dati, sui formati file e sui formati di riga supportati, consulta Valori supportati.

Prima di iniziare

Prima di creare una tabella, crea una risorsa Dataplex associata al bucket Cloud Storage contenente i dati sottostanti. Per ulteriori informazioni, vedi Aggiungere un asset.

Creare una tabella

Sono supportate le tabelle Parquet, ORC, AVRO, CSV e JSON.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Modificare una tabella

Dataplex non ti consente di modificare la posizione di una tabella o di modificare le colonne di partizione di una tabella. La modifica di una tabella non imposta automaticamente userManaged su true.

In Spark SQL, puoi rinominare una tabella, aggiungere colonne e impostare il formato del file di una tabella.

Rinominare una tabella

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Aggiungi colonne

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Impostare il formato file

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Eliminare una tabella

L'eliminazione di una tabella dall'API di metadati di Dataplex non comporta l'eliminazione dei dati sottostanti in Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Aggiungere una partizione

Dataplex non consente di modificare una partizione una volta creata. Tuttavia, la partizione può essere eliminata.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

Puoi aggiungere più partizioni della stessa chiave di partizione e valori di partizione diversi, come mostrato nell'esempio precedente.

Inserire una partizione

Per eliminare una partizione, esegui il seguente comando:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Esegui query sulle tabelle Iceberg

Puoi eseguire query sulle tabelle Iceberg utilizzando Apache Spark.

Prima di iniziare

Configura una sessione Spark SQL con Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Creare una tabella Iceberg

Per creare una tabella Iceberg, esegui il seguente comando:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Esplorare lo snapshot e la cronologia di Iceberg

Puoi ottenere snapshot e cronologia delle tabelle Iceberg utilizzando Apache Spark.

Prima di iniziare

Configura una sessione PySpark con il supporto di Iceberg:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Visualizzare la cronologia delle tabelle Iceberg

Per ottenere la cronologia di una tabella Iceberg, esegui il seguente comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Ottenere snapshot delle tabelle Iceberg

Per ottenere uno snapshot di una tabella Iceberg, esegui il seguente comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Tipi di dati e formati di file supportati

I tipi di dati supportati sono definiti come segue:

Tipo di dati Valori
originario
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Array ARRAY < DATA_TYPE >
Strutturazione STRUCT < COLUMN : DATA_TYPE >

Di seguito sono riportati i formati file supportati:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Per saperne di più sui formati file, consulta Formati di archiviazione.

Di seguito sono riportati i formati di riga supportati:

  • DELIMITATI [CAMPI TERMINATI DA CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

Passaggi successivi