Apache Spark의 저장 프로시져 작업

이 문서는 BigQuery에서 Spark용 저장 프로시져를 만들고 호출하는 데이터 엔지니어, 데이터 과학자, 데이터 분석가를 대상으로 작성되었습니다.

BigQuery를 사용하여 Python, Java, Scala로 작성된 Spark 저장 프로시져를 만들고 실행할 수 있습니다. 그런 다음 SQL 저장 프로시져를 실행할 때와 비슷하게 GoogleSQL 쿼리를 사용하여 BigQuery에서 이 저장 프로시져를 실행할 수 있습니다.

시작하기 전에

Spark의 저장 프로시져를 만들려면 관리자에게 Spark 연결을 만들고 공유해 달라고 요청합니다. 관리자는 연결과 관련된 서비스 계정에 필수 Identity and Access Management(IAM) 권한도 부여해야 합니다.

필요한 역할

이 문서의 모든 태스크를 수행하는 데 필요한 권한을 얻으려면 관리자에게 다음의 IAM 역할을 부여해 달라고 요청하세요.

역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이러한 사전 정의된 역할에는 이 문서의 작업을 수행하는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

이 문서의 태스크를 수행하려면 다음 권한이 필요합니다.

  • 연결 만들기:
    • bigquery.connections.create
    • bigquery.connections.list
    • Spark의 저장 프로시져 만들기:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
    • Spark의 저장 프로시져 호출하기:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

위치 고려사항

저장 프로시져는 연결과 동일한 위치에서 실행되므로 연결과 동일한 위치에 Spark의 저장 프로시져를 만들어야 합니다. 예를 들어 US 멀티 리전에 저장 프로시져를 만들려면 US 멀티 리전에 있는 연결을 사용합니다.

가격 책정

  • BigQuery에서 Spark 프로시져를 실행하는 요금은 Dataproc Serverless에서 Spark 프로시져를 실행하는 요금과 유사합니다. 자세한 내용은 Dataproc Serverless 가격 책정을 참조하세요.

  • Spark 저장 프로시져는 주문형 가격 책정 모델 뿐만 아니라 모든 BigQuery 버전에서 사용할 수 있습니다. Spark 프로시져는 프로젝트에 사용된 컴퓨팅 가격 책정 모델에 관계없이 모든 경우에 BigQuery Enterprise 버전의 사용한 만큼만 지불 모델을 사용하여 요금이 청구됩니다.

  • BigQuery의 Spark 저장 프로시져는 예약 또는 약정 사용을 지원하지 않습니다. 기존 예약 및 약정은 지원되는 다른 쿼리 및 프로시져에 계속 사용됩니다. Spark 저장 프로시져의 사용 요금은 Enterprise 버전 - 사용한 만큼만 지불 요금에 따라 청구서에 추가됩니다. 해당하는 경우 조직 할인이 적용됩니다.

  • Spark 저장 프로시져는 Spark 실행 엔진을 사용하지만 Spark 실행에 대한 별도의 요금은 표시되지 않습니다. 앞서 언급했듯이 해당 요금은 BigQuery Enterprise 버전 사용한 만큼만 지불 SKU로 보고됩니다.

  • Spark 저장 프로시져는 무료 등급을 제공하지 않습니다.

Spark용 저장 프로시져 만들기

사용하는 연결과 동일한 위치에 저장 프로시져를 만들어야 합니다.

저장 프로시져 본문이 1MB를 초과하는 경우 인라인 코드를 사용하는 대신 저장 프로시져를 Cloud Storage 버킷의 파일에 두는 것이 좋습니다. BigQuery는 Python을 사용하여 Spark의 저장 프로시져를 만드는 두 가지 방법을 제공합니다.

SQL 쿼리 편집기 사용

