このドキュメントでは、Dataproc Jobs サービス、Spark SQL CLI、または Dataproc クラスタで実行されている Zeppelin ウェブ インターフェースを使用して、BigQuery Metastore にメタデータを含む Apache Iceberg テーブルを作成する方法について説明します。
始める前に
まだ作成していない場合は、 Google Cloud プロジェクト、Cloud Storage バケット、Dataproc クラスタを作成します。
プロジェクトを設定する
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
プロジェクトに Cloud Storage バケットを作成します。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
Dataproc クラスタを作成します。リソースと費用を節約するには、単一ノード Dataproc クラスタを作成して、このドキュメントで説明する例を実行します。
クラスタが作成されるリージョンのサブネットで、限定公開の Google アクセス(PGA)を有効にする必要があります。
このガイドの Zeppelin ウェブ インターフェースの例を実行するには、Zeppelin オプション コンポーネントが有効になっている Dataproc クラスタを使用または作成する必要があります。
カスタム サービス アカウントにロールを付与する(必要に応じて): デフォルトでは、Dataproc クラスタ VM は Compute Engine のデフォルトのサービス アカウントを使用して Dataproc とやり取りします。クラスタの作成時にカスタム サービス アカウントを指定する場合は、Dataproc ワーカーロール(
roles/dataproc.worker
)ロールまたは必要なワーカーロール権限を持つカスタムロールが必要です。
OSS データベースと BigQuery データセットのマッピング
オープンソース データベースと BigQuery データセットの用語の対応は次のとおりです。
OSS データベース | BigQuery データセット |
---|---|
Namespace、Database | データセット |
パーティション分割テーブルまたはパーティション分割なしのテーブル | テーブル |
ビュー | 表示 |
Iceberg テーブルを作成する
このセクションでは、Dataproc クラスタで実行されている Dataproc サービス、Spark SQL CLI、Zeppelin コンポーネント ウェブ インターフェースに Spark SQL コードを送信して、BigQuery Metastore にメタデータを含む Iceberg テーブルを作成する方法について説明します。
Dataproc ジョブ
Dataproc サービスにジョブを送信するには、Google Cloud コンソールまたは Google Cloud CLI を使用してジョブを Dataproc クラスタに送信します。または、HTTP REST リクエストまたはプログラムによる gRPC Dataproc Cloud クライアント ライブラリ呼び出しで Dataproc Jobs API に送信します。
このセクションの例では、gcloud CLI、 Google Cloud コンソール、または Dataproc REST API を使用して、Dataproc Spark SQL ジョブを Dataproc サービスに送信し、BigQuery にメタデータを含む Iceberg テーブルを作成する方法を示します。
ジョブファイルを準備する
Spark SQL ジョブファイルを作成する手順は次のとおりです。このファイルには、Iceberg テーブルの作成と更新を行う Spark SQL コマンドがあります。
ローカル ターミナル ウィンドウまたは Cloud Shell で、
vi
やnano
などのテキスト エディタを使用して、次のコマンドをiceberg-table.sql
ファイルにコピーし、ファイルを現在のディレクトリに保存します。USE CATALOG_NAME; CREATE NAMESPACE IF NOT EXISTS example_namespace; USE example_namespace; DROP TABLE IF EXISTS example_table; CREATE TABLE example_table (id int, data string) USING ICEBERG LOCATION 'gs://BUCKET/WAREHOUSE_FOLDER'; INSERT INTO example_table VALUES (1, 'first row'); ALTER TABLE example_table ADD COLUMNS (newDoubleCol double); DESCRIBE TABLE example_table;
次のように置き換えます。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と WAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。
gsutil
ツールを使用して、ローカルのiceberg-table.sql
を Cloud Storage のバケットにコピーします。gsutil cp iceberg-table.sql gs://BUCKET/
次に、iceberg-spark-runtime-3.5_2.12-1.5.2
JAR ファイルをダウンロードして Cloud Storage にコピーします。
ローカル ターミナル ウィンドウまたは Cloud Shell で次の
curl
コマンドを実行して、iceberg-spark-runtime-3.5_2.12-1.5.2
JAR ファイルを現在のディレクトリにダウンロードします。curl -o iceberg-spark-runtime-3.5_2.12-1.5.2.jar https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar
gsutil
ツールを使用して、ローカルのiceberg-spark-runtime-3.5_2.12-1.5.2
JAR ファイルを現在のディレクトリから Cloud Storage のバケットにコピーします。gsutil cp iceberg-spark-runtime-3.5_2.12-1.5.2.jar gs://BUCKET/
Spark SQL ジョブを送信する
タブを選択して、手順に沿って gcloud CLI、Google Cloud コンソール、または Dataproc REST API を使用して Spark SQL ジョブを Dataproc サービスに送信します。
gcloud
ローカルのターミナル ウィンドウまたは Cloud Shell で、次の gcloud dataproc jobs submit spark-sql コマンドをローカルに実行して、Spark SQL ジョブを送信し、Iceberg テーブルを作成します。
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars="gs://BUCKET/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar" \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER" \ -f="gs://BUCKETiceberg-table.sql"
注:
- PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- CLUSTER_NAME: Dataproc クラスタの名前。
- REGION: クラスタが配置されている Compute Engine のリージョン。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と WAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。
- LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
--jars
: リスト内の JAR は、BigQuery Metastore でテーブル メタデータを作成するために必要です。--properties
: カタログのプロパティ。-f
: Cloud Storage のバケットにコピーしたiceberg-table.sql
ジョブファイル。
ジョブが完了したら、ターミナル出力でテーブルの説明を確認します。
Time taken: 2.194 seconds id int data string newDoubleCol double Time taken: 1.479 seconds, Fetched 3 row(s) Job JOB_ID finished successfully.
BigQuery でテーブルのメタデータを表示する
Google Cloud コンソールで、[BigQuery] ページに移動します。
Iceberg テーブルのメタデータを表示します。
Console
次の手順で、 Google Cloud コンソールを使用して Spark SQL ジョブを Dataproc サービスに送信し、BigQuery Metastore にメタデータを含む Iceberg テーブルを作成します。
Google Cloud コンソールで、Dataproc の [ジョブを送信] に移動します。
[ジョブを送信] ページに移動し、次のフィールドに入力します。
- ジョブ ID: 提案された ID を承諾するか、独自の ID を挿入します。
- リージョン: クラスタが配置されるリージョンを選択します。
- クラスタ: クラスタを選択します。
- Job type:
SparkSql
を選択します。 - クエリソースのタイプ:
Query file
を選択します。 - クエリ ファイル:
gs://BUCKET/iceberg-table.sql
を挿入します。 - JAR ファイル: 次のように挿入します。
gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
- プロパティ:
key
value
入力フィールドのリストを作成します。次に、次のキーと値のペアをコピーして、5 つのプロパティを定義します。# キー 値 1. spark.sql.catalog.CATALOG_NAME
org.apache.iceberg.spark.SparkCatalog
2. spark.sql.catalog.CATALOG_NAME.catalog-impl
org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
3. spark.sql.catalog.CATALOG_NAME.gcp_project
PROJECT_ID
4. spark.sql.catalog.CATALOG_NAME.gcp_location
LOCATION
5. spark.sql.catalog.CATALOG_NAME.warehouse
gs://BUCKET/WAREHOUSE_FOLDER
[プロパティを追加] を 5 回クリックして、5 つの
注:
- CATALOG_NAME: Iceberg カタログ名。
- PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。クラスタが配置されているリージョン。
- LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
- BUCKET と WAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。
[送信] をクリックします。
ジョブの進行状況をモニタリングしてジョブ出力を表示するには、 Google Cloud コンソールの Dataproc の [ジョブ] ページに移動し、
Job ID
をクリックして [ジョブの詳細] ページを開きます。BigQuery でテーブルのメタデータを表示する
Google Cloud コンソールで、[BigQuery] ページに移動します。
Iceberg テーブルのメタデータを表示します。
REST
Dataproc の jobs.submit API を使用して Spark SQL ジョブを Dataproc サービスに送信し、BigQuery Metastore にメタデータを含む Iceberg テーブルを作成できます。
リクエストのデータを使用する前に、次のように置き換えます。
- PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- CLUSTER_NAME: Dataproc クラスタの名前。
- REGION: クラスタが配置されている Compute Engine リージョン。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と WAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。 LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
jarFileUris
: リストされている JAR は、BigQuery Metastore でテーブル メタデータを作成するために必要です。properties
: カタログのプロパティ。queryFileUri
: Cloud Storage のバケットにコピーしたiceberg-table.sql
ジョブファイル。
HTTP メソッドと URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
リクエストの本文(JSON):
{ "projectId": "PROJECT_ID", "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "statusHistory": [], "reference": { "jobId": "", "projectId": "PROJECT_ID" }, "sparkSqlJob": { "properties": { "spark.sql.catalog."CATALOG_NAME": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog."CATALOG_NAME".catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog", "spark.sql.catalog."CATALOG_NAME".gcp_project": "PROJECT_ID", "spark.sql.catalog."CATALOG_NAME".gcp_location": "LOCATION", "spark.sql.catalog."CATALOG_NAME".warehouse": "gs://BUCKET/WAREHOUSE_FOLDER" }, "jarFileUris": [ "gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar" ], "scriptVariables": {}, "queryFileUri": "gs://BUCKET/iceberg-table.sql" } } }
リクエストを送信するには、次のいずれかのオプションを展開します。
次のような JSON レスポンスが返されます。
{ "reference": { "projectId": "PROJECT_ID", "jobId": "..." }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "..." }, "status": { "state": "PENDING", "stateStartTime": "..." }, "submittedBy": "USER", "sparkSqlJob": { "queryFileUri": "gs://BUCKET/iceberg-table.sql", "properties": { "spark.sql.catalog.USER_catalog": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.USER_catalog.catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog", "spark.sql.catalog.USER_catalog.gcp_project": "PROJECT_ID", "spark.sql.catalog.USER_catalog.gcp_location": "LOCATION", "spark.sql.catalog.USER_catalog.warehouse": "gs://BUCKET/WAREHOUSE_FOLDER" }, "jarFileUris": [ "gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar", "gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar" ] }, "driverControlFilesUri": "gs://dataproc-...", "driverOutputResourceUri": "gs://dataproc-.../driveroutput", "jobUuid": "...", "region": "REGION" }
ジョブの進行状況をモニタリングしてジョブ出力を表示するには、 Google Cloud コンソールの Dataproc の [ジョブ] ページに移動し、Job ID
をクリックして [ジョブの詳細] ページを開きます。
BigQuery でテーブルのメタデータを表示する
Google Cloud コンソールで、[BigQuery] ページに移動します。
Iceberg テーブルのメタデータを表示します。
Spark SQL CLI
次の手順では、Dataproc クラスタのマスタノードで実行されている Spark SQL CLI を使用して、テーブル メタデータが BigQuery Metastore に保存された Iceberg テーブルを作成する方法を示します。
SSH を使用して、Dataproc クラスタのマスターノードに接続します。
SSH セッション ターミナルで、
vi
またはnano
テキスト エディタを使用して、次のコマンドをiceberg-table.sql
ファイルにコピーします。SET CATALOG_NAME = `CATALOG_NAME`; SET BUCKET = `BUCKET`; SET WAREHOUSE_FOLDER = `WAREHOUSE_FOLDER`; USE `${CATALOG_NAME}`; CREATE NAMESPACE IF NOT EXISTS `${CATALOG_NAME}`.example_namespace; DROP TABLE IF EXISTS `${CATALOG_NAME}`.example_namespace.example_table; CREATE TABLE `${CATALOG_NAME}`.example_namespace.example_table (id int, data string) USING ICEBERG LOCATION 'gs://`${BUCKET}`/`${WAREHOUSE_FOLDER}`'; INSERT INTO `${CATALOG_NAME}`.example_namespace.example_table VALUES (1, 'first row'); ALTER TABLE `${CATALOG_NAME}`.example_namespace.example_table ADD COLUMNS (newDoubleCol double); DESCRIBE TABLE `${CATALOG_NAME}`.example_namespace.example_table;
次のように置き換えます。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と WAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。
SSH セッション ターミナルで、次の
spark-sql
コマンドを実行してアイスバーグ テーブルを作成します。spark-sql \ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER \ -f iceberg-table.sql
次のように置き換えます。
- PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
BigQuery でテーブルのメタデータを表示する
Google Cloud コンソールで、[BigQuery] ページに移動します。
Iceberg テーブルのメタデータを表示します。
Zeppelin ウェブ インターフェース
次の手順では、Dataproc クラスタのマスターノードで実行されている Zeppelin ウェブ インターフェースを使用して、BigQuery Metastore にテーブル メタデータを格納する Iceberg テーブルを作成する方法を示します。
Google Cloud コンソールで、Dataproc の [クラスタ] ページに移動します。
クラスタ名を選択して、[クラスタの詳細] ページを開きます。
[ウェブ インターフェース] タブをクリックすると、クラスタにインストールされているデフォルト コンポーネントとオプション コンポーネントのウェブ インターフェースへのコンポーネント ゲートウェイ リンクのリストが表示されます。
[Zeppelin] リンクをクリックして Zeppelin ウェブ インターフェースを開きます。
Zeppelin ウェブ インターフェースで [匿名] メニューをクリックし、[インタープリタ] をクリックして [インタープリタ] ページを開きます。
次のように、2 つの jar を Zeppelin Spark インタープリタに追加します。
Search interpreters
ボックスに「Spark」と入力して、[Spark インタープリタ] セクションまでスクロールします。- [edit] をクリックします。
spark.jars フィールドに次の内容を貼り付けます。
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
[Spark インタープリタ] セクションの下部にある [保存] をクリックし、[OK] をクリックしてインタープリタを更新し、新しい設定で Spark インタープリタを再起動します。
Zeppelin ノートブックのメニューで、[Create new note] をクリックします。
[Create new note] ダイアログでノートブックの名前を入力し、デフォルトの spark インタープリタを受け入れます。[作成] をクリックしてノートブックを開きます。
変数を入力したら、次の PySpark コードを Zeppelin ノートブックにコピーします。
%pyspark
from pyspark.sql import SparkSession
project_id = "PROJECT_ID" catalog = "CATALOG_NAME" namespace = "NAMESPACE" location = "LOCATION" warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \ .config(f"spark.sql.catalog.{catalog}.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config(f"spark.sql.catalog.{catalog}.gcp_project", f"{project_id}") \ .config(f"spark.sql.catalog.{catalog}.gcp_location", f"{location}") \ .config(f"spark.sql.catalog.{catalog}.warehouse", f"{warehouse_dir}") \ .getOrCreate()
spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;")
\# Create table and display schema (without LOCATION) spark.sql("DROP TABLE IF EXISTS example_iceberg_table") spark.sql("CREATE TABLE example_iceberg_table (id int, data string) USING ICEBERG") spark.sql("DESCRIBE example_iceberg_table;")
\# Insert table data. spark.sql("INSERT INTO example_iceberg_table VALUES (1, 'first row');")
\# Alter table, then display schema. spark.sql("ALTER TABLE example_iceberg_table ADD COLUMNS (newDoubleCol double);")
\# Select and display the contents of the table. spark.sql("SELECT * FROM example_iceberg_table").show()次のように置き換えます。
- PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- CATALOG_NAME と NAMESPACE: Iceberg カタログ名と名前空間を組み合わせて、Iceberg テーブル(
catalog.namespace.table_name
)を識別します。 - LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
- BUCKET と WAREHOUSE_DIRECTORY: Iceberg ウェアハウス ディレクトリとして使用される Cloud Storage バケットとフォルダ。
実行アイコンをクリックするか、
Shift-Enter
キーを押してコードを実行します。ジョブが完了すると、ステータス メッセージに「Spark Job Finished」と表示され、出力にテーブルの内容が表示されます。BigQuery でテーブルのメタデータを表示する
Google Cloud コンソールで、[BigQuery] ページに移動します。
Iceberg テーブルのメタデータを表示します。