BigQuery 스튜디오에서 Spark와 함께 BigQuery 메타스토어 사용

이 문서에서는 BigQuery 스튜디오에서 Spark와 함께 BigQuery 메타스토어를 사용하는 방법을 설명합니다.

BigQuery 스튜디오에서 Spark를 사용하여 BigQuery 스튜디오에서 Apache Spark로 Iceberg 테이블을 만들 수 있습니다. 테이블을 만든 후 Spark에서 데이터를 쿼리할 수 있습니다. SQL을 사용하여 BigQuery 콘솔에서 동일한 데이터를 쿼리할 수도 있습니다.

시작하기 전에

  1. 다음 가입 양식을 통해 BigQuery Studio에서 Spark에 대한 액세스 권한을 요청합니다.
  2. Google Cloud 프로젝트에 결제를 사용 설정합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.
  3. BigQuery 및 Dataflow API를 사용 설정합니다.

    API 사용 설정

  4. 선택사항: BigQuery 메타스토어의 작동 방식과 이를 사용해야 하는 이유를 알아봅니다.

필요한 역할

BigQuery 스튜디오에서 Spark 노트북을 사용하는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.

  • Spark에서 BigQuery 스튜디오 메타스토어 테이블을 만듭니다. 프로젝트의 BigQuery 데이터 편집기 (roles/bigquery.dataEditor)
  • Spark의 노트북 메타스토어 테이블에서 Spark 세션을 만듭니다. 사용자 계정의 Dataproc Worker (roles/dataproc.serverlessEditor)

역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

노트북에 연결

다음 예에서는 BigQuery 메타스토어에 저장된 Iceberg 테이블과 상호작용하도록 Spark 노트북을 구성하는 방법을 보여줍니다.

이 예에서는 Spark 세션을 설정하고 네임스페이스와 테이블을 만들고 테이블에 데이터를 추가한 후 BigQuery 스튜디오에서 데이터를 쿼리합니다.

  1. BigQuery 스튜디오에서 Spark 노트북을 만듭니다.

  2. Apache Spark 노트북에 필요한 Apache Spark 가져오기를 포함합니다.

    from dataproc_spark_session.session.spark.connect import DataprocSparkSession
    from google.cloud.dataproc_v1 import Session
    from pyspark.sql import SparkSession
  3. 카탈로그, 네임스페이스, 창고 디렉터리를 정의합니다.

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    다음을 바꿉니다.

    • CATALOG_NAME: Spark 테이블을 참조하는 카탈로그 이름입니다.
    • NAMESPACE_NAME: Spark 테이블을 참조하는 네임스페이스 라벨입니다.
    • WAREHOUSE_DIRECTORY: 데이터 웨어하우스가 저장된 Cloud Storage 폴더의 URI입니다.
  4. Spark 세션을 초기화합니다.

    session.environment_config.execution_config.network_uri = NETWORK_NAME
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir
    
    spark = (
     DataprocSparkSession.builder
     .appName("BigQuery metastore Iceberg table example")
     .dataprocConfig(session)
     .getOrCreate())

    다음을 바꿉니다.

    • NETWORK_NAME: Spark 코드를 실행하는 네트워크의 이름 또는 URI입니다. 지정하지 않으면 default 네트워크가 사용됩니다.
    • PROJECT_ID: Spark 코드를 실행하는 Google Cloud 프로젝트의 ID입니다.
    • LOCATION: Spark 작업을 실행할 위치입니다.
  5. 카탈로그 및 네임스페이스를 만듭니다.

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. 테이블을 만듭니다.

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    다음을 바꿉니다.

    • TABLE_NAME: Iceberg 테이블의 이름입니다.
  7. Spark에서 데이터 조작 언어 (DML)를 실행합니다.

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. Spark에서 데이터 정의 언어 (DDL)를 실행합니다.

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. 테이블에 데이터를 삽입합니다.

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. Spark에서 테이블을 쿼리합니다.

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. 새 데이터 세트에서 Google Cloud 콘솔에서 테이블을 쿼리합니다.

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100

다음 단계