Halaman ini menjelaskan cara membuat cluster Dataproc yang menjalankan Spark.
Ringkasan
Buat cluster setelah instance layanan Dataproc Metastore dikaitkan dengan lake Dataplex untuk memastikan bahwa cluster tersebut dapat mengandalkan endpoint Hive Metastore untuk mendapatkan akses ke metadata Dataplex.
Metadata yang dikelola dalam Dataplex dapat diakses menggunakan antarmuka standar, seperti Hive Metastore, untuk mendukung kueri Spark. Kueri berjalan di cluster Dataproc.
Untuk data Parquet, tetapkan properti Spark spark.sql.hive.convertMetastoreParquet
ke false
untuk menghindari error eksekusi. Detail selengkapnya.
Membuat cluster Dataproc
Jalankan perintah berikut untuk membuat cluster Dataproc, dengan menentukan layanan Metastore Dataproc yang terkait dengan lake Dataplex:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Jelajahi metadata
Jalankan kueri DQL untuk mempelajari metadata dan menjalankan kueri Spark untuk mengkueri data.
Sebelum memulai
Buka sesi SSH pada node utama cluster Dataproc.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
Di command prompt node utama, buka Python REPL baru.
python3
Mencantumkan database
Setiap zona Dataplex di dalam danau dipetakan ke database metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Membuat daftar tabel
Buat daftar tabel di salah satu zona.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Data kueri
Buat kueri data di salah satu tabel.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Membuat tabel dan partisi dalam metadata
Jalankan kueri DDL untuk membuat tabel dan partisi dalam metadata Dataplex menggunakan Apache Spark.
Untuk mengetahui informasi selengkapnya tentang jenis data, format file, dan format baris yang didukung, lihat Nilai yang didukung.
Sebelum memulai
Sebelum membuat tabel, buat aset Dataplex yang memetakan ke bucket Cloud Storage yang berisi data pokok. Untuk informasi selengkapnya, lihat Menambahkan aset.
Membuat tabel
Tabel Parquet, ORC, AVRO, CSV, dan JSON didukung.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Mengubah tabel
Dataplex tidak memungkinkan Anda mengubah lokasi tabel atau mengedit
kolom partisi untuk tabel. Mengubah tabel tidak otomatis menetapkan userManaged ke true
.
Di Spark SQL, Anda dapat mengganti nama tabel, menambahkan kolom, dan menetapkan format file tabel.
Mengganti nama tabel
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Tambah kolom
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Menetapkan format file
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Menempatkan tabel
Melepaskan tabel dari API metadata Dataplex tidak akan menghapus data pokok di Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Menambahkan partisi
Dataplex tidak mengizinkan perubahan partisi setelah dibuat. Namun, partisi dapat dihapus.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
Anda dapat menambahkan beberapa partisi dari kunci partisi yang sama dan nilai partisi yang berbeda seperti yang ditunjukkan dalam contoh sebelumnya.
Melepaskan partisi
Untuk melepaskan partisi, jalankan perintah berikut:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Membuat kueri tabel Iceberg
Anda dapat membuat kueri tabel Iceberg menggunakan Apache Spark.
Sebelum memulai
Menyiapkan sesi Spark SQL dengan Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Membuat tabel Iceberg
Untuk membuat tabel Iceberg, jalankan perintah berikut:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Jelajahi ringkasan dan sejarah Gunung es
Anda bisa mendapatkan snapshot dan histori tabel Iceberg menggunakan Apache Spark.
Sebelum memulai
Siapkan sesi PySpark dengan dukungan Iceberg.
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Dapatkan histori tabel Iceberg
Untuk mendapatkan histori tabel Iceberg, jalankan perintah berikut:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Dapatkan snapshot tabel Iceberg
Untuk mendapatkan snapshot tabel Iceberg, jalankan perintah berikut:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Jenis data dan format file yang didukung
Jenis data yang didukung ditentukan sebagai berikut:
Jenis data | Nilai |
---|---|
Primitif |
|
Array | ARRAY < DATA_TYPE > |
Struktur | STRUCT < COLUMN : DATA_TYPE > |
Format file yang didukung didefinisikan sebagai berikut:
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
Untuk mengetahui informasi selengkapnya tentang format file, lihat Format penyimpanan.
Format baris yang didukung didefinisikan sebagai berikut:
- DIBATASI [KOLOM DIHENTIKAN OLEH CHAR]
- SERDE SERDE_NAME [DENGAN SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
Apa langkah selanjutnya?
- Pelajari lebih lanjut cara mengelola metadata.