Dataproc Metastore에서 Apache Iceberg 테이블 사용

이 페이지에서는 Dataproc 클러스터에 연결된 Dataproc Metastore 서비스와 함께 Apache Iceberg 테이블을 사용하는 방법을 설명합니다. Apache Iceberg는 대규모 분석 데이터 세트를 위한 열린 테이블 형식입니다.

호환성

Iceberg 테이블은 다음 기능을 지원합니다.

요인 선택 삽입 테이블 만들기
Spark
Hive
Presto

시작하기 전에

Spark로 Iceberg 테이블 사용하기

다음 예시에서는 Spark에서 Iceberg 테이블을 사용해야 함을 보여줍니다.

Iceberg 테이블은 읽기 및 쓰기 작업을 지원합니다. 자세한 내용은 Apache Iceberg - Spark를 참조하세요.

Spark 구성

먼저 Spark 셸을 시작하고 Cloud Storage 버킷을 사용하여 데이터를 저장합니다. Spark 설치에 Iceberg를 포함하려면 Iceberg Spark 런타임 JAR 파일을 Spark의 JAR 폴더에 추가합니다. JAR 파일을 다운로드하려면 Apache Iceberg 다운로드를 참조하세요. 다음 명령어는 Apache Iceberg를 지원하는 Spark 셸을 시작합니다.

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Hive 카탈로그를 사용하여 Iceberg 테이블 만들기

  1. Hive 카탈로그 구성을 설정하여 Spark Scala에서 Iceberg 테이블을 만듭니다.

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. 데이터를 삽입하고 업데이트할 테이블을 만듭니다. 다음은 그 예시입니다.

    1. default 데이터베이스 아래에 example이라는 테이블을 만듭니다.

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. 샘플 데이터를 삽입합니다.

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. id 열을 기준으로 분할 전략을 지정합니다.

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 테이블을 만듭니다.

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Iceberg 스토리지 핸들러와 SerDe를 테이블 속성으로 추가합니다.

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. 테이블에 데이터를 씁니다.

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. 데이터를 읽습니다.

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. 테이블 스키마를 변경합니다. 다음은 그 예시입니다.

    1. 테이블을 가져오고 새 열 grade를 추가합니다.

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 새 테이블 스키마를 확인합니다.

      table.schema.toString;
      
  4. 더 많은 데이터를 삽입하고 스키마 변경을 확인합니다. 다음은 그 예시입니다.

    1. 테이블에 새 데이터를 추가합니다.

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. 삽입된 새 데이터를 확인합니다.

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. 테이블 기록을 봅니다.

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. 스냅샷을 확인합니다.

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. 매니페스트 파일을 봅니다.

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. 데이터 파일을 확인합니다.

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. 실수로 id=6 값을 포함하는 행을 추가하여 뒤로 돌아가서 올바른 테이블 버전을 보려고 하는 경우를 가정해 보겠습니다.

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      snapshot-id를 되돌리려는 버전으로 바꿉니다.

Hadoop 테이블을 사용하여 Iceberg 테이블 만들기

  1. Spark Scala에서 Iceberg 테이블을 만들도록 Hadoop 테이블 구성을 설정합니다.

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. 데이터를 삽입하고 업데이트할 테이블을 만듭니다. 다음은 그 예시입니다.

    1. default 데이터베이스 아래에 example이라는 테이블을 만듭니다.

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. 샘플 데이터를 삽입합니다.

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. id 열을 기준으로 분할 전략을 지정합니다.

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 테이블을 만듭니다.

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. 테이블에 데이터를 씁니다.

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. 데이터를 읽습니다.

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. 테이블 스키마를 변경합니다. 다음은 그 예시입니다.

    1. 테이블을 가져오고 새 열 grade를 추가합니다.

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 새 테이블 스키마를 확인합니다.

      table.schema.toString;
      
  4. 더 많은 데이터를 삽입하고 스키마 변경을 확인합니다. 다음은 그 예시입니다.

    1. 테이블에 새 데이터를 추가합니다.

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. 삽입된 새 데이터를 확인합니다.

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. 테이블 기록을 봅니다.

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. 스냅샷을 확인합니다.

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. 매니페스트 파일을 봅니다.

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. 데이터 파일을 확인합니다.

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. 특정 버전의 테이블을 보기 위해 돌아가려면 다음 안내를 따르세요.

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      snapshot-id를 되돌리려는 버전으로 바꾸고 "L"을 끝에 추가합니다. 예를 들면 "3943776515926014142L"입니다.

