This page describes how to create a Dataproc cluster running Spark.
Overview
You create a cluster after the Dataproc Metastore service instance is associated with the Dataplex lake to ensure that the cluster can rely on the Hive Metastore endpoint to gain access to Dataplex metadata.
Metadata managed within Dataplex can be accessed using standard interfaces, such as Hive Metastore, to power Spark queries. The queries run on the Dataproc cluster.
For Parquet data, set Spark property spark.sql.hive.convertMetastoreParquet
to
false
to avoid execution errors. More details.
Create a Dataproc cluster
Run the following commands to create a Dataproc cluster, specifying the Dataproc Metastore service associated with the Dataplex lake:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Explore metadata
Run DQL queries to explore the metadata and run Spark queries to query data.
Before you begin
Open an SSH session on the Dataproc cluster's primary node.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
At the primary node command prompt, open a new Python REPL.
python3
List databases
Each Dataplex zone within the lake maps to a metastore database.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
List tables
List tables in one of the zones.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Query data
Query the data in one of the tables.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Create tables and partitions in metadata
Run DDL queries to create tables and partitions in Dataplex metadata using Apache Spark.
For more information about the supported data types, file formats, and row formats, see Supported values.
Before you begin
Before you create a table, create a Dataplex asset that maps to the Cloud Storage bucket containing the underlying data. For more information, see Add an asset.
Create a table
Parquet, ORC, AVRO, CSV, and JSON tables are supported.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Alter a table
Dataplex does not allow you to alter the location of a table or edit the
partition columns for a table. Altering a table does not automatically set
userManaged
to true
.
In Spark SQL, you can rename a table, add columns, and set the file format of a table.
Rename a table
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Add columns
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Set the file format
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Drop a table
Dropping a table from Dataplex's metadata API doesn't delete the underlying data in Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Add a partition
Dataplex does not allow altering a partition once created. However, the partition can be dropped.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
You can add multiple partitions of the same partition key and different partition values as shown in the preceding example.
Drop a partition
To drop a partition, run the following command:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Query Iceberg tables
You can query Iceberg tables using Apache Spark.
Before you begin
Set up a Spark SQL session with Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Create an Iceberg table
To create an Iceberg table, run the following command:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Explore Iceberg snapshot and history
You can get snapshots and history of Iceberg tables using Apache Spark.
Before you begin
Set up a PySpark session with Iceberg support:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Get history of Iceberg tables
To get the history of an Iceberg table, run the following command:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Get snapshots of Iceberg tables
To get a snapshot of an Iceberg table, run the following command:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Supported data types and file formats
The supported data types are defined as follows:
Data type | Values |
---|---|
Primitive |
|
Array | ARRAY < DATA_TYPE > |
Structure | STRUCT < COLUMN : DATA_TYPE > |
The following are the supported file formats:
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
For more information about the file formats, see Storage formats.
The following are the supported row formats:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
What's next
- Learn more about managing metadata.