BigLake metastore 구성
이 문서에서는 Dataproc 또는 Google Cloud Serverless for Apache Spark를 사용하여 BigLake metastore를 구성하여 Apache Spark 또는 Apache Flink와 같은 오픈소스 엔진에서 작동하는 단일 공유 metastore를 만드는 방법을 설명합니다.
시작하기 전에
- Google Cloud 프로젝트에 결제를 사용 설정합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.
BigQuery 및 Dataproc API를 사용 설정합니다.
선택사항: BigLake metastore의 작동 방식과 이를 사용해야 하는 이유를 알아봅니다.
필요한 역할
BigLake Metastore를 구성하는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.
-
Dataproc 클러스터를 만듭니다.
프로젝트의 Compute Engine 기본 서비스 계정에 대한 Dataproc 작업자 (
roles/dataproc.worker
) -
BigLake metastore 테이블 만들기:
-
프로젝트의 Dataproc VM 서비스 계정에 대한 Dataproc 작업자 (
roles/dataproc.worker
) -
프로젝트의 Dataproc VM 서비스 계정에 대한 BigQuery 데이터 편집자 (
roles/bigquery.dataEditor
) -
프로젝트의 Dataproc VM 서비스 계정에 대한 스토리지 객체 관리자 (
roles/storage.objectAdmin
)
-
프로젝트의 Dataproc VM 서비스 계정에 대한 Dataproc 작업자 (
-
BigLake metastore 테이블 쿼리:
-
프로젝트에 대한 BigQuery 데이터 뷰어(
roles/bigquery.dataViewer
) -
프로젝트에 대한 BigQuery 사용자(
roles/bigquery.user
) -
프로젝트에 대한 스토리지 객체 뷰어(
roles/storage.objectViewer
)
-
프로젝트에 대한 BigQuery 데이터 뷰어(
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.
Dataproc으로 메타스토어 구성
Spark 또는 Flink를 사용하여 Dataproc으로 BigLake metastore를 구성할 수 있습니다.
Spark
새 클러스터를 구성합니다. 새 Dataproc 클러스터를 만들려면 BigLake Metastore를 사용하는 데 필요한 설정이 포함된 다음
gcloud dataproc clusters create
명령어를 실행합니다.gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
다음을 바꿉니다.
CLUSTER_NAME
: Dataproc 클러스터의 이름입니다.PROJECT_ID
: 클러스터를 만드는 Google Cloud 프로젝트의 ID입니다.LOCATION
: 클러스터를 만드는 Compute Engine 리전입니다.
다음 방법 중 하나를 사용하여 Spark 작업을 제출합니다.
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
다음을 바꿉니다.
PROJECT_ID
: Dataproc 클러스터가 포함된 Google Cloud 프로젝트의 ID입니다.CLUSTER_NAME
: Spark SQL 작업을 실행하는 데 사용하는 Dataproc 클러스터의 이름입니다.REGION
: 클러스터가 있는 Compute Engine 리전입니다.LOCATION
: BigQuery 리소스의 위치입니다.CATALOG_NAME
: SQL 작업과 함께 사용할 Spark 카탈로그의 이름WAREHOUSE_DIRECTORY
: 데이터 웨어하우스가 포함된 Cloud Storage 폴더 이 값은gs://
로 시작합니다.SPARK_SQL_COMMAND
: 실행할 Spark SQL 쿼리입니다. 이 쿼리에는 리소스를 만드는 명령어가 포함되어 있습니다. 예를 들어 네임스페이스와 테이블을 만듭니다.
spark-sql CLI
Google Cloud 콘솔에서 VM 인스턴스 페이지로 이동합니다.
Dataproc VM 인스턴스에 연결하려면 Dataproc 클러스터 기본 VM 인스턴스 이름이 나열된 행에서 SSH를 클릭합니다. 클러스터 이름 다음에
-m
서픽스가 붙습니다. 출력은 다음과 비슷합니다.Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
터미널에서 다음 BigLake 메타 스토어 초기화 명령어를 실행합니다.
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
다음을 바꿉니다.
CATALOG_NAME
: SQL 작업과 함께 사용하는 Spark 카탈로그의 이름PROJECT_ID
: Spark 카탈로그가 연결되는 BigLake Metastore 카탈로그의 Google Cloud 프로젝트 ID입니다.LOCATION
: BigLake 메타 스토어의 Google Cloud 위치입니다.WAREHOUSE_DIRECTORY
: 데이터 웨어하우스가 포함된 Cloud Storage 폴더 이 값은gs://
로 시작합니다.
클러스터에 성공적으로 연결되면 Spark 터미널에
spark-sql
프롬프트가 표시되며, 이를 사용하여 Spark 작업을 제출할 수 있습니다.spark-sql (default)>
Flink
- 선택적 Flink 구성요소를 사용 설정하여 Dataproc 클러스터를 만들고 Dataproc
2.2
이상을 사용하고 있는지 확인합니다. Google Cloud 콘솔에서 VM 인스턴스 페이지로 이동합니다.
가상 머신 인스턴스 목록에서 SSH를 클릭하여 기본 Dataproc 클러스터 VM 인스턴스에 연결합니다. 이 인스턴스는 클러스터 이름 뒤에
-m
접미사가 붙은 것으로 표시됩니다.BigLake metastore용 Iceberg 커스텀 카탈로그 플러그인을 구성합니다.
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
YARN에서 Flink 세션을 시작합니다.
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Flink에서 카탈로그를 만듭니다.
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
다음을 바꿉니다.
CATALOG_NAME
: BigLake Metastore 카탈로그에 연결된 Flink 카탈로그 식별자입니다.WAREHOUSE_DIRECTORY
: 웨어하우스 디렉터리의 기본 경로입니다 (Flink가 파일을 만드는 Cloud Storage 폴더). 이 값은gs://
로 시작합니다.PROJECT_ID
: Flink 카탈로그가 연결되는 BigLake metastore 카탈로그의 프로젝트 ID입니다.LOCATION
: BigQuery 리소스의 위치입니다.
이제 Flink 세션이 BigLake Metastore에 연결되었으며 Flink SQL 명령어를 실행할 수 있습니다.
BigLake metastore 리소스 관리
BigLake Metastore에 연결되었으므로 BigLake Metastore에 저장된 메타데이터를 기반으로 리소스를 만들고 볼 수 있습니다.
예를 들어 대화형 Flink SQL 세션에서 다음 명령어를 실행하여 Iceberg 데이터베이스와 테이블을 만듭니다.
커스텀 Iceberg 카탈로그를 사용하세요.
USE CATALOG CATALOG_NAME;
CATALOG_NAME
을 Flink 카탈로그 식별자로 바꿉니다.데이터베이스를 만듭니다. 그러면 BigQuery에 데이터 세트가 생성됩니다.
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
DATABASE_NAME
을 새 데이터베이스의 이름으로 바꿉니다.생성한 데이터베이스를 사용합니다.
USE DATABASE_NAME;
Iceberg 테이블을 만듭니다. 다음은 예시 판매 테이블을 만듭니다.
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
ICEBERG_TABLE_NAME
을 새 테이블의 이름으로 바꿉니다.테이블 메타데이터를 보려면 다음 단계를 따르세요.
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
데이터베이스의 테이블을 나열합니다.
SHOW TABLES;
테이블에 데이터 수집
이전 섹션에서 Iceberg 테이블을 만든 후 Flink DataGen을 데이터 소스로 사용하여 테이블에 실시간 데이터를 수집할 수 있습니다. 다음 단계는 이 워크플로의 예시입니다.
DataGen을 사용하여 임시 테이블을 만듭니다.
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
다음을 바꿉니다.
DATABASE_NAME
: 임시 테이블을 저장할 데이터베이스의 이름입니다.TEMP_TABLE_NAME
: 임시 테이블의 이름ICEBERG_TABLE_NAME
: 이전 섹션에서 만든 Iceberg 테이블의 이름입니다.
동시 로드를 1로 설정합니다.
SET 'parallelism.default' = '1';
체크포인트 간격을 설정합니다.
SET 'execution.checkpointing.interval' = '10second';
체크포인트를 설정합니다.
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
실시간 스트리밍 작업을 시작합니다.
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
출력은 다음과 비슷합니다.
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
스트리밍 작업의 상태를 확인하려면 다음 단계를 따르세요.
Google Cloud 콘솔에서 클러스터 페이지로 이동합니다.
클러스터를 선택합니다.
웹 인터페이스 탭을 클릭합니다.
YARN ResourceManager 링크를 클릭합니다.
YARN ResourceManager 인터페이스에서 Flink 세션을 찾아 추적 UI 아래의 ApplicationMaster 링크를 클릭합니다.
상태 열에서 작업 상태가 실행 중인지 확인합니다.
Flink SQL 클라이언트에서 스트리밍 데이터를 쿼리합니다.
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
BigQuery에서 스트리밍 데이터를 쿼리합니다.
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Flink SQL 클라이언트에서 스트리밍 작업을 종료합니다.
STOP JOB 'JOB_ID';
JOB_ID
를 스트리밍 작업을 만들 때 출력에 표시된 작업 ID로 바꿉니다.
Apache Spark용 서버리스로 메타스토어 구성
Spark SQL 또는 PySpark를 사용하여 Apache Spark용 서버리스로 BigLake metastore를 구성할 수 있습니다.
Spark SQL
BigLake metastore에서 실행하려는 Spark SQL 명령어로 SQL 파일을 만듭니다. 예를 들어 다음 명령어는 네임스페이스와 테이블을 만듭니다.
CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
다음을 바꿉니다.
CATALOG_NAME
: Spark 테이블을 참조하는 카탈로그 이름입니다.NAMESPACE_NAME
: Spark 테이블을 참조하는 네임스페이스 이름입니다.TABLE_NAME
: Spark 테이블의 테이블 이름입니다.WAREHOUSE_DIRECTORY
: 데이터 웨어하우스가 저장된 Cloud Storage 폴더의 URI
다음
gcloud dataproc batches submit spark-sql
명령어를 실행하여 Spark SQL 일괄 작업을 제출합니다.gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
다음을 바꿉니다.
SQL_SCRIPT_PATH
: 일괄 작업에서 사용하는 SQL 파일의 경로입니다.PROJECT_ID
: 일괄 작업을 실행할 Google Cloud 프로젝트의 ID입니다.REGION
: 워크로드가 실행되는 리전SUBNET_NAME
(선택사항):REGION
에서 세션 서브넷 요구사항을 충족하는 VPC 서브넷의 이름입니다.BUCKET_PATH
: 워크로드 종속 항목을 업로드할 Cloud Storage 버킷의 위치.WAREHOUSE_DIRECTORY
는 이 버킷에 있습니다. 버킷의gs://
URI 프리픽스는 필요하지 않습니다. 버킷 경로 또는 버킷 이름을 지정할 수 있습니다(예:mybucketname1
).LOCATION
: 일괄 작업을 실행할 위치입니다.
Spark 일괄 작업을 제출하는 방법에 관한 자세한 내용은 Spark 일괄 워크로드 실행을 참고하세요.
PySpark
BigLake metastore에서 실행하려는 PySpark 명령어로 Python 파일을 만듭니다.
예를 들어 다음 명령어는 BigLake metastore에 저장된 Iceberg 테이블과 상호작용하도록 Spark 환경을 설정합니다. 그런 다음 이 명령어는 새 네임스페이스와 해당 네임스페이스 내에 Iceberg 테이블을 만듭니다.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
다음을 바꿉니다.
PROJECT_ID
: 일괄 작업을 실행할 Google Cloud 프로젝트의 ID입니다.LOCATION
: BigQuery 리소스가 있는 위치입니다.CATALOG_NAME
: Spark 테이블을 참조하는 카탈로그 이름TABLE_NAME
: Spark 테이블의 테이블 이름WAREHOUSE_DIRECTORY
: 데이터 웨어하우스가 저장된 Cloud Storage 폴더의 URI입니다.NAMESPACE_NAME
: Spark 테이블을 참조하는 네임스페이스 이름
다음
gcloud dataproc batches submit pyspark
명령을 사용하여 일괄 작업을 제출합니다.gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
다음을 바꿉니다.
PYTHON_SCRIPT_PATH
: 일괄 작업에서 사용하는 Python 스크립트의 경로PROJECT_ID
: 일괄 작업을 실행할 Google Cloud 프로젝트의 ID입니다.REGION
: 워크로드가 실행되는 리전BUCKET_PATH
: 워크로드 종속 항목을 업로드할 Cloud Storage 버킷의 위치. 버킷의gs://
URI 프리픽스는 필요하지 않습니다. 버킷 경로 또는 버킷 이름을 지정할 수 있습니다(예:mybucketname1
).
PySpark 일괄 작업 제출에 대한 자세한 내용은 PySpark gcloud 참조를 확인하세요.