Hive에서 Iceberg 테이블 사용하기

Iceberg는 StorageHandler를 사용하여 Hive를 사용하는 테이블을 지원합니다. Hive 2.x 및 3.1.2 버전만 지원됩니다. 자세한 내용은 Apache Iceberg - Hive를 참조하세요. 또한 Iceberg Hive 런타임 JAR 파일을 Hive 클래스 경로에 추가합니다. JAR 파일을 다운로드하려면 Apache Iceberg 다운로드를 참조하세요.

Iceberg 테이블 위에 Hive 테이블을 오버레이하려면 Hive 카탈로그 또는 Hadoop 테이블을 사용하여 Iceberg 테이블을 만들어야 합니다. 또한 Iceberg 테이블에서 데이터를 읽으려면 Hive를 구성해야 합니다.

Hive에서 Iceberg 테이블(Hive 카탈로그) 읽기

  1. Hive 클라이언트를 열고 Hive 클라이언트 세션에서 Iceberg 테이블을 읽도록 구성을 설정합니다.

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. 테이블 스키마 및 데이터를 읽습니다. 다음은 그 예시입니다.

    1. 테이블 스키마를 확인하고 테이블 형식이 Iceberg인지 확인합니다.

      describe formatted example;
      
    2. 테이블에서 데이터를 읽습니다.

      select * from example;
      

Hive에서 Iceberg 테이블(Hadoop 테이블) 읽기

  1. Hive 클라이언트를 열고 Hive 클라이언트 세션에서 Iceberg 테이블을 읽도록 구성을 설정합니다.

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. 테이블 스키마 및 데이터를 읽습니다. 다음은 그 예시입니다.

    1. 외부 테이블을 만듭니다(Iceberg 테이블 위에 Hive 테이블 오버레이).

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. 테이블 스키마를 확인하고 테이블 형식이 Iceberg인지 확인합니다.

      describe formatted hadoop_table;
      
    3. 테이블에서 데이터를 읽습니다.

      select * from hadoop_table;
      

Presto에서 Iceberg 테이블 사용하기

Presto 쿼리는 Hive 커넥터를 사용하여 파티션 위치를 가져오므로 Presto가 Iceberg 테이블에서 데이터를 읽고 쓰도록 구성해야 합니다. 자세한 내용은 Presto/Trino - Hive 커넥터Presto/Trino - Iceberg 커넥터를 참조하세요.

Presto 구성

  1. 각 Dataproc 클러스터 노드에서 iceberg.properties /etc/presto/conf/catalog/iceberg.properties라는 파일을 만들고 hive.metastore.uri를 다음과 같이 구성합니다.

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    example.net:9083을 Hive Metastore Thrift 서비스의 올바른 호스트 및 포트로 바꿉니다.

  2. Presto 서비스를 다시 시작하여 구성을 푸시합니다.

    sudo systemctl restart presto.service
    

Presto에서 Iceberg 테이블 만들기

  1. Presto 클라이언트를 열고 'Iceberg' 커넥터를 사용하여 Metastore를 가져옵니다.

    --catalog iceberg --schema default
    
  2. 데이터를 삽입하고 업데이트할 테이블을 만듭니다. 다음은 그 예시입니다.

    1. default 데이터베이스 아래에 example이라는 테이블을 만듭니다.

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. 샘플 데이터를 삽입합니다.

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. 테이블에서 데이터를 읽습니다.

      SELECT * FROM iceberg.default.example;
      
    4. 새 데이터를 삽입하여 스냅샷을 확인합니다.

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. 스냅샷을 확인합니다.

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      ORDER BY committed_at DESC LIMIT 1; 명령어를 추가하면 최신 스냅샷 ID를 찾을 수 있습니다.

    6. 테이블의 특정 버전으로 롤백합니다.

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      snapshot-id를 되돌리려는 버전으로 바꿉니다.

다음 단계