BigQuery Metastore でメタデータを含む Apache Iceberg テーブルを作成する

このドキュメントでは、Dataproc Jobs サービス、Spark SQL CLI、または Dataproc クラスタで実行されている Zeppelin ウェブ インターフェースを使用して、BigQuery Metastore にメタデータを含む Apache Iceberg テーブルを作成する方法について説明します。

始める前に

まだ作成していない場合は、 Google Cloud プロジェクト、Cloud Storage バケット、Dataproc クラスタを作成します。

  1. プロジェクトを設定する

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Make sure that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    5. Install the Google Cloud CLI.
    6. To initialize the gcloud CLI, run the following command:

      gcloud init
    7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    8. Make sure that billing is enabled for your Google Cloud project.

    9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    10. Install the Google Cloud CLI.
    11. To initialize the gcloud CLI, run the following command:

      gcloud init

  2. プロジェクトに Cloud Storage バケットを作成します。

    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a name that meets the bucket naming requirements.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select a storage class.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.

  3. Dataproc クラスタを作成します。リソースと費用を節約するには、単一ノード Dataproc クラスタを作成して、このドキュメントで説明する例を実行します。

  4. カスタム サービス アカウントにロールを付与する(必要に応じて): デフォルトでは、Dataproc クラスタ VM は Compute Engine のデフォルトのサービス アカウントを使用して Dataproc とやり取りします。クラスタの作成時にカスタム サービス アカウントを指定する場合は、Dataproc ワーカーロール(roles/dataproc.workerロールまたは必要なワーカーロール権限を持つカスタムロールが必要です。

OSS データベースと BigQuery データセットのマッピング

オープンソース データベースと BigQuery データセットの用語の対応は次のとおりです。

OSS データベース BigQuery データセット
Namespace、Database データセット
パーティション分割テーブルまたはパーティション分割なしのテーブル テーブル
ビュー 表示

Iceberg テーブルを作成する

このセクションでは、Dataproc クラスタで実行されている Dataproc サービスSpark SQL CLIZeppelin コンポーネント ウェブ インターフェースに Spark SQL コードを送信して、BigQuery Metastore にメタデータを含む Iceberg テーブルを作成する方法について説明します。

Dataproc ジョブ

Dataproc サービスにジョブを送信するには、Google Cloud コンソールまたは Google Cloud CLI を使用してジョブを Dataproc クラスタに送信します。または、HTTP REST リクエストまたはプログラムによる gRPC Dataproc Cloud クライアント ライブラリ呼び出しで Dataproc Jobs API に送信します。

このセクションの例では、gcloud CLI、 Google Cloud コンソール、または Dataproc REST API を使用して、Dataproc Spark SQL ジョブを Dataproc サービスに送信し、BigQuery にメタデータを含む Iceberg テーブルを作成する方法を示します。

ジョブファイルを準備する

Spark SQL ジョブファイルを作成する手順は次のとおりです。このファイルには、Iceberg テーブルの作成と更新を行う Spark SQL コマンドがあります。

  1. ローカル ターミナル ウィンドウまたは Cloud Shell で、vinano などのテキスト エディタを使用して、次のコマンドを iceberg-table.sql ファイルにコピーし、ファイルを現在のディレクトリに保存します。

    USE CATALOG_NAME;
    CREATE NAMESPACE IF NOT EXISTS example_namespace;
    USE example_namespace;
    DROP TABLE IF EXISTS example_table;
    CREATE TABLE example_table (id int, data string) USING ICEBERG LOCATION 'gs://BUCKET/WAREHOUSE_FOLDER';
    INSERT INTO example_table VALUES (1, 'first row');
    ALTER TABLE example_table ADD COLUMNS (newDoubleCol double);
    DESCRIBE TABLE example_table;
    

    次のように置き換えます。

    • CATALOG_NAME: Iceberg カタログ名。
    • BUCKETWAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。
  2. gsutil ツールを使用して、ローカルの iceberg-table.sql を Cloud Storage のバケットにコピーします。

    gsutil cp iceberg-table.sql gs://BUCKET/
    

次に、iceberg-spark-runtime-3.5_2.12-1.5.2 JAR ファイルをダウンロードして Cloud Storage にコピーします。

  1. ローカル ターミナル ウィンドウまたは Cloud Shell で次の curl コマンドを実行して、iceberg-spark-runtime-3.5_2.12-1.5.2 JAR ファイルを現在のディレクトリにダウンロードします。

    curl -o iceberg-spark-runtime-3.5_2.12-1.5.2.jar https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar
    
  2. gsutil ツールを使用して、ローカルの iceberg-spark-runtime-3.5_2.12-1.5.2 JAR ファイルを現在のディレクトリから Cloud Storage のバケットにコピーします。

    gsutil cp iceberg-spark-runtime-3.5_2.12-1.5.2.jar gs://BUCKET/
    

Spark SQL ジョブを送信する

タブを選択して、手順に沿って gcloud CLI、Google Cloud コンソール、または Dataproc REST API を使用して Spark SQL ジョブを Dataproc サービスに送信します。

gcloud

  1. ローカルのターミナル ウィンドウまたは Cloud Shell で、次の gcloud dataproc jobs submit spark-sql コマンドをローカルに実行して、Spark SQL ジョブを送信し、Iceberg テーブルを作成します。

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars="gs://BUCKET/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar" \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER" \
        -f="gs://BUCKETiceberg-table.sql"
    

    注:

    • PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
    • CLUSTER_NAME: Dataproc クラスタの名前。
    • REGION: クラスタが配置されている Compute Engine のリージョン
    • CATALOG_NAME: Iceberg カタログ名。
    • BUCKETWAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。
    • LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
    • --jars: リスト内の JAR は、BigQuery Metastore でテーブル メタデータを作成するために必要です。
    • --properties: カタログのプロパティ
    • -f: Cloud Storage のバケットにコピーした iceberg-table.sql ジョブファイル。
  2. ジョブが完了したら、ターミナル出力でテーブルの説明を確認します。

    Time taken: 2.194 seconds
    id                      int
    data                    string
    newDoubleCol            double
    Time taken: 1.479 seconds, Fetched 3 row(s)
    Job JOB_ID finished successfully.
    
  3. BigQuery でテーブルのメタデータを表示する

    1. Google Cloud コンソールで、[BigQuery] ページに移動します。

      BigQuery Studio に移動

    2. Iceberg テーブルのメタデータを表示します。

Console

次の手順で、 Google Cloud コンソールを使用して Spark SQL ジョブを Dataproc サービスに送信し、BigQuery Metastore にメタデータを含む Iceberg テーブルを作成します。

  1. Google Cloud コンソールで、Dataproc の [ジョブを送信] に移動します。

    [ジョブを送信] ページに移動し、次のフィールドに入力します。

    • ジョブ ID: 提案された ID を承諾するか、独自の ID を挿入します。
    • リージョン: クラスタが配置されるリージョンを選択します。
    • クラスタ: クラスタを選択します。
    • Job type: SparkSql を選択します。
    • クエリソースのタイプ: Query file を選択します。
    • クエリ ファイル: gs://BUCKET/iceberg-table.sql を挿入します。
    • JAR ファイル: 次のように挿入します。
      gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
      
    • プロパティ: [プロパティを追加] を 5 回クリックして、5 つの key value 入力フィールドのリストを作成します。次に、次のキーのペアをコピーして、5 つのプロパティを定義します。
      # キー
      1.
      spark.sql.catalog.CATALOG_NAME
      
      org.apache.iceberg.spark.SparkCatalog
      
      2.
      spark.sql.catalog.CATALOG_NAME.catalog-impl
      
      org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
      
      3.
      spark.sql.catalog.CATALOG_NAME.gcp_project
      
      PROJECT_ID
      
      4.
      spark.sql.catalog.CATALOG_NAME.gcp_location
      
      LOCATION
      
      5.
      spark.sql.catalog.CATALOG_NAME.warehouse
      
      gs://BUCKET/WAREHOUSE_FOLDER
      

    注:

    • CATALOG_NAME: Iceberg カタログ名。
    • PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。クラスタが配置されているリージョン
    • LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
    • BUCKETWAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。
  2. [送信] をクリックします。

  3. ジョブの進行状況をモニタリングしてジョブ出力を表示するには、 Google Cloud コンソールの Dataproc の [ジョブ] ページに移動し、Job ID をクリックして [ジョブの詳細] ページを開きます。

  4. BigQuery でテーブルのメタデータを表示する

    1. Google Cloud コンソールで、[BigQuery] ページに移動します。

      BigQuery Studio に移動

    2. Iceberg テーブルのメタデータを表示します。

REST

Dataproc の jobs.submit API を使用して Spark SQL ジョブを Dataproc サービスに送信し、BigQuery Metastore にメタデータを含む Iceberg テーブルを作成できます。

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
  • CLUSTER_NAME: Dataproc クラスタの名前。
  • REGION: クラスタが配置されている Compute Engine リージョン
  • CATALOG_NAME: Iceberg カタログ名。
  • BUCKETWAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。
  • LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
  • jarFileUris: リストされている JAR は、BigQuery Metastore でテーブル メタデータを作成するために必要です。
  • properties: カタログのプロパティ
  • queryFileUri: Cloud Storage のバケットにコピーした iceberg-table.sql ジョブファイル。

HTTP メソッドと URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

リクエストの本文(JSON):

{
  "projectId": "PROJECT_ID",
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "statusHistory": [],
    "reference": {
      "jobId": "",
      "projectId": "PROJECT_ID"
    },
    "sparkSqlJob": {
      "properties": {
        "spark.sql.catalog."CATALOG_NAME": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog."CATALOG_NAME".catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
        "spark.sql.catalog."CATALOG_NAME".gcp_project": "PROJECT_ID",
        "spark.sql.catalog."CATALOG_NAME".gcp_location": "LOCATION",
        "spark.sql.catalog."CATALOG_NAME".warehouse": "gs://BUCKET/WAREHOUSE_FOLDER"
      },
      "jarFileUris": [
        "gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"
      ],
      "scriptVariables": {},
      "queryFileUri": "gs://BUCKET/iceberg-table.sql"
    }
  }
}

