Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Iceberg などの追加コンポーネントをインストールできます。このページでは、必要に応じて、Dataproc クラスタに Iceberg コンポーネントをインストールする方法について説明します。
概要
Apache Iceberg は、大規模な分析データセットに対応したオープン テーブル形式です。SQL テーブルの信頼性とシンプルさをビッグデータにもたらし、Spark、Trino、PrestoDB、Flink、Hive などのエンジンが同じテーブルを同時に安全に操作できるようにします。
Dataproc クラスタにインストールすると、Apache Iceberg コンポーネントは Iceberg ライブラリをインストールし、クラスタで Iceberg と連携するように Spark と Hive を構成します。
Iceberg の主な機能
Iceberg の機能は次のとおりです。
- スキーマの進化: テーブル全体を書き換えることなく、列の追加、削除、名前の変更を行います。
- タイムトラベル: 監査またはロールバックの目的で、過去のテーブル スナップショットをクエリします。
- 非表示パーティショニング: パーティションの詳細をユーザーに公開せずに、クエリを高速化するためにデータ レイアウトを最適化します。
- ACID トランザクション: データの整合性を確保し、競合を防ぎます。
互換性のある Dataproc イメージ バージョン
Iceberg コンポーネントは、2.2.47 以降のイメージ バージョンで作成された Dataproc クラスタにインストールできます。クラスタにインストールされている Iceberg のバージョンは、2.2 リリース バージョンのページに記載されています。
Iceberg 関連のプロパティ
Iceberg クラスタで Dataproc を作成すると、次の Spark プロパティと Hive プロパティが Iceberg と連携するように構成されます。
構成ファイル | プロパティ | デフォルト値 |
---|---|---|
/etc/spark/conf/spark-defaults.conf |
spark.sql.extensions |
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
spark.driver.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
spark.executor.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
/etc/hive/conf/hive-site.xml |
hive.aux.jars.path |
file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar |
iceberg.engine.hive.enabled |
true |
Iceberg オプション コンポーネントをインストールする
Dataproc クラスタの作成時に Iceberg コンポーネントをインストールします。Dataproc クラスタ イメージ バージョン リスト ページには、最新の Dataproc クラスタ イメージ バージョンに含まれる Iceberg コンポーネント バージョンが表示されます。
Google Cloud コンソール
Iceberg コンポーネントをインストールする Dataproc クラスタを作成するには、 Google Cloud コンソールで次の操作を行います。
- Dataproc の [クラスタの作成] ページを開きます。[クラスタの設定] パネルが選択されています。
- [コンポーネント] セクションの [オプション コンポーネント] で、[Iceberg] コンポーネントを選択します。
- 他のクラスタ設定を確認または指定して、[作成] をクリックします。
Google Cloud CLI
Iceberg コンポーネントをインストールする Dataproc クラスタを作成するには、--optional-components
フラグを指定した gcloud dataproc clusters create
コマンドを使用します。
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --optional-components=ICEBERG \ other flags ...
次のように置き換えます。
- CLUSTER_NAME: 新しいクラスタ名。
- REGION: クラスタ リージョン。
REST API
Iceberg オプション コンポーネントをインストールする Dataproc クラスタを作成するには、clusters.create
リクエストの一部として Iceberg SoftwareConfig.Component
を指定します。
Spark と Hive で Iceberg テーブルを使用する
Iceberg のオプション コンポーネントがクラスタにインストールされている Dataproc クラスタを作成したら、Spark と Hive を使用して Iceberg テーブルのデータを読み書きできます。
Spark
Iceberg 用の Spark セッションを構成する
gcloud CLI コマンドをローカルで使用するか、Dataproc クラスタのマスターノードで実行されている spark-shell
または pyspark
REPL(Read-Eval-Print Loop)を使用して、Iceberg の Spark 拡張機能を有効にし、Iceberg テーブルを使用するように Spark カタログを設定できます。
gcloud
次の gcloud CLI の例をローカル ターミナル ウィンドウまたは Cloud Shell で実行して、Spark ジョブを送信し、Spark プロパティを設定して Iceberg の Spark セッションを構成します。
gcloud dataproc jobs submit spark \ --cluster=CLUSTER_NAME \ --region=REGION \ --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \ --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \ --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \ other flags ...
次のように置き換えます。
- CLUSTER_NAME: クラスタ名。
- REGION: Compute Engine のリージョン。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と FOLDER: Cloud Storage 内の Iceberg カタログのロケーション。
spark-shell
Dataproc クラスタの spark-shell
REPL を使用して Iceberg の Spark セッションを構成するには、次の操作を行います。
SSH を使用して、Dataproc クラスタのマスターノードに接続します。
SSH セッション ターミナルで次のコマンドを実行して、Iceberg 用に Spark セッションを構成します。
spark-shell \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \ --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
次のように置き換えます。
- CLUSTER_NAME: クラスタ名。
- REGION: Compute Engine のリージョン。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と FOLDER: Cloud Storage 内の Iceberg カタログのロケーション。
pyspark シェル
Dataproc クラスタの pyspark
REPL を使用して Iceberg の Spark セッションを構成するには、次の操作を行います。
SSH を使用して、Dataproc クラスタのマスターノードに接続します。
SSH セッション ターミナルで次のコマンドを実行して、Iceberg 用に Spark セッションを構成します。
pyspark \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \ --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
次のように置き換えます。
- CLUSTER_NAME: クラスタ名。
- REGION: Compute Engine のリージョン。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と FOLDER: Cloud Storage 内の Iceberg カタログのロケーション。
Iceberg テーブルにデータを書き込む
Spark を使用して Iceberg テーブルにデータを書き込むことができます。次のコード スニペットは、サンプルデータを使用して DataFrame
を作成し、Cloud Storage に Iceberg テーブルを作成して、データを Iceberg テーブルに書き込みます。
PySpark
# Create a DataFrame with sample data. data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"]) # Create an Iceberg table in Cloud Storage. spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME ( id integer, name string) USING iceberg LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""") # Write the DataFrame to the Iceberg table in Cloud Storage. data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
Scala
// Create a DataFrame with sample data. val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name") // Create an Iceberg table in Cloud Storage. spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME ( id integer, name string) USING iceberg LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""") // Write the DataFrame to the Iceberg table in Cloud Storage. data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
Iceberg テーブルからデータを読み取る
Spark を使用して Iceberg テーブルからデータを読み取ることができます。次のコード スニペットは、テーブルを読み取り、その内容を表示します。
PySpark
# Read Iceberg table data into a DataFrame. df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME") # Display the data. df.show()
Scala
// Read Iceberg table data into a DataFrame. val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME") // Display the data. df.show()
Spark SQL
SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME
Hive
Hive で Iceberg テーブルを作成する
Dataproc クラスタは、Iceberg と連携するように Hive を事前に構成します。
このセクションのコード スニペットを実行するには、次の操作を行います。
SSH を使用して、Dataproc クラスタのマスターノードに接続します。
SSH ターミナル ウィンドウで
beeline
を起動します。beeline -u jdbc:hive2://
Hive でパーティション分割されていない Iceberg テーブルまたはパーティション分割された Iceberg テーブルを作成できます。
パーティション分割されていないテーブル
Hive にパーティション分割されていない Iceberg テーブルを作成します。
CREATE TABLE my_table ( id INT, name STRING, created_at TIMESTAMP ) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
パーティション分割テーブル
PARTITIONED BY
句でパーティション列を指定して、Hive にパーティション分割された Iceberg テーブルを作成します。
CREATE TABLE my_partitioned_table ( id INT, name STRING ) PARTITIONED BY (date_sk INT) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
Hive の Iceberg テーブルにデータを挿入する
標準の Hive INSERT
ステートメントを使用して、Iceberg テーブルにデータを挿入できます。
SET hive.execution.engine=mr; INSERT INTO my_table SELECT 1, 'Alice', current_timestamp();
制限事項
- DML(データ操作言語)オペレーションでは、MR(MapReduce)実行エンジンのみがサポートされます。
- Hive
3.1.3
では MR 実行は非推奨になりました。
Hive の Iceberg テーブルからデータを読み取る
Iceberg テーブルからデータを読み取るには、SELECT
ステートメントを使用します。
SELECT * FROM my_table;
Hive で Iceberg テーブルを削除する
Hive で Iceberg テーブルを削除するには、DROP TABLE
ステートメントを使用します。
DROP TABLE my_table;
次のステップ
- Iceberg クイック スタートガイドをご覧ください。