Dataproc の Iceberg オプション コンポーネント

Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Iceberg などの追加コンポーネントをインストールできます。このページでは、必要に応じて、Dataproc クラスタに Iceberg コンポーネントをインストールする方法について説明します。

概要

Apache Iceberg は、大規模な分析データセットに対応したオープン テーブル形式です。SQL テーブルの信頼性とシンプルさをビッグデータにもたらし、Spark、Trino、PrestoDB、Flink、Hive などのエンジンが同じテーブルを同時に安全に操作できるようにします。

Dataproc クラスタにインストールすると、Apache Iceberg コンポーネントは Iceberg ライブラリをインストールし、クラスタで Iceberg と連携するように Spark と Hive を構成します。

Iceberg の主な機能

Iceberg の機能は次のとおりです。

  • スキーマの進化: テーブル全体を書き換えることなく、列の追加、削除、名前の変更を行います。
  • タイムトラベル: 監査またはロールバックの目的で、過去のテーブル スナップショットをクエリします。
  • 非表示パーティショニング: パーティションの詳細をユーザーに公開せずに、クエリを高速化するためにデータ レイアウトを最適化します。
  • ACID トランザクション: データの整合性を確保し、競合を防ぎます。

互換性のある Dataproc イメージ バージョン

Iceberg コンポーネントは、2.2.47 以降のイメージ バージョンで作成された Dataproc クラスタにインストールできます。クラスタにインストールされている Iceberg のバージョンは、2.2 リリース バージョンのページに記載されています。

Iceberg クラスタで Dataproc を作成すると、次の Spark プロパティと Hive プロパティが Iceberg と連携するように構成されます。

構成ファイル プロパティ デフォルト値
/etc/spark/conf/spark-defaults.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.driver.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
spark.executor.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar
iceberg.engine.hive.enabled true

Iceberg オプション コンポーネントをインストールする

Dataproc クラスタの作成時に Iceberg コンポーネントをインストールします。Dataproc クラスタ イメージ バージョン リスト ページには、最新の Dataproc クラスタ イメージ バージョンに含まれる Iceberg コンポーネント バージョンが表示されます。

Google Cloud コンソール

Iceberg コンポーネントをインストールする Dataproc クラスタを作成するには、 Google Cloud コンソールで次の操作を行います。

  1. Dataproc の [クラスタの作成] ページを開きます。[クラスタの設定] パネルが選択されています。
  2. [コンポーネント] セクションの [オプション コンポーネント] で、[Iceberg] コンポーネントを選択します。
  3. 他のクラスタ設定を確認または指定して、[作成] をクリックします。

Google Cloud CLI

Iceberg コンポーネントをインストールする Dataproc クラスタを作成するには、--optional-components フラグを指定した gcloud dataproc clusters create コマンドを使用します。

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=ICEBERG \
     other flags ...

次のように置き換えます。

REST API

Iceberg オプション コンポーネントをインストールする Dataproc クラスタを作成するには、clusters.create リクエストの一部として Iceberg SoftwareConfig.Component を指定します。

Spark と Hive で Iceberg テーブルを使用する

Iceberg のオプション コンポーネントがクラスタにインストールされている Dataproc クラスタを作成したら、Spark と Hive を使用して Iceberg テーブルのデータを読み書きできます。

Spark

Iceberg 用の Spark セッションを構成する

gcloud CLI コマンドをローカルで使用するか、Dataproc クラスタのマスターノードで実行されている spark-shell または pyspark REPL(Read-Eval-Print Loop)を使用して、Iceberg の Spark 拡張機能を有効にし、Iceberg テーブルを使用するように Spark カタログを設定できます。

gcloud

次の gcloud CLI の例をローカル ターミナル ウィンドウまたは Cloud Shell で実行して、Spark ジョブを送信し、Spark プロパティを設定して Iceberg の Spark セッションを構成します。

gcloud dataproc jobs submit spark  \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \
     other flags ...