SQL 쿼리 편집기에서 Spark의 저장 프로시져를 만들려면 다음 단계를 따르세요.

  1. BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 쿼리 편집기에서, 표시되는 CREATE PROCEDURE의 샘플 코드를 추가합니다.

    또는, 탐색기 창에서 연결 리소스를 만들기 위해 사용한 프로젝트에서 연결을 클릭합니다. 그런 다음 Spark의 저장 프로시져를 만들려면 저장 프로시져 만들기를 클릭합니다.

    Python

    Python에서 Spark의 저장 프로시져를 만들려면 다음 샘플 코드를 사용합니다.

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java 또는 Scala

    main_file_uri 옵션을 사용하여 Java 또는 Scala에서 Spark의 저장 프로시져를 만들려면 다음 샘플 코드를 사용합니다.

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    main_classjar_uris 옵션을 사용하여 Java 또는 Scala에서 Spark의 저장 프로시져를 만들려면 다음 샘플 코드를 사용합니다.

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    다음을 바꿉니다.

    • PROJECT_ID: 저장 프로시져를 만들려는 프로젝트(예: myproject)
    • DATASET: 저장 프로시져를 만들려는 데이터 세트(예: mydataset)
    • PROCEDURE_NAME: BigQuery에서 실행하려는 저장 프로시져의 이름(예: mysparkprocedure)
    • PROCEDURE_ARGUMENT: 입력 인수를 입력하는 매개변수

      이 매개변수에서 다음 필드를 지정하세요.

      • ARGUMENT_MODE: 인수의 모드

        유효한 값은 IN, OUT, INOUT입니다. 기본값은 IN입니다.

      • ARGUMENT_NAME: 인수의 이름
      • ARGUMENT_TYPE: 인수의 유형

      예를 들면 myproject.mydataset.mysparkproc(num INT64)입니다.

      자세한 내용은 이 문서의 IN 매개변수 또는 OUTINOUT 매개변수로서 값 전달하기를 참조하세요.

    • CONNECTION_PROJECT_ID: Spark 프로시져를 실행하기 위한 연결이 포함된 프로젝트
    • CONNECTION_REGION: Spark 프로시져를 실행하기 위한 연결이 포함된 리전(예: us)
    • CONNECTION_ID: 연결 ID(예: myconnection)

      Google Cloud 콘솔에서 연결 세부정보를 볼 때 연결 ID는 연결 ID에 표시되는 정규화된 연결 ID의 마지막 섹션에 있는 값입니다(예: projects/myproject/locations/connection_location/connections/myconnection).

    • RUNTIME_VERSION: Spark의 런타임 버전(예: 1.1)
    • MAIN_PYTHON_FILE_URI: PySpark 파일의 경로(예: gs://mybucket/mypysparkmain.py)

      또는 CREATE PROCEDURE 문에 저장 프로시져의 본문을 추가하려면 이 문서의 인라인 코드 사용의 예시를 참조하여 LANGUAGE PYTHON AS 다음에 PYSPARK_CODE를 추가하세요.

    • PYSPARK_CODE: 프로시져 인라인의 본문을 전달하려는 경우 CREATE PROCEDURE 문의 PySpark 애플리케이션 정의

      값은 문자열 리터럴입니다. 코드에 따옴표와 백슬래시가 포함되는 경우 이스케이프 처리되거나 원시 문자열로 표현되어야 합니다. 예를 들어 코드 반환 "\n";은 다음 중 하나로 표현할 수 있습니다.

      • 따옴표 붙은 문자열: "return \"\\n\";". 따옴표와 백슬래시 모두 이스케이프 처리됩니다.
      • 삼중따옴표 붙은 문자열: """return "\\n";""". 백슬래시는 이스케이프 처리되지만 따옴표는 이스케이프 처리 되지 않습니다.
      • 원시 문자열: r"""return "\n";""". 이스케이프 처리가 필요하지 않습니다.
      인라인 PySpark 코드를 추가하는 방법을 알아보려면 인라인 코드 사용을 참조하세요.
    • MAIN_JAR_URI: main 클래스가 포함된 JAR 파일의 경로(예: gs://mybucket/my_main.jar)
    • CLASS_NAME: jar_uris 옵션으로 JAR 세트에 있는 클래스의 정규화된 이름(예: com.example.wordcount)
    • URI: main 클래스에 지정된 클래스가 포함된 JAR 파일의 경로(예: gs://mybucket/mypysparkmain.jar)

    OPTIONS에 지정할 수 있는 추가 옵션은 프로시져 옵션 목록을 참조하세요.

PySpark 편집기 사용

PySpark 편집기를 사용하여 프로시져를 만들 때는 CREATE PROCEDURE 문을 사용할 필요가 없습니다. 대신 Pyspark 편집기에서 직접 Python 코드를 추가하고 코드를 저장하거나 실행합니다.

PySpark 편집기에서 Spark용 저장 프로시져를 만들려면 다음 단계를 따르세요.

  1. BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. PySpark 코드를 직접 입력하려면 PySpark 편집기를 엽니다. PySpark 편집기를 열려면 SQL 쿼리 만들기 옆에 있는 메뉴를 클릭한 다음 PySpark 프로시져 만들기를 선택합니다.

  3. 옵션을 설정하려면 더보기 > PySpark 옵션을 클릭한 후 다음을 수행합니다.

    1. PySpark 코드를 실행할 위치를 지정합니다.

    2. 연결 필드에서 Spark 연결을 지정합니다.

    3. 저장 프로시져 호출 섹션에서 생성된 임시 저장 프로시져를 저장할 데이터 세트를 지정합니다. 특정 데이터 세트를 설정하거나 임시 데이터 세트를 사용하여 PySpark 코드를 호출할 수 있습니다.

      임시 데이터 세트는 이전 단계에서 지정한 위치를 사용하여 생성됩니다. 데이터 세트 이름이 지정된 경우 데이터 세트와 Spark 연결이 같은 위치에 있어야 합니다.

    4. 매개변수 섹션에서 저장 프로시져의 매개변수를 정의합니다. 매개변수 값은 PySpark 코드를 세션 실행 중에만 사용하지만 선언 자체는 프로시져에 저장됩니다.

    5. 고급 옵션 섹션에서 프로시져 옵션을 지정합니다. 프로시져 옵션의 자세한 목록은 프로시져 옵션 목록을 참조하세요.

    6. 속성 섹션에서 키-값 쌍을 추가하여 작업을 구성합니다. Dataproc Serverless Spark 속성에서 키-값 쌍을 사용할 수 있습니다.

    7. 서비스 계정 설정에서 PySpark 코드의 세션 내 실행 중에 사용할 커스텀 서비스 계정, CMEK, 스테이징 데이터 세트, 스테이징 Cloud Storage 폴더를 지정합니다.

    8. 저장을 클릭합니다.

Spark를 위한 저장 프로시져 저장

PySpark 편집기를 사용하여 저장 프로시져를 만든 후 저장 프로시져를 저장할 수 있습니다. 단계별 절차는 다음과 같습니다.

  1. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 쿼리 편집기에서 PySpark 편집기로 Python을 사용하여 Spark의 저장 프로시져를 만듭니다.

  3. 저장 > 저장 프로시져를 클릭합니다.

  4. 저장 프로시져 저장 대화상자에서 저장 프로시져를 저장할 데이터 세트 이름과 저장 프로시져의 이름을 지정합니다.

  5. 저장을 클릭합니다.

    PySpark 코드를 저장 프로시져로 저장하는 대신 실행하기만 하려면 저장 대신 실행을 클릭하면 됩니다.

커스텀 컨테이너 사용

커스텀 컨테이너는 워크로드의 드라이버와 실행자 프로세스를 위한 런타임 환경을 제공합니다. 커스텀 컨테이너를 사용하려면 다음 샘플 코드를 사용합니다.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

다음을 바꿉니다.

  • PROJECT_ID: 저장 프로시져를 만들려는 프로젝트(예: myproject)
  • DATASET: 저장 프로시져를 만들려는 데이터 세트(예: mydataset)
  • PROCEDURE_NAME: BigQuery에서 실행하려는 저장 프로시져의 이름(예: mysparkprocedure)
  • PROCEDURE_ARGUMENT: 입력 인수를 입력하는 매개변수

    이 매개변수에서 다음 필드를 지정하세요.

    • ARGUMENT_MODE: 인수의 모드

      유효한 값은 IN, OUT, INOUT입니다. 기본값은 IN입니다.

    • ARGUMENT_NAME: 인수의 이름
    • ARGUMENT_TYPE: 인수의 유형

    예를 들면 myproject.mydataset.mysparkproc(num INT64)입니다.

    자세한 내용은 이 문서의 IN 매개변수 또는 OUTINOUT 매개변수로서 값 전달하기를 참조하세요.

  • CONNECTION_PROJECT_ID: Spark 프로시져를 실행하기 위한 연결이 포함된 프로젝트
  • CONNECTION_REGION: Spark 프로시져를 실행하기 위한 연결이 포함된 리전(예: us)
  • CONNECTION_ID: 연결 ID(예: myconnection)

    Google Cloud 콘솔에서 연결 세부정보를 볼 때 연결 ID는 연결 ID에 표시되는 정규화된 연결 ID의 마지막 섹션에 있는 값입니다(예: projects/myproject/locations/connection_location/connections/myconnection).

  • RUNTIME_VERSION: Spark의 런타임 버전(예: 1.1)
  • MAIN_PYTHON_FILE_URI: PySpark 파일의 경로(예: gs://mybucket/mypysparkmain.py)

    또는 CREATE PROCEDURE 문에 저장 프로시져의 본문을 추가하려면 이 문서의 인라인 코드 사용의 예시를 참조하여 LANGUAGE PYTHON AS 다음에 PYSPARK_CODE를 추가하세요.

  • PYSPARK_CODE: 프로시져 인라인의 본문을 전달하려는 경우 CREATE PROCEDURE 문의 PySpark 애플리케이션 정의

    값은 문자열 리터럴입니다. 코드에 따옴표와 백슬래시가 포함되는 경우 이스케이프 처리되거나 원시 문자열로 표현되어야 합니다. 예를 들어 코드 반환 "\n";은 다음 중 하나로 표현할 수 있습니다.

    • 따옴표 붙은 문자열: "return \"\\n\";". 따옴표와 백슬래시 모두 이스케이프 처리됩니다.
    • 삼중따옴표 붙은 문자열: """return "\\n";""". 백슬래시는 이스케이프 처리되지만 따옴표는 이스케이프 처리되지 않습니다.
    • 원시 문자열: r"""return "\n";""". 이스케이프 처리가 필요하지 않습니다.
    인라인 PySpark 코드를 추가하는 방법을 알아보려면 인라인 코드 사용을 참조하세요.
  • CONTAINER_IMAGE: Artifact Registry의 이미지 경로. 프로시져에서 사용할 라이브러리만 포함해야 합니다. 지정되지 않은 경우 런타임 버전과 연결된 시스템 기본값 컨테이너 이미지가 사용됩니다.

Spark로 커스텀 컨테이너 이미지를 빌드하는 방법에 대한 자세한 내용은 커스텀 컨테이너 이미지 빌드를 참조하세요.

Spark용 저장 프로시져 호출

저장 프로시져를 만든 후 다음 옵션 중 하나를 사용하여 이를 호출할 수 있습니다.

콘솔

  1. BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 탐색기 창에서 프로젝트를 확장하고 실행할 Spark의 저장 프로시져를 선택합니다.

  3. 저장 프로시져 정보 창에서 저장 프로시져 호출을 클릭합니다. 또는 작업 보기 옵션을 확장하고 호출을 클릭합니다.

  4. 실행을 클릭합니다.

  5. 모든 결과 섹션에서 결과 보기를 클릭합니다.

  6. (선택사항): 쿼리 결과 섹션에서 다음 단계를 수행합니다.

    • Spark 드라이버 로그를 보려면 실행 세부정보를 클릭합니다.

    • Cloud Logging에서 로그를 보려면 작업 정보를 클릭한 후 로그 필드에서 로그를 클릭합니다.

    • Spark 기록 서버 엔드포인트를 가져오려면 작업 정보를 클릭한 다음 Spark 기록 서버를 클릭합니다.

SQL

저장 프로시져를 호출하려면 CALL PROCEDURE을 사용합니다.

  1. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 쿼리 편집기에서 다음 문을 입력합니다.

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. 실행을 클릭합니다.

쿼리를 실행하는 방법에 대한 자세한 내용은 대화형 쿼리 실행을 참조하세요.

커스텀 서비스 계정 사용

데이터 액세스에 Spark 연결의 서비스 ID를 사용하는 대신 커스텀 서비스 계정을 사용하여 Spark 코드 내의 데이터에 액세스할 수 있습니다.

커스텀 서비스 계정을 사용하려면 Spark 저장 프로시져를 만들 때 INVOKER 보안 모드(EXTERNAL SECURITY INVOKER 문 사용)를 지정하고 저장 프로시져를 호출할 때 서비스 계정을 지정합니다.

Cloud Storage에서 Spark 코드에 액세스하고 사용하려면 Spark 연결의 서비스 ID에 필요한 권한을 부여해야 합니다. 연결의 서비스 계정에 storage.objects.get IAM 권한 또는 storage.objectViewer IAM 역할을 부여해야 합니다.

선택적으로 연결에 지정한 경우 연결의 서비스 계정에 Dataproc Metastore 및 Dataproc 영구 기록 서버에 대한 액세스 권한을 부여할 수 있습니다. 자세한 내용은 서비스 계정에 액세스 권한 부여를 참조하세요.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

선택적으로 앞의 코드에 다음 인수를 추가할 수 있습니다.

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

다음을 바꿉니다.

  • CUSTOM_SERVICE_ACCOUNT: (필수사항) 사용자가 제공하는 커스텀 서비스 계정
  • BUCKET_NAME: (선택사항) 기본 Spark 애플리케이션 파일 시스템으로 사용되는 Cloud Storage 버킷. 제공되지 않은 경우 프로젝트에 기본 Cloud Storage 버킷이 생성되고 동일한 프로젝트에서 실행되는 모든 작업에서 해당 버킷을 공유합니다.
  • DATASET: (선택사항) 프로시져를 호출하여 생성된 임시 데이터를 저장할 데이터 세트. 작업이 완료되면 데이터가 정리됩니다. 제공되지 않은 경우 작업의 기본 임시 데이터 세트가 생성됩니다.

커스텀 서비스 계정에는 다음 권한이 있어야 합니다.

  • 기본 Spark 애플리케이션 파일 시스템으로 사용되는 스테이징 버킷을 읽고 쓰려면 다음 안내를 따르세요.

    • 지정한 스테이징 버킷에 대한 storage.objects.* 권한 또는 roles/storage.objectAdmin IAM 역할
    • 또한 스테이징 버킷이 지정되지 않은 경우 프로젝트에 대한 storage.buckets.* 권한 또는 roles/storage.Admin IAM 역할
  • (선택사항) BigQuery에서 데이터를 읽고 쓰려면 다음 안내를 따르세요.

    • BigQuery 테이블에 대한 bigquery.tables.* 권한
    • 프로젝트에 대한 bigquery.readsessions.* 권한
    • roles/bigquery.admin IAM 역할에는 이전 권한이 포함됩니다.
  • (선택사항) Cloud Storage에서 데이터를 읽고 쓰려면 다음 안내를 따르세요.

    • Cloud Storage 객체에 대한 storage.objects.* 권한 또는 roles/storage.objectAdmin IAM 역할
  • (선택사항) INOUT/OUT 매개변수에 사용되는 스테이징 데이터 세트를 읽고 쓰려면 다음 안내를 따르세요.

    • 지정한 스테이징 데이터 세트에 대한 bigquery.tables.* 또는 roles/bigquery.dataEditor IAM 역할
    • 또한 스테이징 데이터 세트가 지정되지 않은 경우 프로젝트에 대한 bigquery.datasets.create 권한 또는 roles/bigquery.dataEditor IAM 역할이 필요합니다.

Spark를 위한 저장 프로시져의 예시

이 섹션에서는 Apache Spark의 저장 프로시져를 만드는 방법의 예시를 보여줍니다.

Cloud Storage에서 PySpark 또는 JAR 파일 사용

다음 예시에서는 my-project-id.us.my-connection 연결과 PySpark 또는 Cloud Storage 버킷에 저장된 JAR 파일을 사용하여 Spark의 저장 프로시져를 만드는 방법을 보여줍니다.

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java 또는 Scala

main_file_uri를 사용하여 저장 프로시져를 만듭니다.

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

main_class를 사용하여 저장 프로시져를 만듭니다.

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

인라인 코드 사용

다음 예시에서는 연결 my-project-id.us.my-connection과 인라인 PySpark 코드를 사용하여 Spark의 저장 프로시져를 만드는 방법을 보여줍니다.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

입력 매개변수로 값 전달

다음 예시에서는 Python에서 입력 매개변수로 값을 전달하는 두 가지 방법을 보여줍니다.

방법 1: 환경 변수 사용

PySpark 코드에서 Spark 드라이버와 실행자의 환경 변수를 통해 Spark를 위한 저장 프로시져의 입력 매개변수를 가져올 수 있습니다. 환경 변수 이름은 BIGQUERY_PROC_PARAM.PARAMETER_NAME 형식이며, 여기서 PARAMETER_NAME은 입력 매개변수 이름입니다. 예를 들어 입력 매개변수의 이름이 var이면 해당 환경 변수의 이름은 BIGQUERY_PROC_PARAM.var이 됩니다. 입력 매개변수는 JSON 인코딩됩니다. PySpark 코드에서 환경 변수로부터 JSON 문자열의 입력 매개변수 값을 가져와 Python 변수로 디코딩할 수 있습니다.

다음 예시에서는 INT64 유형의 입력 매개변수 값을 PySpark 코드로 가져오는 방법을 보여줍니다.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

방법 2: 기본 제공 라이브러리 사용

PySpark 코드에서 기본 제공 라이브러리를 가져와서 모든 유형의 매개변수를 채우는 데 사용할 수 있습니다. 매개변수를 실행자에 전달하려면 Spark 드라이버의 매개변수를 Python 변수로 채우고 값을 실행자에 전달합니다. 기본 제공 라이브러리는 INTERVAL, GEOGRAPHY, NUMERIC, BIGNUMERIC을 제외한 대부분의 BigQuery 데이터 유형을 지원합니다.

BigQuery 데이터 유형 Python 데이터 유형
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC 지원되지 않음
BIGNUMERIC 지원되지 않음
INTERVAL 지원되지 않음
GEOGRAPHY 지원되지 않음

다음은 기본 제공 라이브러리를 가져와서 이를 사용하여 INT64 유형의 입력 매개변수와 ARRAY<STRUCT<a INT64, b STRING>> 유형의 입력 매개변수를 PySpark 코드에 채우는 방법을 보여줍니다.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

Java 또는 Scala 코드에서 Spark 드라이버 및 실행자의 환경 변수를 통해 Spark에 대한 저장 프로시져의 입력 매개변수를 가져올 수 있습니다. 환경 변수 이름은 BIGQUERY_PROC_PARAM.PARAMETER_NAME 형식이며, 여기서 PARAMETER_NAME은 입력 매개변수 이름입니다. 예를 들어 입력 매개변수의 이름이 var이면 해당 환경 변수의 이름은 BIGQUERY_PROC_PARAM.var이 됩니다. Java 또는 Scala 코드에서 환경 변수에서 입력 매개변수 값을 가져올 수 있습니다.

다음 예시에서는 환경 변수에서 Scala 코드로 입력 매개변수 값을 가져오는 방법을 보여줍니다.

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

다음 예시에서는 환경 변수에서 Java 코드로 입력 매개변수를 가져오는 방법을 보여줍니다.

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

OUTINOUT 매개변수로서 값을 전달

출력 매개변수는 Spark 프로시져의 값을 반환하는 반면 INOUT 매개변수는 프로시져의 값을 수락하고 프로시져의 값을 반환합니다. OUTINOUT 매개변수를 사용하려면 Spark 프로시져를 만들 때 매개변수 이름 앞에 OUT 또는 INOUT 키워드를 추가하세요. PySpark 코드에서 기본 제공 라이브러리를 사용하여 값을 OUT 또는 INOUT 매개변수로 반환합니다. 기본 제공 라이브러리는 입력 매개변수와 마찬가지로 INTERVAL, GEOGRAPHY, NUMERIC, BIGNUMERIC을 제외한 대부분의 BigQuery 데이터 유형을 지원합니다. TIMEDATETIME 유형 값은 OUT 또는 INOUT 매개변수로 반환할 때 UTC 시간대로 변환됩니다.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Hive Metastore 테이블에서 읽기 및 BigQuery에 결과 쓰기

다음 예시는 Hive Metastore 테이블을 변환하고 결과를 BigQuery에 쓰는 방법을 보여줍니다.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

로그 필터 보기

Spark용 저장 프로시져를 호출한 후 로그 정보를 볼 수 있습니다. Cloud Logging 필터 정보와 Spark 기록 클러스터 엔드포인트를 가져오려면 bq show 명령어를 사용하세요. 필터 정보는 하위 작업의 SparkStatistics 필드에서 사용할 수 있습니다. 로그 필터를 가져오려면 다음 단계를 수행합니다.

  1. BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 쿼리 편집기에서 저장 프로시져의 스크립트 작업의 하위 작업을 나열합니다.

    bq ls -j --parent_job_id=$parent_job_id

    작업 ID를 가져오는 방법을 알아보려면 작업 세부정보 보기를 참조하세요.

    출력은 다음과 비슷합니다.

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. 저장 프로시져의 jobId를 식별하고 bq show 명령어를 사용하여 작업의 세부정보를 확인합니다.

    bq show --format=prettyjson --job $child_job_id

    다른 단계에 필요하므로 sparkStatistics 필드를 복사합니다.

    출력은 다음과 비슷합니다.

    {
    "configuration": {...}
    …
    "statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. Logging의 경우 SparkStatistics 필드로 로그 필터를 생성합니다.

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    로그는 bigquery.googleapis.com/SparkJob 모니터링 리소스에 기록됩니다. 로그는 INFO, DRIVER, EXECUTOR 구성요소로 라벨이 지정됩니다. Spark 드라이버의 로그를 필터링하려면 로그 필터에 labels.component = "DRIVER" 구성요소를 추가합니다. Spark 실행자의 로그를 필터링하려면 labels.component = "EXECUTOR" 구성요소를 로그 필터에 추가합니다.

고객 관리 암호화 키 사용

BigQuery Spark 프로시져는 BigQuery에서 제공하는 기본 암호화와 함께 고객 관리 암호화 키(CMEK)를 사용하여 콘텐츠를 보호합니다. Spark 프로시져에서 CMEK를 사용하려면 먼저 BigQuery 암호화 서비스 계정 만들기를 트리거하고 필요한 권한을 부여합니다. Spark 프로시져는 프로젝트에 적용되는 경우 CMEK 조직 정책도 지원합니다.

저장 프로시져가 INVOKER 보안 모드를 사용하는 경우 프로시져를 호출할 때 SQL 시스템 변수를 통해 CMEK를 지정해야 합니다. 그렇지 않으면 저장 프로시져와 연결된 연결을 통해 CMEK를 지정할 수 있습니다.

Spark 저장 프로시져를 만들 때 연결을 통해 CMEK를 지정하려면 다음 샘플 코드를 사용합니다.

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

프로시져를 호출할 때 SQL 시스템 변수를 통해 CMEK를 지정하려면 다음 샘플 코드를 사용합니다.

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

VPC 서비스 제어 사용

VPC 서비스 제어를 사용하면 데이터 무단 반출을 방지하기 위해 보안 경계를 설정할 수 있습니다. 추가 보안을 위해 Spark 프로시져와 함께 VPC 서비스 제어를 사용하려면 먼저 서비스 경계를 만듭니다.

Spark 프로시져 작업을 완벽하게 보호하려면 다음 API를 서비스 경계에 추가합니다.

  • BigQuery API(bigquery.googleapis.com)
  • Cloud Logging API(logging.googleapis.com)
  • Cloud Storage API(storage.googleapis.com)(Cloud Storage를 사용하는 경우)
  • Artifact Registry API(artifactregistry.googleapis.com) 또는 Container Registry API(containerregistry.googleapis.com)(커스텀 컨테이너를 사용하는 경우)
  • Dataproc Metastore API(metastore.googleapis.com) 및 Cloud Run Admin API(run.googleapis.com)(Dataproc Metastore를 사용하는 경우)

Spark 프로시져의 쿼리 프로젝트를 경계에 추가합니다. Spark 코드 또는 데이터를 호스팅하는 다른 프로젝트를 경계에 추가합니다.

권장사항

  • 프로젝트에서 처음으로 연결을 사용하는 경우 프로비저닝하는 데 약 1분이 추가로 소요됩니다. 시간을 절약하려면 Spark의 저장 프로시져를 만들 때 기존 Spark 연결을 재사용하는 방법이 있습니다.

  • 프로덕션 용도로 Spark 프로시져를 만드는 경우 런타임 버전을 지정하는 것이 좋습니다. 지원되는 런타임 버전 목록은 Dataproc Serverless 런타임 버전을 참조하세요. LTS(Long-Time-Support) 버전을 사용하는 것이 좋습니다.

  • Spark 프로시져에서 커스텀 컨테이너를 지정하는 경우 Artifact Registry 및 이미지 스트리밍을 사용하는 것이 좋습니다.

  • 성능 향상을 위해 Spark 프로시져에서 리소스 할당 속성을 지정할 수 있습니다. Spark 저장 프로시져는 Dataproc Serverless와 동일한 리소스 할당 속성 목록을 지원합니다.

제한사항

  • gRPC 엔드포인트 프로토콜을 사용해서만 Dataproc Metastore에 연결할 수 있습니다. 다른 유형의 Hive Metastore는 아직 지원되지 않습니다.
  • 고객 관리 암호화 키(CMEK)는 고객이 단일 리전 Spark 프로시져를 만드는 경우에만 사용할 수 있습니다. 전역 리전 CMEK 키와 멀티 리전 CMEK 키(예: EU 또는 US)는 지원되지 않습니다.
  • 출력 매개변수 전달은 PySpark에서만 지원됩니다.
  • Spark의 저장 프로시져와 연결된 데이터 세트가 리전 간 데이터 세트 복제를 통해 대상 리전에 복제되면 저장 프로시져는 생성된 리전에서만 쿼리할 수 있습니다.

할당량 및 한도

할당량 및 한도에 대한 상세 설명은 Spark의 저장 프로시져 할당량 및 한도를 참조하세요.

다음 단계