Spark ストアド プロシージャで BigQuery メタストアを使用する
このドキュメントでは、BigQuery Metastore で Apache Spark ストアド プロシージャを使用する方法について説明します。
始める前に
- Google Cloud プロジェクトで課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
BigQuery API と Dataflow API を有効にします。
省略可: 以下の詳細を確認します。
- BigQuery Metastore の仕組みと、BigQuery Metastore を使用する理由を理解する。
- BigQuery Spark ストアド プロシージャの仕組みを学び、開始前に行うタスクを完了します。
必要なロール
Spark ストアド プロシージャを使用するには、ストアド プロシージャに必要なロールを確認し、必要なロールを付与します。
メタデータ ストアとして BigQuery メタストアで Spark とストアド プロシージャを使用するのに必要な権限を取得するには、次の IAM ロールの付与を管理者に依頼してください。
-
Spark で BigQuery Metastore テーブルを作成します。
-
プロジェクト内の Spark Connection サービス アカウントに対する BigQuery データ編集者 (
roles/bigquery.dataEditor
) -
プロジェクトの Spark Connection サービス アカウントに対するストレージ オブジェクト管理者 (
roles/storage.objectAdmin
)
-
プロジェクト内の Spark Connection サービス アカウントに対する BigQuery データ編集者 (
-
BigQuery で BigQuery メタストア テーブルに対してクエリを実行します。
-
プロジェクトに対する BigQuery データ閲覧者 (
roles/bigquery.dataViewer
) -
プロジェクトに対する BigQuery ユーザー (
roles/bigquery.user
) -
プロジェクトに対するストレージ オブジェクト閲覧者 (
roles/storage.objectViewer
)
-
プロジェクトに対する BigQuery データ閲覧者 (
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
ストアド プロシージャを作成して実行する
次の例は、BigQuery メタストアを使用してストアド プロシージャを作成し、実行する方法を示しています。
[BigQuery] ページに移動します。
クエリエディタで、
CREATE PROCEDURE
ステートメントのサンプルコードを追加します。CREATE OR REPLACE PROCEDURE `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`() WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK', runtime_version='1.1', properties=[("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY"), ("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION"), ("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID"), ("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog"), ("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"), ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")], jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"]) LANGUAGE python AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("BigQuery metastore Iceberg") \ .getOrCreate() spark.sql("USE CATALOG_NAME;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'") spark.sql("DESCRIBE TABLE_NAME;") spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");") spark.sql("SELECT * from TABLE_NAME;") spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE TABLE_NAME;") """; CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();
次のように置き換えます。
PROJECT_ID
: Google Cloud プロジェクトの ID。BQ_DATASET_ID
: プロシージャを含む BigQuery データセットの ID。PROCEDURE_NAME
: 作成または置き換えるプロシージャの名前。REGION
: Spark 接続のロケーション。LOCATION
: BigQuery リソースのロケーション。SPARK_CONNECTION_ID
: Spark 接続の ID。CATALOG_NAME
: 使用しているカタログの名前。WAREHOUSE_DIRECTORY
: データ ウェアハウスを含む Cloud Storage フォルダの URI。NAMESPACE_NAME
: 使用している Namespace。