Auf dieser Seite wird beschrieben, wie Sie einen Dataproc-Cluster erstellen, auf dem Spark ausgeführt wird.
Überblick
Sie erstellen einen Cluster, nachdem die Dataproc Metastore-Dienstinstanz mit dem Dataplex-See verknüpft ist, damit der Cluster auf den Hive-Metastore-Endpunkt zugreifen kann, um Zugriff auf Dataplex-Metadaten zu erhalten.
Auf Metadaten, die in Dataplex verwaltet werden, kann über Standardschnittstellen, z. B. Hive Metastore, zugegriffen werden, um Spark-Abfragen zu unterstützen. Die Abfragen werden auf dem Dataproc-Cluster ausgeführt.
Setzen Sie für Parquet-Daten das Spark-Attribut spark.sql.hive.convertMetastoreParquet
auf false
, um Ausführungsfehler zu vermeiden. Weitere Informationen
Dataproc-Cluster erstellen
Führen Sie die folgenden Befehle aus, um einen Dataproc-Cluster zu erstellen und dabei den Dataproc Metastore-Dienst anzugeben, der mit dem Dataplex-Lake verknüpft ist:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Metadaten untersuchen
Führen Sie DQL-Abfragen aus, um die Metadaten zu untersuchen, und führen Sie Spark-Abfragen aus, um Daten abzufragen.
Hinweis
Öffnen Sie eine SSH-Sitzung auf dem Masterknoten des Dataproc-Clusters.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
Öffnen Sie an der Eingabeaufforderung des Master-Knotens eine neue Python-REPL.
python3
Datenbanken auflisten
Jede Dataplex-Zone im Lake wird einer Metastore-Datenbank zugeordnet.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Tabellen auflisten
Tabellen in einer der Zonen auflisten.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Daten abfragen
Fragen Sie die Daten in einer der Tabellen ab.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Tabellen und Partitionen in Metadaten erstellen
Mit DDL-Abfragen können Sie mit Apache Spark Tabellen und Partitionen in Dataplex-Metadaten erstellen.
Weitere Informationen zu den unterstützten Datentypen, Dateiformaten und Zeilenformaten finden Sie unter Unterstützte Werte.
Hinweis
Bevor Sie eine Tabelle erstellen, erstellen Sie ein Dataplex-Asset, das dem Cloud Storage-Bucket mit den zugrunde liegenden Daten zugeordnet wird. Weitere Informationen finden Sie unter Bucket hinzufügen.
Tabelle erstellen
Es werden die Tabellen Parquet, ORC, AVRO, CSV und JSON unterstützt.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Tabellen ändern
Mit Dataplex können Sie den Speicherort einer Tabelle nicht ändern und die Partitionsspalten für eine Tabelle nicht bearbeiten. Beim Ändern einer Tabelle wird userManaged nicht automatisch auf true
gesetzt.
In Spark SQL können Sie eine Tabelle umbenennen, Spalten hinzufügen und das Dateiformat einer Tabelle festlegen.
Tabelle umbenennen
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Spalten hinzufügen
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Dateiformat festlegen
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Tabellen löschen
Durch Löschen einer Tabelle aus der Metadaten-API von Dataplex werden die zugrunde liegenden Daten in Cloud Storage nicht gelöscht.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Partition hinzufügen
In Dataplex ist es nach dem Erstellen nicht mehr möglich, eine Partition zu ändern. Die Partition kann jedoch gelöscht werden.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
Sie können mehrere Partitionen mit demselben Partitionsschlüssel und verschiedenen Partitionswerten hinzufügen, wie im vorherigen Beispiel gezeigt.
Partition löschen
Führen Sie den folgenden Befehl aus, um eine Partition zu löschen:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Eisbergtabellen abfragen
Sie können Iceberg-Tabellen mit Apache Spark abfragen.
Hinweis
eine Spark SQL-Sitzung mit Iceberg einrichten.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Eisbergtabelle erstellen
Führen Sie den folgenden Befehl aus, um eine Iceberg-Tabelle zu erstellen:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Eisberg-Snapshot und -Verlauf ansehen
Mit Apache Spark können Sie Snapshots und den Verlauf von Iceberg-Tabellen abrufen.
Hinweis
eine PySpark-Sitzung mit der Unterstützung von Iceberg einrichten
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Geschichte von Iceberg-Tabellen abrufen
Führen Sie den folgenden Befehl aus, um die Geschichte einer Iceberg-Tabelle abzurufen:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Snapshots von Iceberg-Tabellen abrufen
Führen Sie den folgenden Befehl aus, um einen Snapshot einer Iceberg-Tabelle abzurufen:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Unterstützte Datentypen und Dateiformate
Die unterstützten Datentypen sind so definiert:
Datentyp | Werte |
---|---|
Primitiv |
|
Array | ARRAY < DATA_TYPE > |
Struktur | STRUCT < COLUMN : DATA_TYPE > |
Die unterstützten Dateiformate sind so definiert:
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
Weitere Informationen zu den Dateiformaten finden Sie unter Speicherformate.
Die unterstützten Zeilenformate sind so definiert:
- DELIMITED [FELDER VON CHAR KÜNDIGT]
- SERDE SERDE_NAME [MIT SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]