リクエストを送信するには、次のいずれかのオプションを展開します。

次のような JSON レスポンスが返されます。

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "..."
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "..."
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "..."
  },
  "submittedBy": "USER",
  "sparkSqlJob": {
    "queryFileUri": "gs://BUCKET/iceberg-table.sql",
    "properties": {
      "spark.sql.catalog.USER_catalog": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.USER_catalog.catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
      "spark.sql.catalog.USER_catalog.gcp_project": "PROJECT_ID",
      "spark.sql.catalog.USER_catalog.gcp_location": "LOCATION",
      "spark.sql.catalog.USER_catalog.warehouse": "gs://BUCKET/WAREHOUSE_FOLDER"
    },
    "jarFileUris": [
      "gs://BUCKET/iceberg-spark-runtime-3.5_2.12-1.5.2.jar",
      "gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"
    ]
  },
  "driverControlFilesUri": "gs://dataproc-...",
  "driverOutputResourceUri": "gs://dataproc-.../driveroutput",
  "jobUuid": "...",
  "region": "REGION"
}

ジョブの進行状況をモニタリングしてジョブ出力を表示するには、 Google Cloud コンソールの Dataproc の [ジョブ] ページに移動し、Job ID をクリックして [ジョブの詳細] ページを開きます。

BigQuery でテーブルのメタデータを表示する

  1. Google Cloud コンソールで、[BigQuery] ページに移動します。

    BigQuery Studio に移動

  2. Iceberg テーブルのメタデータを表示します。

