Menggunakan tabel Apache Iceberg dengan Dataproc Metastore

Halaman ini menjelaskan cara menggunakan tabel Apache Iceberg dengan layanan Dataproc Metastore yang terhubung ke cluster Dataproc. Apache Iceberg adalah format tabel terbuka untuk set data analisis berukuran besar.

Kompatibilitas

Tabel gunung es mendukung fitur berikut.

Pendorong Pilih Sisipkan Buat Tabel
Spark
Hive
Presto

Sebelum memulai

Menggunakan tabel Iceberg dengan Spark

Contoh berikut menunjukkan bahwa Anda harus menggunakan tabel Iceberg dengan Spark.

Tabel gunung es mendukung operasi baca dan tulis. Untuk informasi selengkapnya, lihat Apache Iceberg - Spark.

Konfigurasi Spark

Pertama, mulai shell Spark dan gunakan bucket Cloud Storage untuk menyimpan data. Untuk menyertakan Iceberg dalam penginstalan Spark, tambahkan file JAR Iceberg Spark Runtime ke folder JARs Spark. Untuk mendownload file JAR, lihat Download Apache Iceberg. Perintah berikut memulai Spark shell dengan dukungan untuk Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Menggunakan Katalog Hive untuk membuat tabel Iceberg

  1. Siapkan konfigurasi Hive Catalog untuk membuat tabel Iceberg di spark scala:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.

    1. Buat tabel bernama example pada database default:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Sisipkan data sampel:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Tentukan strategi partisi berdasarkan kolom id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Buat tabel:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Tambahkan Iceberg Storage Handler dan SerDe sebagai properti tabel:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Tulis data ke tabel:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Baca datanya:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Ubah skema tabel. Berikut adalah contohnya.

    1. Dapatkan tabel dan tambahkan kolom baru grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Periksa skema tabel baru:

      table.schema.toString;
      
  4. Menyisipkan lebih banyak data dan melihat evolusi skema. Berikut adalah contohnya.

    1. Tambahkan data baru ke tabel:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Periksa data baru yang disisipkan:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Lihat histori tabel:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Lihat snapshot:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Lihat file manifes:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Lihat file data:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Asumsikan Anda membuat kesalahan dengan menambahkan baris dengan nilai id=6 dan ingin kembali untuk melihat versi tabel yang benar:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Ganti snapshot-id dengan versi yang ingin Anda gunakan kembali.

Menggunakan Tabel Hadoop untuk membuat tabel Iceberg

  1. Siapkan konfigurasi Hadoop Table untuk membuat tabel Iceberg di spark scala:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.

    1. Buat tabel bernama example pada database default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Sisipkan data sampel:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Tentukan strategi partisi berdasarkan kolom id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Buat tabel:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Tulis data ke tabel:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Baca datanya:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Ubah skema tabel. Berikut adalah contohnya.

    1. Dapatkan tabel dan tambahkan kolom baru grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Periksa skema tabel baru:

      table.schema.toString;
      
  4. Menyisipkan lebih banyak data dan melihat evolusi skema. Berikut adalah contohnya.

    1. Tambahkan data baru ke tabel:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Periksa data baru yang disisipkan:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Lihat histori tabel:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Lihat snapshot:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Lihat file manifes:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Lihat file data:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Kembali untuk melihat versi tabel tertentu:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Ganti snapshot-id dengan versi yang ingin Anda gunakan kembali dan tambahkan "L" ke bagian akhir. Misalnya, "3943776515926014142L".

Menggunakan tabel Iceberg di Hive

Iceberg mendukung pembacaan tabel menggunakan Hive menggunakan StorageHandler. Perhatikan bahwa hanya versi Hive 2.x dan 3.1.2 yang didukung. Untuk informasi selengkapnya, lihat Apache Iceberg - Hive. Selain itu, tambahkan file Iceberg Hive Runtime JAR ke classpath Hive. Untuk mendownload file JAR, lihat Download Apache Iceberg.

Untuk menempatkan tabel Hive di atas tabel Iceberg, Anda harus membuat tabel Iceberg menggunakan Katalog Hive atau Tabel Hadoop. Selain itu, Anda harus mengonfigurasi Hive agar dapat membaca data dari tabel Iceberg.

Baca tabel Iceberg (Katalog Hive) di Hive

  1. Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg pada sesi klien Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Membaca data dan skema tabel. Berikut adalah contohnya.

    1. Periksa skema tabel dan apakah format tabelnya adalah Iceberg:

      describe formatted example;
      
    2. Baca data dari tabel:

      select * from example;
      

Baca tabel Iceberg (Tabel Hadoop) di Hive

  1. Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg pada sesi klien Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Membaca data dan skema tabel. Berikut adalah contohnya.

    1. Buat tabel eksternal (hamparkan tabel Hive di atas tabel Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Periksa skema tabel dan apakah format tabelnya adalah Iceberg:

      describe formatted hadoop_table;
      
    3. Baca data dari tabel:

      select * from hadoop_table;
      

Menggunakan tabel Iceberg di Presto

Kueri Presto menggunakan konektor Hive untuk mendapatkan lokasi partisi, sehingga Anda harus mengonfigurasi Presto sebagaimana mestinya untuk membaca dan menulis data pada tabel Iceberg. Untuk mengetahui informasi selengkapnya, lihat Presto/Trino - Konektor Hive dan Presto/Trino - Konektor Iceberg.

Konfigurasi Presto

  1. Di setiap node cluster Dataproc, buat file bernama iceberg.properties /etc/presto/conf/catalog/iceberg.properties dan konfigurasikan hive.metastore.uri sebagai berikut:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Ganti example.net:9083 dengan host dan port yang benar untuk layanan metastore Thrift Hive.

  2. Mulai ulang layanan Presto untuk mengirim konfigurasi:

    sudo systemctl restart presto.service
    

Buat tabel Iceberg di Presto

  1. Buka klien Presto dan gunakan konektor "Iceberg" untuk mendapatkan metastore:

    --catalog iceberg --schema default
    
  2. Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.

    1. Buat tabel bernama example pada database default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Sisipkan data sampel:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Membaca data dari tabel:

      SELECT * FROM iceberg.default.example;
      
    4. Sisipkan lebih banyak data baru untuk memeriksa snapshot:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Lihat snapshot:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Dengan menambahkan perintah ORDER BY committed_at DESC LIMIT 1;, Anda dapat menemukan ID snapshot terbaru.

    6. Melakukan roll back ke versi tabel tertentu:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Ganti snapshot-id dengan versi yang ingin Anda gunakan kembali.

Langkah selanjutnya