BigQuery のテーブルで BigQuery メタストアを使用する

このドキュメントでは、BigQuery テーブルと Spark で BigQuery メタストアを使用する方法について説明します。

BigQuery Metastore を使用すると、BigQuery から標準(組み込み)テーブルApache Spark Iceberg 用の BigQuery テーブルBigLake 外部テーブルを作成して使用できます。

始める前に

  1. Google Cloud プロジェクトで課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
  2. BigQuery API と Dataproc API を有効にします。

    API を有効にする

  3. 省略可: BigQuery メタストアの仕組みと、使用する理由を理解します。

必要なロール

メタデータ ストアとして BigQuery Metastore で Spark と Dataproc を使用するのに必要な権限を取得するには、管理者に次の IAM ロールを付与するよう依頼します。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

テーブルに接続します。

  1. Google Cloud コンソールでデータセットを作成します。

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    次のように置き換えます。

    • PROJECT_ID: データセットを作成する Google Cloud プロジェクトの ID。
    • DATASET_NAME: データセットの名前。
  2. Cloud リソース接続を作成します。

  3. 標準の BigQuery テーブルを作成します。

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    次のように置き換えます。

    • TABLE_NAME: テーブルの名前。
  4. 標準の BigQuery テーブルにデータを挿入します。

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. Apache Iceberg 用の BigQuery テーブルを作成します。

    たとえば、テーブルを作成するには、次の CREATE ステートメントを実行します。

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    次のように置き換えます。

    • ICEBERG_TABLE_NAME: Iceberg テーブルの名前。例: iceberg_managed_table
    • CONNECTION_NAME: 接続の名前。これは前の手順で作成しました。例: myproject.us.myconnection
    • STORAGE_URI: 完全修飾の Cloud Storage URI。例: gs://mybucket/table
  6. Apache Iceberg の BigQuery テーブルにデータを挿入します。

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. 読み取り専用の Iceberg テーブルを作成します。

    たとえば、読み取り専用の Iceberg テーブルを作成するには、次の CREATE ステートメントを実行します。

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    次のように置き換えます。

    • READONLY_ICEBERG_TABLE_NAME: 読み取り専用テーブルの名前。
    • BUCKET_PATH: 外部テーブルのデータを含む Cloud Storage バケットへのパス(['gs://bucket_name/[folder_name/]file_name'] 形式)。
  8. PySpark から、標準テーブル、マネージド Iceberg テーブル、読み取り専用 Iceberg テーブルに対してクエリを実行します。

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the bqms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query a standard BigQuery table
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Managed Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Readonly Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    次のように置き換えます。

    • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダの URI。
    • CATALOG_NAME: 使用しているカタログの名前。
    • MATERIALIZATION_NAMESPACE: 一時結果を保存する Namespace。
  9. サーバーレス Spark を使用して PySpark スクリプトを実行します。

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \
      --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar

    次のように置き換えます。

    • SCRIPT_PATH: バッチジョブが使用するスクリプトのパス。
    • PROJECT_ID: バッチジョブを実行する Google Cloud プロジェクトの ID。
    • REGION: ワークロードが実行されるリージョン。
    • YOUR_BUCKET: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。バケットの gs:// URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1 など)を指定できます。

次のステップ