BigQuery のテーブルで BigQuery メタストアを使用する
このドキュメントでは、BigQuery テーブルと Spark で BigQuery メタストアを使用する方法について説明します。
BigQuery metastore を使用すると、BigQuery から標準(組み込み)テーブル、Apache Iceberg 用の BigQuery テーブル、BigLake 外部テーブルを作成して使用できます。
始める前に
- Google Cloud プロジェクトに対する課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
BigQuery API と Dataproc API を有効にします。
省略可: BigQuery metastore の仕組みと、使用すべき理由を理解します。
省略可: Iceberg を使用している場合は、次のコマンドを使用して、BigQuery メタストア Iceberg テーブルと BigQuery の他の類似テーブル バリエーションを比較します。
標準の BigQuery テーブル | BigLake 外部テーブル | Apache Iceberg 用の BigLake 外部テーブル(BigLake Iceberg テーブル) | BigQuery Metastore Iceberg テーブル(プレビュー) | Apache Iceberg 用の BigQuery テーブル(Iceberg マネージド テーブル / BigQuery Iceberg テーブル)(プレビュー) | |
---|---|---|---|---|---|
主な機能 | フルマネージド エクスペリエンス | オープンソース エンジンと BigQuery エンジン全体でガバナンス(きめ細かいアクセス制御)と機能が提供される | BigLake 外部テーブルの機能 + データの整合性、スキーマの更新。Spark や他のオープンエンジンからは作成できません。 | BigLake Iceberg テーブルの機能 + 外部エンジンからの変更可能。DDL または bq コマンドライン ツールでは作成できません。 | BigLake Iceberg テーブルの機能 + オープンデータとメタデータによる低い管理オーバーヘッド |
データ ストレージ | BigQuery マネージド ストレージ | ユーザー管理バケットでホストされているオープン形式のデータ | |||
オープンモデル | BigQuery Storage Read API(コネクタ経由) | オープン ファイル形式(Parquet) | オープン ライブラリ(Iceberg) | オープンソース互換(Iceberg メタデータ スナップショット) | |
ガバナンス | 統合された BigQuery ガバナンス | ||||
書き込み(DML とストリーミング) | BigQuery コネクタ、API、高スループット DML、CDC を使用。 | 外部エンジン経由の書き込みのみ | BigQuery コネクタ、API、高スループット DML、CDC を使用。 |
必要なロール
メタデータ ストアとして BigQuery Metastore で Spark と Dataproc を使用するのに必要な権限を取得するには、管理者に次の IAM ロールを付与するよう依頼します。
-
Spark で BigQuery metastore テーブルを作成します。
-
プロジェクトの Dataproc サーバーレス サービス アカウントに対する Dataproc ワーカー (
roles/dataproc.worker
) -
プロジェクト内の Dataproc Serverless サービス アカウントに対する BigQuery データ編集者 (
roles/bigquery.dataEditor
) -
プロジェクト内の Dataproc サーバーレス サービス アカウントに対する ストレージ オブジェクト管理者 (
roles/storage.objectAdmin
)
-
プロジェクトの Dataproc サーバーレス サービス アカウントに対する Dataproc ワーカー (
-
BigQuery で BigQuery metastore テーブルに対してクエリを実行します。
-
プロジェクトに対する BigQuery データ閲覧者 (
roles/bigquery.dataViewer
) -
プロジェクトに対する BigQuery ユーザー (
roles/bigquery.user
) -
プロジェクトに対するストレージ オブジェクト閲覧者 (
roles/storage.objectViewer
)
-
プロジェクトに対する BigQuery データ閲覧者 (
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
テーブルに接続します。
Google Cloud コンソールでデータセットを作成します。
CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;
次のように置き換えます。
PROJECT_ID
: データセットを作成するプロジェクトの ID。 Google CloudDATASET_NAME
: データセットの名前。
Cloud リソース接続を作成します。
標準の BigQuery テーブルを作成します。
CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);
次のように置き換えます。
TABLE_NAME
: テーブルの名前。
標準の BigQuery テーブルにデータを挿入します。
INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
Apache Iceberg 用の BigQuery テーブルを作成します。
たとえば、テーブルを作成するには、次の
CREATE
ステートメントを実行します。CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME( name STRING,id INT64 ) WITH CONNECTION `CONNECTION_NAME` OPTIONS ( file_format = 'PARQUET', table_format = 'ICEBERG', storage_uri = 'STORAGE_URI');
次のように置き換えます。
ICEBERG_TABLE_NAME
: Iceberg テーブルの名前。例:iceberg_managed_table
CONNECTION_NAME
: 接続の名前。これは前の手順で作成しました。例:myproject.us.myconnection
STORAGE_URI
: 完全修飾の Cloud Storage URI。例:gs://mybucket/table
Apache Iceberg の BigQuery テーブルにデータを挿入します。
INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
読み取り専用の Iceberg テーブルを作成します。
たとえば、読み取り専用の Iceberg テーブルを作成するには、次の
CREATE
ステートメントを実行します。CREATE OR REPLACE EXTERNAL TABLE `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME WITH CONNECTION `CONNECTION_NAME` OPTIONS ( format = 'ICEBERG', uris = ['BUCKET_PATH'], require_partition_filter = FALSE);
次のように置き換えます。
READONLY_ICEBERG_TABLE_NAME
: 読み取り専用テーブルの名前。BUCKET_PATH
: 外部テーブルのデータを含む Cloud Storage バケットへのパス(['gs://bucket_name/[folder_name/]file_name']
形式)。
PySpark から、標準テーブル、マネージド Iceberg テーブル、読み取り専用 Iceberg テーブルに対してクエリを実行します。
from pyspark.sql import SparkSession # Create a spark session spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.conf.set("viewsEnabled","true") # Use the bqms_catalog spark.sql("USE `CATALOG_NAME`;") spark.sql("USE NAMESPACE DATASET_NAME;") # Configure spark for temp results spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") # List the tables in the dataset df = spark.sql("SHOW TABLES;") df.show(); # Query a standard BigQuery table sql = """SELECT * FROM DATASET_NAME.TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() # Query a BigQuery Managed Apache Iceberg table sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() # Query a BigQuery Readonly Apache Iceberg table sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
次のように置き換えます。
WAREHOUSE_DIRECTORY
: データ ウェアハウスが格納されている Cloud Storage フォルダの URI。CATALOG_NAME
: 使用しているカタログの名前。MATERIALIZATION_NAMESPACE
: 一時結果を保存する Namespace。
サーバーレス Spark を使用して PySpark スクリプトを実行します。
gcloud dataproc batches submit pyspark SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=YOUR_BUCKET \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar
次のように置き換えます。
SCRIPT_PATH
: バッチジョブで使用するスクリプトのパス。PROJECT_ID
: バッチジョブを実行する Google Cloud プロジェクトの ID。REGION
: ワークロードが実行されるリージョン。YOUR_BUCKET
: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。バケットのgs://
URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1
など)を指定できます。
次のステップ
- BigQuery metastore のオプション機能を設定します。