Dataproc Metastore での Iceberg のサポート

このページでは、Dataproc Metastore で Hive メタストアをホストして、Dataproc で Apache Iceberg を使用する方法について説明します。Spark、Hive、Presto を介して Iceberg テーブルを使用する方法に関する情報が記載されています。

特長

Apache Iceberg は、大規模な分析データセットに対応したオープン テーブル形式です。Iceberg はパフォーマンスを大幅に改善し、次のような高度な機能を提供します。

  • 原子性: テーブルの変更は完了するか失敗するかのいずれかです。テーブルの変更の部分 commit はありません。

  • スナップショット分離: 読み取りでは、ロックを保持せずにテーブルのスナップショットを 1 つだけ使用します。

  • 複数の同時書き込み: オプティミスティック同時実行と再試行を使用して、書き込みの競合が発生しても互換性のある更新が行われるようにします。

  • スキーマの進化: 列が ID によって追跡され、追加、ドロップ、更新、名前の変更をサポートします。

  • タイムトラベル: 再現可能なクエリでは、同じテーブルまたはスナップショットを使用できます。変更を簡単に調べることができます。

  • 分散計画: ファイルのプルーニングと述語プッシュダウンをジョブに分散し、ボトルネックとしてメタストアを削除します。

  • 変更履歴とロールバック: テーブルを以前の状態にリセットすることで問題を修正します。

  • 非表示のパーティショニング: 暗黙的に誤った結果やクエリが非常に遅いユーザーのミスを防ぎます。

  • パーティション レイアウトの進化: データ ボリュームやクエリパターンの変化に応じてテーブルのレイアウトを更新できます。

  • スキャン計画とファイル フィルタリング: 不要なメタデータ ファイルをプルーニングし、一致するデータを含まないデータファイルをフィルタすることによって、クエリに必要なファイルを検索します。

互換性

Iceberg は Dataproc や Dataproc Metastore と連携します。SQL テーブルのように機能する、高パフォーマンス形式のテーブルを Spark と Presto に追加できます。Iceberg は、スナップショットの最新バージョンへのポインタを使用しており、バージョンの切り替え時にアトミック性を確保するためのメカニズムを必要とします。テーブルを追跡するには、Hive カタログと Hadoop テーブルの 2 つのオプションを使用できます。

サポートされている機能は次のとおりです。

推進要因 選択 挿入 テーブルを作成
Spark ✓ ✓ ✓
Hive ✓ ✓
Presto ✓ ✓ ✓

前提条件

まず Dataproc クラスタを作成し、その Hive Metastore として Dataproc Metastore サービスを使用します。詳細については、Dataproc クラスタの作成をご覧ください。クラスタを作成したら、ブラウザまたはコマンドラインからクラスタに SSH 接続します。

Spark で Iceberg テーブルを使用する

Iceberg のテーブルは読み取りオペレーションと書き込みオペレーションをサポートしています。詳細については、Apache Iceberg - Spark をご覧ください。

Spark 構成

まず、Spark シェルを起動し、Cloud Storage バケットを使用してデータを保存します。 Iceberg を Spark のインストールに含めるには、Iceberg Spark ランタイム JAR ファイルを Spark の JAR フォルダに追加します。JAR ファイルをダウンロードするには、Apache Iceberg のダウンロードをご覧ください。次のコマンドは、Apache Iceberg をサポートする Spark シェルを起動します。

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Hive カタログを使用した Iceberg テーブルの作成

  1. Spark Scala に Iceberg テーブルを作成するように Hive カタログ構成を設定します。

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. データの挿入と更新を行うテーブルを作成します。次に例を示します。

    1. default データベースの下に example という名前のテーブルを作成します。

      val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
      val name = TableIdentifier.of("default","example");
      
    2. サンプルデータを挿入する

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. id 列に基づいてパーティション戦略を指定します。

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. テーブルを作成します。

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Iceberg ストレージ ハンドラと SerDe をテーブル プロパティとして追加します。

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. テーブルにデータを書き込みます。

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. データを読み取ります。

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. テーブル スキーマを変更します。次に例を示します。

    1. テーブルを取得して、新しい列 grade を追加します。

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 新しいテーブル スキーマを確認します。

      table.schema.toString;
      
  4. より多くのデータを挿入し、スキーマの進化を表示する。次に例を示します。

    1. テーブルに新しいデータを追加します。

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. 挿入された新しいデータを確認します。

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. テーブルの履歴を表示します。

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. スナップショットを表示します。

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. マニフェスト ファイルを表示します。

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. データファイルを表示します。

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. id=6 の値を含む行を追加し、誤った表を表示する必要があるとします。

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      snapshot-id は、復元するバージョンに置き換えます。

