Dataproc Metastore で Apache Iceberg テーブルを使用する

このページでは、Dataproc クラスタに接続された Dataproc Metastore サービスで Apache Iceberg テーブルを使用する方法について説明します。Apache Iceberg は、大規模な分析データセットに対応したオープン テーブル形式です。

互換性

Iceberg テーブルは、次の機能をサポートしています。

ドライバ 選択 挿入 テーブルを作成
Spark
Hive
Presto

準備

Spark で Iceberg テーブルを使用する

次の例は、Spark で Iceberg テーブルを使用する必要があることを示しています。

Iceberg テーブルは読み取り / 書き込みオペレーションをサポートしています。詳細については、Apache Iceberg - Spark をご覧ください。

Spark 構成

まず、Spark シェルを起動し、Cloud Storage バケットを使用してデータを保存します。 Iceberg を Spark のインストールに含めるには、Iceberg Spark ランタイム JAR ファイルを Spark の JAR フォルダに追加します。JAR ファイルをダウンロードするには、Apache Iceberg のダウンロードをご覧ください。次のコマンドを実行すると、Apache Iceberg をサポートする Spark シェルが起動します。

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Hive カタログを使用して Iceberg テーブルを作成する

  1. Spark Scala に Iceberg テーブルを作成するように Hive カタログ構成を設定します。

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. データの挿入と更新を行うテーブルを作成します。次に例を示します。

    1. default データベースの下に example という名前のテーブルを作成します。

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. サンプルデータを挿入する

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. id 列に基づいてパーティション戦略を指定します。

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. テーブルを作成します。

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Iceberg ストレージ ハンドラと SerDe をテーブル プロパティとして追加します。

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. テーブルにデータを書き込みます。

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. データを読み取ります。

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. テーブル スキーマを変更します。次に例を示します。

    1. テーブルを取得して、新しい列 grade を追加します。

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 新しいテーブル スキーマを確認します。

      table.schema.toString;
      
  4. より多くのデータを挿入し、スキーマの進化を表示する。次に例を示します。

    1. テーブルに新しいデータを追加します。

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. 挿入された新しいデータを確認します。

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. テーブルの履歴を表示します。

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. スナップショットを表示します。

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. マニフェスト ファイルを表示します。

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. データファイルを表示します。

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. id=6 の値を含む行を追加し、誤った表を表示する必要があるとします。

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      snapshot-id は、復元するバージョンに置き換えます。

Hadoop テーブルを使用して Iceberg テーブルを作成する

  1. Hadoop テーブル構成を設定して、Spark Scala で Iceberg テーブルを作成します。

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. データの挿入と更新を行うテーブルを作成します。次に例を示します。

    1. default データベースの下に example という名前のテーブルを作成します。

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. サンプルデータを挿入する

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. id 列に基づいてパーティション戦略を指定します。

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. テーブルを作成します。

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. テーブルにデータを書き込みます。

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. データを読み取ります。

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. テーブル スキーマを変更します。次に例を示します。

    1. テーブルを取得して、新しい列 grade を追加します。

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 新しいテーブル スキーマを確認します。

      table.schema.toString;
      
  4. より多くのデータを挿入し、スキーマの進化を表示する。次に例を示します。

    1. テーブルに新しいデータを追加します。

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. 挿入された新しいデータを確認します。

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. テーブルの履歴を表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. スナップショットを表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. マニフェスト ファイルを表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. データファイルを表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. 戻って特別なバージョンのテーブルを閲覧します。

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      snapshot-id は、復元するバージョンに置き換えて "L" を末尾に追加します。例: "3943776515926014142L"

Hive で Iceberg テーブルを使用する

Iceberg は、StorageHandler を使用して Hive を使用して読み取るテーブルをサポートしています。Hive 2.x と 3.1.2 のバージョンのみがサポートされています。詳しくは、Apache Iceberg - Hive をご覧ください。さらに、Iceberg Hive ランタイム JAR ファイルを Hive クラスパスに追加します。JAR ファイルをダウンロードするには、Apache Iceberg のダウンロードをご覧ください。

Hive テーブルを Iceberg のテーブルの上にオーバーレイするには、Hive カタログまたは Hadoop テーブルを使用して Iceberg テーブルを作成する必要があります。また、Iceberg テーブルからデータを読み取るように Hive を構成する必要があります。

Hive の Iceberg テーブル(Hive カタログ)を読み取る

  1. Hive クライアントを開き、Hive クライアント セッションで Iceberg テーブルを読み取るように構成を行います。

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. テーブルのスキーマとデータを読み取ります。次に例を示します。

    1. テーブルのスキーマとテーブル形式が Iceberg かどうかを確認します。

      describe formatted example;
      
    2. テーブルからデータを読み取ります。

      select * from example;
      

Hive の Iceberg テーブル(Hadoop テーブル)を読み取る

  1. Hive クライアントを開き、Hive クライアント セッションで Iceberg テーブルを読み取るように構成を行います。

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. テーブルのスキーマとデータを読み取ります。次に例を示します。

    1. 外部テーブルを作成します(Iceberg テーブルの上に Hive テーブルをオーバーレイします)。

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. テーブルのスキーマとテーブル形式が Iceberg かどうかを確認します。

      describe formatted hadoop_table;
      
    3. テーブルからデータを読み取ります。

      select * from hadoop_table;
      

Presto で Iceberg テーブルを使用する

Presto クエリは Hive コネクタを使用してパーティションの場所を取得します。このため、Iceberg テーブルに対してデータの読み取りと書き込みを行うように Presto を構成する必要があります。詳しくは、Presto/Trino - Hive コネクタPresto/Trino - Iceberg コネクタをご覧ください。

Presto の設定

  1. 各 Dataproc クラスタノードで、iceberg.properties /etc/presto/conf/catalog/iceberg.properties という名前のファイルを作成し、hive.metastore.uri を次のように構成します。

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    example.net:9083 は、Hive メタストアの Thrift サービスの正しいホストとポートに置き換えます。

  2. Presto サービスを再起動して構成をプッシュします。

    sudo systemctl restart presto.service
    

Presto に Iceberg テーブルを作成する

  1. Presto クライアントを開き、「Iceberg」コネクタを使用してメタストアを取得します。

    --catalog iceberg --schema default
    
  2. データの挿入と更新を行うテーブルを作成します。次に例を示します。

    1. default データベースの下に example という名前のテーブルを作成します。

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. サンプルデータを挿入する

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. テーブルからデータを読み取ります。

      SELECT * FROM iceberg.default.example;
      
    4. 新しいデータを挿入してスナップショットを確認します。

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. スナップショットを表示します。

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      コマンド ORDER BY committed_at DESC LIMIT 1; を追加すると、最新のスナップショット ID を検索できます。

    6. 特定のバージョンのテーブルへのロールバック:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      snapshot-id は、復元するバージョンに置き換えます。

次のステップ