Spark ストアド プロシージャで BigQuery メタストアを使用する

このドキュメントでは、BigQuery Metastore で Apache Spark ストアド プロシージャを使用する方法について説明します。

始める前に

  1. Google Cloud プロジェクトで課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
  2. BigQuery API と Dataflow API を有効にします。

    API を有効にする

  3. 省略可: 以下の詳細を確認します。

必要なロール

Spark ストアド プロシージャを使用するには、ストアド プロシージャに必要なロールを確認し、必要なロールを付与します。

メタデータ ストアとして BigQuery メタストアで Spark とストアド プロシージャを使用するのに必要な権限を取得するには、次の IAM ロールの付与を管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

ストアド プロシージャを作成して実行する

次の例は、BigQuery メタストアを使用してストアド プロシージャを作成し、実行する方法を示しています。

  1. [BigQuery] ページに移動します。

    BigQuery に移動

  2. クエリエディタで、CREATE PROCEDURE ステートメントのサンプルコードを追加します。

    CREATE OR REPLACE PROCEDURE
    `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`()
    WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK',
    runtime_version='1.1',
    properties=[("spark.sql.catalog.CATALOG_NAME.warehouse",
    "WAREHOUSE_DIRECTORY"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_location",
    "LOCATION"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_project",
    "PROJECT_ID"),
    ("spark.sql.catalog.CATALOG_NAME",
    "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.CATALOG_NAME.catalog-impl",
    "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
    ("spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")],
    jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"])
    LANGUAGE python AS R"""
    from pyspark.sql import SparkSession
    spark = SparkSession \
    .builder \
    .appName("BigQuery metastore Iceberg") \
    .getOrCreate()
    spark.sql("USE CATALOG_NAME;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'")
    spark.sql("DESCRIBE TABLE_NAME;")
    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");")
    spark.sql("SELECT * from TABLE_NAME;")
    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);")
    spark.sql("DESCRIBE TABLE_NAME;")
    """;
    CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();

    次のように置き換えます。

    • PROJECT_ID: Google Cloud プロジェクトの ID。
    • BQ_DATASET_ID: プロシージャを含む BigQuery データセットの ID。
    • PROCEDURE_NAME: 作成または置き換えるプロシージャの名前。
    • REGION: Spark 接続のロケーション。
    • LOCATION: BigQuery リソースのロケーション。
    • SPARK_CONNECTION_ID: Spark 接続の ID。
    • CATALOG_NAME: 使用しているカタログの名前。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダの URI。
    • NAMESPACE_NAME: 使用している Namespace。

次のステップ