BigQuery Studio で Spark と BigQuery メタストアを使用する

このドキュメントでは、BigQuery Studio で Spark で BigQuery メタストアを使用する方法について説明します。

BigQuery Studio の Spark を使用して、BigQuery Studio で Apache Spark を使用して Iceberg テーブルを作成できます。テーブルを作成したら、Spark からデータをクエリできます。SQL を使用して BigQuery コンソールから同じデータをクエリすることもできます。

始める前に

  1. 次の登録フォームから、BigQuery Studio で Spark へのアクセスをリクエストします。
  2. Google Cloud プロジェクトで課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
  3. BigQuery API と Dataflow API を有効にします。

    API を有効にする

  4. 省略可: BigQuery メタストアの仕組みと、使用する理由を理解します。

必要なロール

BigQuery Studio で Spark ノートブックを使用するのに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

  • Spark で BigQuery Studio メタストア テーブルを作成する: プロジェクトに対する BigQuery データ編集者 roles/bigquery.dataEditor
  • Spark のノートブック メタストア テーブルから Spark セッションを作成します。ユーザー アカウントに対する Dataproc ワーカー roles/dataproc.serverlessEditor

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

ノートブックに接続する

次の例は、BigQuery メタストアに保存されている Iceberg テーブルを操作するように Spark ノートブックを構成する方法を示しています。

この例では、Spark セッションを設定し、名前空間とテーブルを作成し、テーブルにデータを追加してから、BigQuery Studio でデータをクエリします。

  1. BigQuery Studio で Spark ノートブックを作成します。

  2. Apache Spark ノートブックに、必要な Apache Spark インポートを含めます。

    from dataproc_spark_session.session.spark.connect import DataprocSparkSession
    from google.cloud.dataproc_v1 import Session
    from pyspark.sql import SparkSession
  3. カタログ、Namespace、ウェアハウス ディレクトリを定義します。

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    次のように置き換えます。

    • CATALOG_NAME: Spark テーブルを参照するカタログ名。
    • NAMESPACE_NAME: Spark テーブルを参照する Namespace ラベル。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。
  4. Spark セッションを初期化します。

    session.environment_config.execution_config.network_uri = NETWORK_NAME
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir
    
    spark = (
     DataprocSparkSession.builder
     .appName("BigQuery metastore Iceberg table example")
     .dataprocConfig(session)
     .getOrCreate())

    次のように置き換えます。

    • NETWORK_NAME: Spark コードを実行するネットワークの名前または URI。指定しない場合、default ネットワークが使用されます。
    • PROJECT_ID: Spark コードを実行している Google Cloud プロジェクトの ID。
    • LOCATION: Spark ジョブを実行するロケーション
  5. カタログと Namespace を作成します。

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. テーブルを作成します。

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    次のように置き換えます。

    • TABLE_NAME: Iceberg テーブルの名前。
  7. Spark からデータ操作言語(DML)を実行する。

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. Spark からデータ定義言語(DDL)を実行します。

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. テーブルにデータを挿入します。

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. Spark からテーブルに対してクエリを実行します。

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. 新しいデータセットで、 Google Cloud コンソールからテーブルに対してクエリを実行します。

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100

次のステップ