次のように置き換えます。

  • CLUSTER_NAME: クラスタ名。
  • REGION: Compute Engine のリージョン。
  • CATALOG_NAME: Iceberg カタログ名。
  • BUCKETFOLDER: Cloud Storage 内の Iceberg カタログのロケーション。

spark-shell

Dataproc クラスタの spark-shell REPL を使用して Iceberg の Spark セッションを構成するには、次の操作を行います。

  1. SSH を使用して、Dataproc クラスタのマスターノードに接続します。

  2. SSH セッション ターミナルで次のコマンドを実行して、Iceberg 用に Spark セッションを構成します。

spark-shell \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

次のように置き換えます。

  • CLUSTER_NAME: クラスタ名。
  • REGION: Compute Engine のリージョン。
  • CATALOG_NAME: Iceberg カタログ名。
  • BUCKETFOLDER: Cloud Storage 内の Iceberg カタログのロケーション。

pyspark シェル

Dataproc クラスタの pyspark REPL を使用して Iceberg の Spark セッションを構成するには、次の操作を行います。

  1. SSH を使用して、Dataproc クラスタのマスターノードに接続します。

  2. SSH セッション ターミナルで次のコマンドを実行して、Iceberg 用に Spark セッションを構成します。

pyspark \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

次のように置き換えます。

  • CLUSTER_NAME: クラスタ名。
  • REGION: Compute Engine のリージョン。
  • CATALOG_NAME: Iceberg カタログ名。
  • BUCKETFOLDER: Cloud Storage 内の Iceberg カタログのロケーション。

Iceberg テーブルにデータを書き込む

Spark を使用して Iceberg テーブルにデータを書き込むことができます。次のコード スニペットは、サンプルデータを使用して DataFrame を作成し、Cloud Storage に Iceberg テーブルを作成して、データを Iceberg テーブルに書き込みます。

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Iceberg テーブルからデータを読み取る

Spark を使用して Iceberg テーブルからデータを読み取ることができます。次のコード スニペットは、テーブルを読み取り、その内容を表示します。

PySpark

# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()

Scala

// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")

// Display the data.
df.show()

Spark SQL

SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME

Hive

Hive で Iceberg テーブルを作成する

Dataproc クラスタは、Iceberg と連携するように Hive を事前に構成します。

このセクションのコード スニペットを実行するには、次の操作を行います。

  1. SSH を使用して、Dataproc クラスタのマスターノードに接続します。

  2. SSH ターミナル ウィンドウで beeline を起動します。

    beeline -u jdbc:hive2://
    

Hive でパーティション分割されていない Iceberg テーブルまたはパーティション分割された Iceberg テーブルを作成できます。

パーティション分割されていないテーブル

Hive にパーティション分割されていない Iceberg テーブルを作成します。

CREATE TABLE my_table (
  id INT,
  name STRING,
  created_at TIMESTAMP
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

パーティション分割テーブル

PARTITIONED BY 句でパーティション列を指定して、Hive にパーティション分割された Iceberg テーブルを作成します。

CREATE TABLE my_partitioned_table (
  id INT,
  name STRING
) PARTITIONED BY (date_sk INT)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

Hive の Iceberg テーブルにデータを挿入する

標準の Hive INSERT ステートメントを使用して、Iceberg テーブルにデータを挿入できます。

SET hive.execution.engine=mr;

INSERT INTO my_table
SELECT 1, 'Alice', current_timestamp();

制限事項

  • DML(データ操作言語)オペレーションでは、MR(MapReduce)実行エンジンのみがサポートされます。
  • Hive 3.1.3 では MR 実行は非推奨になりました。

Hive の Iceberg テーブルからデータを読み取る

Iceberg テーブルからデータを読み取るには、SELECT ステートメントを使用します。

SELECT * FROM my_table;

Hive で Iceberg テーブルを削除する

Hive で Iceberg テーブルを削除するには、DROP TABLE ステートメントを使用します。

DROP TABLE my_table;

次のステップ