Mengakses metadata di Apache Spark

Halaman ini menjelaskan cara membuat cluster Dataproc yang menjalankan Spark.

Ringkasan

Buat cluster setelah instance layanan Dataproc Metastore dikaitkan dengan lake Dataplex untuk memastikan bahwa cluster tersebut dapat mengandalkan endpoint Hive Metastore untuk mendapatkan akses ke metadata Dataplex.

Metadata yang dikelola dalam Dataplex dapat diakses menggunakan antarmuka standar, seperti Hive Metastore, untuk mendukung kueri Spark. Kueri berjalan di cluster Dataproc.

Untuk data Parquet, tetapkan properti Spark spark.sql.hive.convertMetastoreParquet ke false untuk menghindari error eksekusi. Detail selengkapnya.

Membuat cluster Dataproc

Jalankan perintah berikut untuk membuat cluster Dataproc, dengan menentukan layanan Metastore Dataproc yang terkait dengan lake Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Jelajahi metadata

Jalankan kueri DQL untuk mempelajari metadata dan menjalankan kueri Spark untuk mengkueri data.

Sebelum memulai

  1. Buka sesi SSH pada node utama cluster Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. Di command prompt node utama, buka Python REPL baru.

    python3
    

Mencantumkan database

Setiap zona Dataplex di dalam danau dipetakan ke database metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Membuat daftar tabel

Buat daftar tabel di salah satu zona.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Data kueri

Buat kueri data di salah satu tabel.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Membuat tabel dan partisi dalam metadata

Jalankan kueri DDL untuk membuat tabel dan partisi dalam metadata Dataplex menggunakan Apache Spark.

Untuk mengetahui informasi selengkapnya tentang jenis data, format file, dan format baris yang didukung, lihat Nilai yang didukung.

Sebelum memulai

Sebelum membuat tabel, buat aset Dataplex yang memetakan ke bucket Cloud Storage yang berisi data pokok. Untuk informasi selengkapnya, lihat Menambahkan aset.

Membuat tabel

Tabel Parquet, ORC, AVRO, CSV, dan JSON didukung.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Mengubah tabel

Dataplex tidak memungkinkan Anda mengubah lokasi tabel atau mengedit kolom partisi untuk tabel. Mengubah tabel tidak otomatis menetapkan userManaged ke true.

Di Spark SQL, Anda dapat mengganti nama tabel, menambahkan kolom, dan menetapkan format file tabel.

Mengganti nama tabel

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Tambah kolom

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Menetapkan format file

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Menempatkan tabel

Melepaskan tabel dari API metadata Dataplex tidak akan menghapus data pokok di Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Menambahkan partisi

Dataplex tidak mengizinkan perubahan partisi setelah dibuat. Namun, partisi dapat dihapus.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

Anda dapat menambahkan beberapa partisi dari kunci partisi yang sama dan nilai partisi yang berbeda seperti yang ditunjukkan dalam contoh sebelumnya.

Melepaskan partisi

Untuk melepaskan partisi, jalankan perintah berikut:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Membuat kueri tabel Iceberg

Anda dapat membuat kueri tabel Iceberg menggunakan Apache Spark.

Sebelum memulai

Menyiapkan sesi Spark SQL dengan Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Membuat tabel Iceberg

Untuk membuat tabel Iceberg, jalankan perintah berikut:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Jelajahi ringkasan dan sejarah Gunung es

Anda bisa mendapatkan snapshot dan histori tabel Iceberg menggunakan Apache Spark.

Sebelum memulai

Siapkan sesi PySpark dengan dukungan Iceberg.

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Dapatkan histori tabel Iceberg

Untuk mendapatkan histori tabel Iceberg, jalankan perintah berikut:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Dapatkan snapshot tabel Iceberg

Untuk mendapatkan snapshot tabel Iceberg, jalankan perintah berikut:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Jenis data dan format file yang didukung

Jenis data yang didukung ditentukan sebagai berikut:

Jenis data Nilai
Primitif
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Array ARRAY < DATA_TYPE >
Struktur STRUCT < COLUMN : DATA_TYPE >

Format file yang didukung didefinisikan sebagai berikut:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Untuk mengetahui informasi selengkapnya tentang format file, lihat Format penyimpanan.

Format baris yang didukung didefinisikan sebagai berikut:

  • DIBATASI [KOLOM DIHENTIKAN OLEH CHAR]
  • SERDE SERDE_NAME [DENGAN SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

Apa langkah selanjutnya?