BigQuery のテーブルで BigQuery メタストアを使用する

このドキュメントでは、BigQuery テーブルと Spark で BigQuery メタストアを使用する方法について説明します。

BigQuery metastore を使用すると、BigQuery から標準(組み込み)テーブルApache Iceberg 用の BigQuery テーブルBigLake 外部テーブルを作成して使用できます。

始める前に

  1. Google Cloud プロジェクトに対する課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
  2. BigQuery API と Dataproc API を有効にします。

    API を有効にする

  3. 省略可: BigQuery metastore の仕組みと、使用すべき理由を理解します。

  4. 省略可: Iceberg を使用している場合は、次のコマンドを使用して、BigQuery メタストア Iceberg テーブルと BigQuery の他の類似テーブル バリエーションを比較します。

標準の BigQuery テーブル BigLake 外部テーブル Apache Iceberg 用の BigLake 外部テーブル(BigLake Iceberg テーブル) BigQuery Metastore Iceberg テーブルプレビュー Apache Iceberg 用の BigQuery テーブル(Iceberg マネージド テーブル / BigQuery Iceberg テーブル)(プレビュー
主な機能 フルマネージド エクスペリエンス オープンソース エンジンと BigQuery エンジン全体でガバナンス(きめ細かいアクセス制御)と機能が提供される BigLake 外部テーブルの機能 + データの整合性、スキーマの更新。Spark や他のオープンエンジンからは作成できません。 BigLake Iceberg テーブルの機能 + 外部エンジンからの変更可能。DDL または bq コマンドライン ツールでは作成できません。 BigLake Iceberg テーブルの機能 + オープンデータとメタデータによる低い管理オーバーヘッド
データ ストレージ BigQuery マネージド ストレージ ユーザー管理バケットでホストされているオープン形式のデータ
オープンモデル BigQuery Storage Read API(コネクタ経由) オープン ファイル形式(Parquet) オープン ライブラリ(Iceberg) オープンソース互換(Iceberg メタデータ スナップショット)
ガバナンス 統合された BigQuery ガバナンス
書き込み(DML とストリーミング) BigQuery コネクタ、API、高スループット DML、CDC を使用。 外部エンジン経由の書き込みのみ BigQuery コネクタ、API、高スループット DML、CDC を使用。

必要なロール

メタデータ ストアとして BigQuery Metastore で Spark と Dataproc を使用するのに必要な権限を取得するには、管理者に次の IAM ロールを付与するよう依頼します。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

テーブルに接続します。

  1. Google Cloud コンソールでデータセットを作成します。

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    次のように置き換えます。

    • PROJECT_ID: データセットを作成するプロジェクトの ID。 Google Cloud
    • DATASET_NAME: データセットの名前。
  2. Cloud リソース接続を作成します。

  3. 標準の BigQuery テーブルを作成します。

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    次のように置き換えます。

    • TABLE_NAME: テーブルの名前。
  4. 標準の BigQuery テーブルにデータを挿入します。

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. Apache Iceberg 用の BigQuery テーブルを作成します。

    たとえば、テーブルを作成するには、次の CREATE ステートメントを実行します。

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    次のように置き換えます。

    • ICEBERG_TABLE_NAME: Iceberg テーブルの名前。例: iceberg_managed_table
    • CONNECTION_NAME: 接続の名前。これは前の手順で作成しました。例: myproject.us.myconnection
    • STORAGE_URI: 完全修飾の Cloud Storage URI。例: gs://mybucket/table
  6. Apache Iceberg の BigQuery テーブルにデータを挿入します。

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. 読み取り専用の Iceberg テーブルを作成します。

    たとえば、読み取り専用の Iceberg テーブルを作成するには、次の CREATE ステートメントを実行します。

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    次のように置き換えます。

    • READONLY_ICEBERG_TABLE_NAME: 読み取り専用テーブルの名前。
    • BUCKET_PATH: 外部テーブルのデータを含む Cloud Storage バケットへのパス(['gs://bucket_name/[folder_name/]file_name'] 形式)。
  8. PySpark から、標準テーブル、マネージド Iceberg テーブル、読み取り専用 Iceberg テーブルに対してクエリを実行します。

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the bqms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query a standard BigQuery table
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Managed Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Readonly Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    次のように置き換えます。

    • WAREHOUSE_DIRECTORY: データ ウェアハウスが格納されている Cloud Storage フォルダの URI。
    • CATALOG_NAME: 使用しているカタログの名前。
    • MATERIALIZATION_NAMESPACE: 一時結果を保存する Namespace。
  9. サーバーレス Spark を使用して PySpark スクリプトを実行します。

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \
      --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar

    次のように置き換えます。

    • SCRIPT_PATH: バッチジョブで使用するスクリプトのパス。
    • PROJECT_ID: バッチジョブを実行する Google Cloud プロジェクトの ID。
    • REGION: ワークロードが実行されるリージョン。
    • YOUR_BUCKET: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。バケットの gs:// URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1 など)を指定できます。

次のステップ