Halaman ini menjelaskan cara membuat cluster Dataproc yang menjalankan Spark.
Ringkasan
Anda membuat cluster setelah instance layanan Dataproc Metastore dikaitkan dengan data lake Dataplex untuk memastikan bahwa cluster dapat mengandalkan endpoint Hive Metastore untuk mendapatkan akses ke metadata Dataplex.
Metadata yang dikelola dalam Dataplex dapat diakses menggunakan antarmuka standar, seperti Hive Metastore, untuk mendukung kueri Spark. Kueri berjalan di cluster Dataproc.
Untuk data Parquet, tetapkan properti Spark spark.sql.hive.convertMetastoreParquet
ke
false
untuk menghindari error eksekusi. Detail selengkapnya.
Membuat cluster Dataproc
Jalankan perintah berikut untuk membuat cluster Dataproc, yang menentukan layanan Metastore Dataproc yang terkait dengan data lake Dataplex:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Menjelajahi metadata
Jalankan kueri DQL untuk menjelajahi metadata dan menjalankan kueri Spark untuk mengkueri data.
Sebelum memulai
Buka sesi SSH di node utama cluster Dataproc.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
Di command prompt node utama, buka REPL Python baru.
python3
Mencantumkan database
Setiap zona Dataplex dalam data lake dipetakan ke database metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Membuat daftar tabel
Cantumkan tabel di salah satu zona.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Data kueri
Buat kueri data di salah satu tabel.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Membuat tabel dan partisi dalam metadata
Jalankan kueri DDL untuk membuat tabel dan partisi dalam metadata Dataplex menggunakan Apache Spark.
Untuk mengetahui informasi selengkapnya tentang jenis data, format file, dan format baris yang didukung, lihat Nilai yang didukung.
Sebelum memulai
Sebelum membuat tabel, buat aset Dataplex yang dipetakan ke bucket Cloud Storage yang berisi data pokok. Untuk informasi selengkapnya, lihat Menambahkan aset.
Membuat tabel
Tabel Parquet, ORC, AVRO, CSV, dan JSON didukung.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Mengubah tabel
Dataplex tidak mengizinkan Anda mengubah lokasi tabel atau mengedit
kolom partisi untuk tabel. Mengubah tabel tidak otomatis menetapkan
userManaged
ke true
.
Di Spark SQL, Anda dapat mengganti nama tabel, menambahkan kolom, dan menetapkan format file tabel.
Mengganti nama tabel
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Tambah kolom
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Menetapkan format file
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Menghapus tabel
Menghapus tabel dari API metadata Dataplex tidak akan menghapus data yang mendasarinya di Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Menambahkan partisi
Dataplex tidak mengizinkan perubahan partisi setelah dibuat. Namun, partisi dapat dihapus.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
Anda dapat menambahkan beberapa partisi dengan kunci partisi yang sama dan nilai partisi yang berbeda seperti yang ditunjukkan pada contoh sebelumnya.
Menghapus partisi
Untuk menghapus partisi, jalankan perintah berikut:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Membuat kueri tabel Iceberg
Anda dapat membuat kueri tabel Iceberg menggunakan Apache Spark.
Sebelum memulai
Siapkan sesi Spark SQL dengan Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Membuat tabel Iceberg
Untuk membuat tabel Iceberg, jalankan perintah berikut:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Menjelajahi snapshot dan histori Iceberg
Anda bisa mendapatkan snapshot dan histori tabel Iceberg menggunakan Apache Spark.
Sebelum memulai
Siapkan sesi PySpark dengan dukungan Iceberg:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Mendapatkan histori tabel Iceberg
Untuk mendapatkan histori tabel Iceberg, jalankan perintah berikut:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Mendapatkan snapshot tabel Iceberg
Untuk mendapatkan snapshot tabel Iceberg, jalankan perintah berikut:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Jenis data dan format file yang didukung
Jenis data yang didukung ditentukan sebagai berikut:
Jenis data | Nilai |
---|---|
Primitif |
|
Array | ARRAY < DATA_TYPE > |
Struktur | STRUCT < COLUMN : DATA_TYPE > |
Berikut adalah format file yang didukung:
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
Untuk informasi selengkapnya tentang format file, lihat Format penyimpanan.
Berikut adalah format baris yang didukung:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
Langkah selanjutnya
- Pelajari lebih lanjut cara mengelola metadata.