Questa pagina descrive come creare un cluster Dataproc che esegue Spark.
Panoramica
Puoi creare un cluster dopo che il servizio Dataproc Metastore è associata al lake Dataplex per garantire che può fare affidamento sull'endpoint Hive Metastore per ottenere l'accesso Metadati Dataplex.
È possibile accedere ai metadati gestiti in Dataplex utilizzando interfacce standard, come Hive Metastore, per eseguire query Spark. Le query vengono eseguite il di un cluster Dataproc.
Per i dati Parquet, imposta la proprietà Spark spark.sql.hive.convertMetastoreParquet
su
false
per evitare errori di esecuzione. Ulteriori dettagli.
Crea un cluster Dataproc
Esegui i seguenti comandi per creare un cluster Dataproc, specificando il servizio Dataproc Metastore associato al lake Dataplex:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Esplorare i metadati
Esegui query DQL per esplorare i metadati ed eseguire query Spark per eseguire query sui dati.
Prima di iniziare
Apri una sessione SSH sul nodo principale del cluster Dataproc.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
Nel prompt dei comandi del nodo primario, apri un nuovo REPL Python.
python3
Elenco database
Ogni zona Dataplex all'interno del lake viene mappata a un database Metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Elenca tabelle
Elenca le tabelle in una delle zone.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Query sui dati
Esegui una query sui dati in una delle tabelle.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
crea tabelle e partizioni nei metadati
Esegui query DDL per creare tabelle e partizioni nei metadati di Dataplex utilizzando Apache Spark.
Per ulteriori informazioni sui tipi di dati, sui formati file e sui formati di riga supportati, consulta Valori supportati.
Prima di iniziare
Prima di creare una tabella, crea una risorsa Dataplex associata al bucket Cloud Storage contenente i dati sottostanti. Per ulteriori informazioni, vedi Aggiungere un asset.
Creare una tabella
Sono supportate le tabelle Parquet, ORC, AVRO, CSV e JSON.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Modificare una tabella
Dataplex non consente di alterare la posizione di una tabella o di modificare
le colonne di partizione di una tabella. La modifica di una tabella non viene impostata automaticamente
userManaged
a true
.
In Spark SQL, puoi rinominare una tabella, aggiungere colonne e impostare il formato del file di una tabella.
Rinominare una tabella
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Aggiungi colonne
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Impostare il formato file
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Trascina una tabella
L'eliminazione di una tabella dall'API di metadati di Dataplex non elimina i dati sottostanti in Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Aggiungere una partizione
Dataplex non consente di modificare una partizione una volta creata. Tuttavia, la partizione può essere eliminata.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
Puoi aggiungere più partizioni della stessa chiave di partizione e di diversi valori di partizione, come mostrato nell'esempio precedente.
Rilascia una partizione
Per rilasciare una partizione, esegui questo comando:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Esegui query su tabelle Iceberg
Puoi eseguire query sulle tabelle Iceberg utilizzando Apache Spark.
Prima di iniziare
Configura una sessione Spark SQL con Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Creare una tabella Iceberg
Per creare una tabella Iceberg, esegui il seguente comando:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Esplora istantanea e storia dell'Iceberg
Puoi ottenere snapshot e cronologia delle tabelle Iceberg utilizzando Apache Spark.
Prima di iniziare
Configurare una sessione PySpark con l'assistenza Iceberg.
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Visualizzare la cronologia delle tabelle Iceberg
Per ottenere la cronologia di una tabella Iceberg, esegui il seguente comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Ottenere snapshot delle tabelle Iceberg
Per ottenere uno snapshot di una tabella Iceberg, esegui il seguente comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Formati file e tipi di dati supportati
I tipi di dati supportati sono definiti come segue:
Tipo di dati | Valori |
---|---|
originario |
|
Array | ARRAY < DATA_TYPE > |
Strutturazione | STRUCT < COLUMN : DATA_TYPE > |
I formati file supportati sono definiti come segue:
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
Per ulteriori informazioni sui formati file, vedi Formati di archiviazione.
I formati di riga supportati sono definiti come segue:
- DELIMITATI [CAMPI TERMINATI DA CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
Passaggi successivi
- Scopri di più sulla gestione dei metadati.