Spark SQL CLI

次の手順では、Dataproc クラスタのマスタノードで実行されている Spark SQL CLI を使用して、テーブル メタデータが BigQuery Metastore に保存された Iceberg テーブルを作成する方法を示します。

  1. SSH を使用して、Dataproc クラスタのマスターノードに接続します。

  2. SSH セッション ターミナルで、vi または nano テキスト エディタを使用して、次のコマンドを iceberg-table.sql ファイルにコピーします。

    SET CATALOG_NAME = `CATALOG_NAME`;
    SET BUCKET = `BUCKET`;
    SET WAREHOUSE_FOLDER = `WAREHOUSE_FOLDER`;
    USE `${CATALOG_NAME}`;
    CREATE NAMESPACE IF NOT EXISTS `${CATALOG_NAME}`.example_namespace;
    DROP TABLE IF EXISTS `${CATALOG_NAME}`.example_namespace.example_table;
    CREATE TABLE `${CATALOG_NAME}`.example_namespace.example_table (id int, data string) USING ICEBERG LOCATION 'gs://`${BUCKET}`/`${WAREHOUSE_FOLDER}`';
    INSERT INTO `${CATALOG_NAME}`.example_namespace.example_table VALUES (1, 'first row');
    ALTER TABLE `${CATALOG_NAME}`.example_namespace.example_table ADD COLUMNS (newDoubleCol double);
    DESCRIBE TABLE `${CATALOG_NAME}`.example_namespace.example_table;
    

    次のように置き換えます。

    • CATALOG_NAME: Iceberg カタログ名。
    • BUCKETWAREHOUSE_FOLDER: Iceberg ウェアハウスに使用される Cloud Storage バケットとフォルダ。
  3. SSH セッション ターミナルで、次の spark-sql コマンドを実行してアイスバーグ テーブルを作成します。

    spark-sql \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/WAREHOUSE_FOLDER \
    -f iceberg-table.sql 
    

    次のように置き換えます。

    • PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
    • LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
  4. BigQuery でテーブルのメタデータを表示する

    1. Google Cloud コンソールで、[BigQuery] ページに移動します。

      BigQuery Studio に移動

    2. Iceberg テーブルのメタデータを表示します。