Hadoop テーブルを使用した Iceberg テーブルの作成

  1. Hadoop テーブル構成を設定して、Spark Scala で Iceberg テーブルを作成します。

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. データの挿入と更新を行うテーブルを作成します。次に例を示します。

    1. default データベースの下に example という名前のテーブルを作成します。

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. サンプルデータを挿入する

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. id 列に基づいてパーティション戦略を指定します。

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. テーブルを作成します。

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. テーブルにデータを書き込みます。

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. データを読み取ります。

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. テーブル スキーマを変更します。次に例を示します。

    1. テーブルを取得して、新しい列 grade を追加します。

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 新しいテーブル スキーマを確認します。

      table.schema.toString;
      
  4. より多くのデータを挿入し、スキーマの進化を表示する。次に例を示します。

    1. テーブルに新しいデータを追加します。

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. 挿入された新しいデータを確認します。

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. テーブルの履歴を表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. スナップショットを表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. マニフェスト ファイルを表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. データファイルを表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. 戻って特別なバージョンのテーブルを閲覧します。

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      snapshot-id は、復元するバージョンに置き換えて "L" を末尾に追加します。例: "3943776515926014142L"

Hive で Iceberg テーブルを使用する

Iceberg は StorageHandler を使用して、Hive を介したテーブルの読み取りをサポートしています。Hive 2.x と 3.1.2 のバージョンのみがサポートされています。詳しくは、Apache Iceberg - Hive をご覧ください。さらに、Iceberg Hive ランタイム JAR ファイルを Hive クラスパスに追加します。JAR ファイルをダウンロードするには、Apache Iceberg のダウンロードをご覧ください。

Hive テーブルを Iceberg テーブルの上にオーバーレイするには、Hive カタログまたは Hadoop テーブルを使用して Iceberg テーブルを作成する必要があります。さらに、Iceberg テーブルからデータを読み取るように Hive を構成する必要があります。

Hive のアイスベア テーブル(Hive カタログ)の読み取り

  1. Hive クライアントを開き、Hive クライアント セッションで Iceberg テーブルを読み取るように構成を行います。

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. テーブルのスキーマとデータを読み取ります。次に例を示します。

    1. テーブルのスキーマとテーブル形式が Iceberg かどうかを確認します。

      describe formatted example;
      
    2. テーブルからデータを読み取ります。

      select * from example;
      

Hive の Iceberg テーブル(Hadoop テーブル)の読み取り

  1. Hive クライアントを開き、Hive クライアント セッションで Iceberg テーブルを読み取るように構成を行います。

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. テーブルのスキーマとデータを読み取ります。次に例を示します。

    1. 外部テーブルを作成します(Iceberg テーブルの上に Hive テーブルをオーバーレイします)。

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. テーブルのスキーマとテーブル形式が Iceberg かどうかを確認します。

      describe formatted hadoop_table;
      
    3. テーブルからデータを読み取ります。

      select * from hadoop_table;
      

Presto での Iceberg テーブルの使用

Presto クエリは Hive コネクタを使用してパーティションの場所を取得します。このため、Iceberg テーブルに対してデータの読み取りと書き込みを行うように Presto を構成する必要があります。詳しくは、Presto/Trino - Hive コネクタPresto/Trino - Iceberg コネクタをご覧ください。

Presto の設定

  1. 各 Dataproc クラスタノードで、iceberg.properties /etc/presto/conf/catalog/iceberg.properties という名前のファイルを作成し、hive.metastore.uri を次のように構成します。

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    example.net:9083 は、Hive メタストアの Thrift サービスの正しいホストとポートに置き換えます。

  2. Presto サービスを再起動して構成をプッシュします。

    sudo systemctl restart presto.service
    

Presto での Iceberg テーブルの作成

  1. Presto クライアントを開き、「Iceberg」コネクタを使用してメタストアを取得します。

    --catalog iceberg --schema default
    
  2. データの挿入と更新を行うテーブルを作成します。次に例を示します。

    1. default データベースの下に example という名前のテーブルを作成します。

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. サンプルデータを挿入する

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. テーブルからデータを読み取ります。

      SELECT * FROM iceberg.default.example;
      
    4. 新しいデータを挿入してスナップショットを確認します。

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. スナップショットを表示します。

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      コマンド ORDER BY committed_at DESC LIMIT 1; を追加すると、最新のスナップショット ID を検索できます。

    6. 特定のバージョンのテーブルへのロールバック:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      snapshot-id は、復元するバージョンに置き換えます。

次のステップ