Dataproc Serverless で BigQuery Metastore を使用する
このドキュメントでは、 Dataproc Serverless で BigQuery Metastore を使用する方法について説明します。
始める前に
- Google Cloud プロジェクトで課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
BigQuery API と Dataproc API を有効にします。
省略可: BigQuery メタストアの仕組みと、使用する理由を理解します。
必要なロール
BigQuery Metastore をメタデータ ストアとして使用して Spark と Dataproc Serverless を使用するのに必要な権限を取得するには、管理者に次の IAM ロールを付与するよう依頼してください。
-
Spark で BigQuery Metastore テーブルを作成します。
-
プロジェクトの Dataproc サーバーレス サービス アカウントに対する Dataproc ワーカー (
roles/dataproc.worker
) -
プロジェクト内の Dataproc Serverless サービス アカウントに対する BigQuery データ編集者 (
roles/bigquery.dataEditor
) -
プロジェクト内の Dataproc サーバーレス サービス アカウントに対する ストレージ オブジェクト管理者 (
roles/storage.objectAdmin
)
-
プロジェクトの Dataproc サーバーレス サービス アカウントに対する Dataproc ワーカー (
-
BigQuery で BigQuery メタストア テーブルに対してクエリを実行します。
-
プロジェクトに対する BigQuery データ閲覧者 (
roles/bigquery.dataViewer
) -
プロジェクトに対する BigQuery ユーザー (
roles/bigquery.user
) -
プロジェクトに対するストレージ オブジェクト閲覧者 (
roles/storage.objectViewer
)
-
プロジェクトに対する BigQuery データ閲覧者 (
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
全般的なワークフロー
Dataproc Serverless で BigQuery を使用するには、次の一般的な手順を行います。
- BigQuery メタストアで実行するコマンドを含むファイルを作成します。
- 任意のオープンソース ソフトウェア エンジンに接続します。
- Spark SQL や PySpark など、任意の方法でバッチジョブを送信します。
BigQuery Metastore を Spark に接続する
次の手順では、Dataproc Serverless を BigQuery Metastore に接続する方法を示します。
SparkSQL
Spark SQL バッチジョブを送信する手順は次のとおりです。
BigQuery Metastore で実行する Spark SQL コマンドを含む SQL ファイルを作成します。たとえば、このコマンドは Namespace とテーブルを作成します。
CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
次のように置き換えます。
CATALOG_NAME
: Spark テーブルを参照するカタログ名。NAMESPACE_NAME
: Spark テーブルを参照する Namespace 名。TABLE_NAME
: Spark テーブルのテーブル名。WAREHOUSE_DIRECTORY
: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。
次の
gcloud dataproc batches submit spark-sql
gcloud CLI コマンドを実行して、Spark SQL バッチジョブを送信します。gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
次のように置き換えます。
SQL_SCRIPT_PATH
: バッチジョブが使用する SQL ファイルのパス。PROJECT_ID
: バッチジョブを実行する Google Cloud プロジェクトの ID。REGION
: ワークロードが実行されるリージョン。SUBNET_NAME
: 省略可。限定公開の Google アクセスが有効で、他のセッション サブネットの要件を満たしているREGION
の VPC サブネットの名前。LOCATION
: バッチジョブを実行するロケーション。BUCKET_PATH
: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。WAREHOUSE_FOLDER
はこのバケットにあります。バケットのgs://
URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1
など)を指定できます。
Spark バッチジョブの送信の詳細については、Spark バッチ ワークロードを実行するをご覧ください。
PySpark
PySpark バッチジョブを送信する手順は次のとおりです。
BigQuery Metastore で実行する PySpark コマンドを含む Python ファイルを作成します。
たとえば、次のコマンドは、BigQuery メタストアに保存されている Iceberg テーブルを操作する Spark 環境を設定します。このコマンドは、新しい Namespace とその Namespace 内の Iceberg テーブルを作成します。
from pyspark.sql import SparkSession spark = SparkSession.builder .appName("BigQuery Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
次のように置き換えます。
PROJECT_ID
: バッチジョブを実行する Google Cloud プロジェクトの ID。LOCATION
: BigQuery リソースが配置されているロケーション。CATALOG_NAME
: Spark テーブルを参照するカタログ名。TABLE_NAME
: Spark テーブルのテーブル名。WAREHOUSE_DIRECTORY
: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。NAMESPACE_NAME
: Spark テーブルを参照する Namespace 名。
次の
gcloud dataproc batches submit pyspark
コマンドを使用して、バッチジョブを送信します。gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
次のように置き換えます。
PYTHON_SCRIPT_PATH
: バッチジョブが使用する Python スクリプトのパス。PROJECT_ID
: バッチジョブを実行する Google Cloud プロジェクトの ID。REGION
: ワークロードが実行されるリージョン。BUCKET_PATH
: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。バケットのgs://
URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1
など)を指定できます。
PySpark バッチジョブの送信の詳細については、PySpark gcloud リファレンスをご覧ください。