Mengakses metadata di Apache Spark

Halaman ini menjelaskan cara membuat cluster Dataproc yang menjalankan Spark.

Ringkasan

Anda membuat cluster setelah instance layanan Dataproc Metastore dikaitkan dengan data lake Dataplex untuk memastikan bahwa cluster dapat mengandalkan endpoint Hive Metastore untuk mendapatkan akses ke metadata Dataplex.

Metadata yang dikelola dalam Dataplex dapat diakses menggunakan antarmuka standar, seperti Hive Metastore, untuk mendukung kueri Spark. Kueri berjalan di cluster Dataproc.

Untuk data Parquet, tetapkan properti Spark spark.sql.hive.convertMetastoreParquet ke false untuk menghindari error eksekusi. Detail selengkapnya.

Membuat cluster Dataproc

Jalankan perintah berikut untuk membuat cluster Dataproc, yang menentukan layanan Metastore Dataproc yang terkait dengan data lake Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Menjelajahi metadata

Jalankan kueri DQL untuk menjelajahi metadata dan menjalankan kueri Spark untuk mengkueri data.

Sebelum memulai

  1. Buka sesi SSH di node utama cluster Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. Di command prompt node utama, buka REPL Python baru.

    python3
    

Mencantumkan database

Setiap zona Dataplex dalam data lake dipetakan ke database metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Membuat daftar tabel

Cantumkan tabel di salah satu zona.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Data kueri

Buat kueri data di salah satu tabel.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Membuat tabel dan partisi dalam metadata

Jalankan kueri DDL untuk membuat tabel dan partisi dalam metadata Dataplex menggunakan Apache Spark.

Untuk mengetahui informasi selengkapnya tentang jenis data, format file, dan format baris yang didukung, lihat Nilai yang didukung.

Sebelum memulai

Sebelum membuat tabel, buat aset Dataplex yang dipetakan ke bucket Cloud Storage yang berisi data pokok. Untuk informasi selengkapnya, lihat Menambahkan aset.

Membuat tabel

Tabel Parquet, ORC, AVRO, CSV, dan JSON didukung.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Mengubah tabel

Dataplex tidak mengizinkan Anda mengubah lokasi tabel atau mengedit kolom partisi untuk tabel. Mengubah tabel tidak otomatis menetapkan userManaged ke true.

Di Spark SQL, Anda dapat mengganti nama tabel, menambahkan kolom, dan menetapkan format file tabel.

Mengganti nama tabel

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Tambah kolom

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Menetapkan format file

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Menghapus tabel

Menghapus tabel dari API metadata Dataplex tidak akan menghapus data yang mendasarinya di Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Menambahkan partisi

Dataplex tidak mengizinkan perubahan partisi setelah dibuat. Namun, partisi dapat dihapus.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

Anda dapat menambahkan beberapa partisi dengan kunci partisi yang sama dan nilai partisi yang berbeda seperti yang ditunjukkan pada contoh sebelumnya.

Menghapus partisi

Untuk menghapus partisi, jalankan perintah berikut:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Membuat kueri tabel Iceberg

Anda dapat membuat kueri tabel Iceberg menggunakan Apache Spark.

Sebelum memulai

Siapkan sesi Spark SQL dengan Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Membuat tabel Iceberg

Untuk membuat tabel Iceberg, jalankan perintah berikut:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Menjelajahi snapshot dan histori Iceberg

Anda bisa mendapatkan snapshot dan histori tabel Iceberg menggunakan Apache Spark.

Sebelum memulai

Siapkan sesi PySpark dengan dukungan Iceberg:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Mendapatkan histori tabel Iceberg

Untuk mendapatkan histori tabel Iceberg, jalankan perintah berikut:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Mendapatkan snapshot tabel Iceberg

Untuk mendapatkan snapshot tabel Iceberg, jalankan perintah berikut:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Jenis data dan format file yang didukung

Jenis data yang didukung ditentukan sebagai berikut:

Jenis data Nilai
Primitif
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Array ARRAY < DATA_TYPE >
Struktur STRUCT < COLUMN : DATA_TYPE >

Berikut adalah format file yang didukung:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Untuk informasi selengkapnya tentang format file, lihat Format penyimpanan.

Berikut adalah format baris yang didukung:

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

Langkah selanjutnya