Menggunakan tabel Apache Iceberg dengan Dataproc Metastore

Halaman ini menjelaskan cara menggunakan tabel Apache Iceberg dengan layanan Dataproc Metastore yang dilampirkan ke cluster Dataproc. Apache Iceberg adalah format tabel terbuka untuk set data analisis yang besar.

Kompatibilitas

Tabel Iceberg mendukung fitur berikut.

Pendorong Pilih Sisipkan Buat Tabel
Spark
Hive
Presto

Sebelum memulai

Menggunakan tabel Iceberg dengan Spark

Contoh berikut menunjukkan bahwa Anda harus menggunakan tabel Iceberg dengan Spark.

Tabel Iceberg mendukung operasi baca dan tulis. Untuk informasi selengkapnya, lihat Apache Iceberg - Spark.

Konfigurasi Spark

Pertama, mulai shell Spark dan gunakan bucket Cloud Storage untuk menyimpan data. Untuk menyertakan Iceberg dalam penginstalan Spark, tambahkan file JAR Runtime Spark Iceberg ke folder JAR Spark. Untuk mendownload file JAR, lihat Download Apache Iceberg. Perintah berikut memulai shell Spark dengan dukungan untuk Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Menggunakan Hive Catalog untuk membuat tabel Iceberg

  1. Siapkan konfigurasi Katalog Hive untuk membuat tabel Iceberg di spark scala:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.

    1. Buat tabel bernama example di database default:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Masukkan data sampel:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Tentukan strategi partisi berdasarkan kolom id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Buat tabel:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Tambahkan Pengendali Penyimpanan dan SerDe Iceberg sebagai properti tabel:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Tulis data ke tabel:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Baca data:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Mengubah skema tabel. Berikut adalah contohnya.

    1. Dapatkan tabel dan tambahkan kolom baru grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Periksa skema tabel baru:

      table.schema.toString;
      
  4. Sisipkan lebih banyak data dan lihat evolusi skema. Berikut adalah contohnya.

    1. Tambahkan data baru ke tabel:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Periksa data baru yang dimasukkan:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Lihat histori tabel:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Lihat ringkasan:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Lihat file manifes:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Lihat file data:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Anggap Anda melakukan kesalahan dengan menambahkan baris dengan nilai id=6 dan ingin kembali untuk melihat versi tabel yang benar:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Ganti snapshot-id dengan versi yang ingin Anda kembalikan.

Menggunakan Tabel Hadoop untuk membuat tabel Iceberg

  1. Siapkan konfigurasi Tabel Hadoop untuk membuat tabel Iceberg di spark scala:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.

    1. Buat tabel bernama example di database default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Masukkan data sampel:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Tentukan strategi partisi berdasarkan kolom id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Buat tabel:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Tulis data ke tabel:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Baca data:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Mengubah skema tabel. Berikut adalah contohnya.

    1. Dapatkan tabel dan tambahkan kolom baru grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Periksa skema tabel baru:

      table.schema.toString;
      
  4. Sisipkan lebih banyak data dan lihat evolusi skema. Berikut adalah contohnya.

    1. Tambahkan data baru ke tabel:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Periksa data baru yang dimasukkan:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Lihat histori tabel:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Lihat ringkasan:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Lihat file manifes:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Lihat file data:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Kembali untuk melihat versi tabel tertentu:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Ganti snapshot-id dengan versi yang ingin Anda kembalikan dan tambahkan "L" ke bagian akhir. Misalnya, "3943776515926014142L".

Menggunakan tabel Iceberg di Hive

Iceberg mendukung tabel yang dibaca menggunakan Hive dengan menggunakan StorageHandler. Perhatikan bahwa hanya versi Hive 2.x dan 3.1.2 yang didukung. Untuk informasi selengkapnya, lihat Apache Iceberg - Hive. Selain itu, tambahkan file JAR Runtime Iceberg Hive ke classpath Hive. Untuk mendownload file JAR, lihat Download Apache Iceberg.

Untuk menempatkan tabel Hive di atas tabel Iceberg, Anda harus membuat tabel Iceberg menggunakan Katalog Hive atau Tabel Hadoop. Selain itu, Anda harus mengonfigurasi Hive dengan benar untuk membaca data dari tabel Iceberg.

Membaca tabel Iceberg (Katalog Hive) di Hive

  1. Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg di sesi klien Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Membaca skema dan data tabel. Berikut adalah contohnya.

    1. Periksa skema tabel dan apakah format tabelnya adalah Iceberg:

      describe formatted example;
      
    2. Baca data dari tabel:

      select * from example;
      

Membaca tabel Iceberg (Tabel Hadoop) di Hive

  1. Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg di sesi klien Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Membaca skema dan data tabel. Berikut adalah contohnya.

    1. Buat tabel eksternal (tumpang-tindih tabel Hive di atas tabel Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Periksa skema tabel dan apakah format tabelnya adalah Iceberg:

      describe formatted hadoop_table;
      
    3. Baca data dari tabel:

      select * from hadoop_table;
      

Menggunakan tabel Iceberg di Presto

Kueri Presto menggunakan konektor Hive untuk mendapatkan lokasi partisi, sehingga Anda harus mengonfigurasi Presto dengan benar untuk membaca dan menulis data di tabel Iceberg. Untuk mengetahui informasi selengkapnya, lihat Presto/Trino - Konektor Hive dan Presto/Trino - Konektor Iceberg.

Konfigurasi Presto

  1. Di setiap node cluster Dataproc, buat file bernama iceberg.properties /etc/presto/conf/catalog/iceberg.properties dan konfigurasi hive.metastore.uri sebagai berikut:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Ganti example.net:9083 dengan host dan port yang benar untuk layanan Thrift metastore Hive Anda.

  2. Mulai ulang layanan Presto untuk mendorong konfigurasi:

    sudo systemctl restart presto.service
    

Membuat tabel Iceberg di Presto

  1. Buka klien Presto dan gunakan konektor "Iceberg" untuk mendapatkan metastore:

    --catalog iceberg --schema default
    
  2. Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.

    1. Buat tabel bernama example di database default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Masukkan data sampel:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Baca data dari tabel:

      SELECT * FROM iceberg.default.example;
      
    4. Masukkan lebih banyak data baru untuk memeriksa snapshot:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Lihat ringkasan:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Dengan menambahkan perintah ORDER BY committed_at DESC LIMIT 1;, Anda dapat menemukan ID snapshot terbaru.

    6. Melakukan roll back ke versi tabel tertentu:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Ganti snapshot-id dengan versi yang ingin Anda kembalikan.

Langkah selanjutnya