Spark 저장 프로시저와 함께 BigQuery 메타스토어 사용
이 문서에서는 BigQuery 메타스토어에서 Apache Spark 저장 프로시저를 사용하는 방법을 설명합니다.
시작하기 전에
- Google Cloud 프로젝트에 결제를 사용 설정합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.
BigQuery 및 Dataflow API를 사용 설정합니다.
선택사항: 다음에 대해 자세히 알아보세요.
- BigQuery 메타스토어의 작동 방식과 이를 사용해야 하는 이유를 알아봅니다.
- BigQuery Spark 저장 프로시저의 작동 방식을 알아보고 작업을 시작하기 전에 완료합니다.
필요한 역할
Spark 저장 프로시저를 사용하려면 저장 프로시저에 필요한 역할을 검토하고 필요한 역할을 부여합니다.
BigQuery 메타스토어를 메타데이터 저장소로 사용하여 Spark 및 저장 프로시저를 사용하는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.
-
Spark에서 BigQuery 메타스토어 테이블을 만듭니다.
-
프로젝트의 Spark 연결 서비스 계정에 대한 BigQuery 데이터 편집자 (
roles/bigquery.dataEditor
) -
프로젝트의 Spark 연결 서비스 계정에 대한 스토리지 객체 관리자 (
roles/storage.objectAdmin
)
-
프로젝트의 Spark 연결 서비스 계정에 대한 BigQuery 데이터 편집자 (
-
BigQuery에서 BigQuery 메타스토어 테이블을 쿼리합니다.
-
프로젝트에 대한 BigQuery 데이터 뷰어 (
roles/bigquery.dataViewer
) -
프로젝트에 대한 BigQuery 사용자 (
roles/bigquery.user
) -
프로젝트의 스토리지 객체 뷰어 (
roles/storage.objectViewer
)
-
프로젝트에 대한 BigQuery 데이터 뷰어 (
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.
저장 프로시저 만들기 및 실행
다음 예에서는 BigQuery 메타스토어로 저장 프로시저를 만들고 실행하는 방법을 보여줍니다.
BigQuery 페이지로 이동합니다.
쿼리 편집기에서
CREATE PROCEDURE
문에 다음 샘플 코드를 추가합니다.CREATE OR REPLACE PROCEDURE `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`() WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK', runtime_version='1.1', properties=[("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY"), ("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION"), ("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID"), ("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog"), ("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"), ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")], jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"]) LANGUAGE python AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("BigQuery metastore Iceberg") \ .getOrCreate() spark.sql("USE CATALOG_NAME;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'") spark.sql("DESCRIBE TABLE_NAME;") spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");") spark.sql("SELECT * from TABLE_NAME;") spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE TABLE_NAME;") """; CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();
다음을 바꿉니다.
PROJECT_ID
: Google Cloud 프로젝트의 ID입니다.BQ_DATASET_ID
: 프로시저가 포함된 BigQuery의 데이터 세트 ID입니다.PROCEDURE_NAME
: 만들거나 대체하는 프로시저의 이름입니다.REGION
: Spark 연결의 위치입니다.LOCATION
: BigQuery 리소스의 위치입니다.SPARK_CONNECTION_ID
: Spark 연결의 ID입니다.CATALOG_NAME
: 사용 중인 카탈로그의 이름입니다.WAREHOUSE_DIRECTORY
: 데이터 웨어하우스가 포함된 Cloud Storage 폴더의 URI입니다.NAMESPACE_NAME
: 사용 중인 네임스페이스입니다.