Zeppelin ウェブ インターフェース

次の手順では、Dataproc クラスタのマスターノードで実行されている Zeppelin ウェブ インターフェースを使用して、BigQuery Metastore にテーブル メタデータを格納する Iceberg テーブルを作成する方法を示します。

  1. Google Cloud コンソールで、Dataproc の [クラスタ] ページに移動します。

    [Dataproc クラスタ] ページに移動

  2. クラスタ名を選択して、[クラスタの詳細] ページを開きます。

  3. [ウェブ インターフェース] タブをクリックすると、クラスタにインストールされているデフォルト コンポーネントとオプション コンポーネントのウェブ インターフェースへのコンポーネント ゲートウェイ リンクのリストが表示されます。

  4. [Zeppelin] リンクをクリックして Zeppelin ウェブ インターフェースを開きます。

  5. Zeppelin ウェブ インターフェースで [匿名] メニューをクリックし、[インタープリタ] をクリックして [インタープリタ] ページを開きます。

  6. 次のように、2 つの jar を Zeppelin Spark インタープリタに追加します。

    1. Search interpreters ボックスに「Spark」と入力して、[Spark インタープリタ] セクションまでスクロールします。
    2. [edit] をクリックします。
    3. spark.jars フィールドに次の内容を貼り付けます。

      https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar

    4. [Spark インタープリタ] セクションの下部にある [保存] をクリックし、[OK] をクリックしてインタープリタを更新し、新しい設定で Spark インタープリタを再起動します。

  7. Zeppelin ノートブックのメニューで、[Create new note] をクリックします。

  8. [Create new note] ダイアログでノートブックの名前を入力し、デフォルトの spark インタープリタを受け入れます。[作成] をクリックしてノートブックを開きます。

  9. 変数を入力したら、次の PySpark コードを Zeppelin ノートブックにコピーします。

    %pyspark
    from pyspark.sql import SparkSession
    project_id = "PROJECT_ID" catalog = "CATALOG_NAME" namespace = "NAMESPACE" location = "LOCATION" warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
    spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \ .config(f"spark.sql.catalog.{catalog}.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config(f"spark.sql.catalog.{catalog}.gcp_project", f"{project_id}") \ .config(f"spark.sql.catalog.{catalog}.gcp_location", f"{location}") \ .config(f"spark.sql.catalog.{catalog}.warehouse", f"{warehouse_dir}") \ .getOrCreate()
    spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;")
    \# Create table and display schema (without LOCATION) spark.sql("DROP TABLE IF EXISTS example_iceberg_table") spark.sql("CREATE TABLE example_iceberg_table (id int, data string) USING ICEBERG") spark.sql("DESCRIBE example_iceberg_table;")
    \# Insert table data. spark.sql("INSERT INTO example_iceberg_table VALUES (1, 'first row');")
    \# Alter table, then display schema. spark.sql("ALTER TABLE example_iceberg_table ADD COLUMNS (newDoubleCol double);")
    \# Select and display the contents of the table. spark.sql("SELECT * FROM example_iceberg_table").show()

    次のように置き換えます。

    • PROJECT_ID: Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
    • CATALOG_NAMENAMESPACE: Iceberg カタログ名と名前空間を組み合わせて、Iceberg テーブル(catalog.namespace.table_name)を識別します。
    • LOCATION: サポートされている BigQuery のロケーション。デフォルトのロケーションは「US」です。
    • BUCKETWAREHOUSE_DIRECTORY: Iceberg ウェアハウス ディレクトリとして使用される Cloud Storage バケットとフォルダ。
  10. 実行アイコンをクリックするか、Shift-Enter キーを押してコードを実行します。ジョブが完了すると、ステータス メッセージに「Spark Job Finished」と表示され、出力にテーブルの内容が表示されます。

  11. BigQuery でテーブルのメタデータを表示する

    1. Google Cloud コンソールで、[BigQuery] ページに移動します。

      BigQuery Studio に移動

    2. Iceberg テーブルのメタデータを表示します。