BigQuery 커넥터를 Spark와 함께 사용

spark-bigquery-connectorApache Spark와 함께 사용하여 BigQuery에서 데이터를 읽고 쓸 수 있습니다. 이 가이드에서는 Spark 애플리케이션 내에서 Spark-bigquery-connector를 사용하는 예시 코드를 제공합니다. 클러스터를 만드는 방법은 Dataproc 빠른 시작을 참조하세요.

애플리케이션에서 커넥터를 사용할 수 있도록 설정

다음 방법 중 하나를 사용하여 애플리케이션에서 spark-bigquery-connector를 사용할 수 있습니다.

  1. 클러스터를 만들 때 Dataproc 커넥터 초기화 작업을 사용하여 모든 노드의 Spark jars 디렉터리에 spark-bigquery-connector를 설치합니다.

  2. 작업을 제출할 때 커넥터 URI를 제공합니다.

    1. Google Cloud 콘솔: Dataproc 작업 제출 페이지에서 Spark 작업 Jars files 항목을 사용합니다.
    2. gcloud CLI: gcloud dataproc jobs submit spark --jars 플래그를 사용합니다.
    3. Dataproc API: SparkJob.jarFileUris 필드를 사용합니다.
  3. Scala 또는 자바 Spark 애플리케이션에 있는 jar를 종속 항목으로 포함합니다(커넥터에 대한 컴파일 참조).

커넥터 jar URI를 지정하는 방법

Spark-BigQuery 커넥터 버전은 GitHub GoogleCloudDataproc/spark-bigquery-connector 저장소에 나열되어 있습니다.

다음 URI 문자열에서 Scala 및 커넥터 버전 정보를 대체하여 커넥터 jar을 지정합니다.

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • Dataproc 이미지 버전 1.5+와 함께 Scala 2.12를 사용합니다.

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    gcloud CLI 예시:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • Dataproc 이미지 버전 1.4 이하에서는 Scala 2.11을 사용합니다.

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    gcloud CLI 예시:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

비용 계산

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Dataproc
  • BigQuery
  • Cloud Storage

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

BigQuery에서 데이터 읽기 및 쓰기

이 예에서는 BigQuery에서 Spark DataFrame으로 데이터를 읽어 들이고 표준 데이터 소스 API를 사용하여 단어 수를 계산합니다.

커넥터는 먼저 모든 데이터를 Cloud Storage 임시 테이블에 버퍼링하여 데이터를 BigQuery에 씁니다. 그런 다음 한 번의 작업으로 모든 데이터를 BigQuery에 복사합니다. 커넥터는 BigQuery 로드 작업이 성공하면 임시 파일을 삭제하고 Spark 애플리케이션이 종료될 때 다시 한 번 삭제합니다. 작업이 실패하면 남아 있는 임시 Cloud Storage 파일을 모두 삭제합니다. 일반적으로 임시 BigQuery 파일은 gs://[bucket]/.spark-bigquery-[jobid]-[UUID]에 있습니다.

결제 구성

기본적으로 사용자 인증 정보나 서비스 계정과 연결된 프로젝트에는 API 사용 요금이 청구됩니다. 다른 프로젝트에 요금을 청구하려면 spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>") 구성을 설정합니다.

.option("parentProject", "<BILLED-GCP-PROJECT>")처럼 읽기/쓰기 작업에 추가할 수도 있습니다.

코드 실행

이 예를 실행하기 전에 'wordcount_dataset'라는 데이터 세트를 만들거나 코드의 출력 데이터 세트를 Google Cloud 프로젝트의 기존 BigQuery 데이터 세트로 변경합니다.

bq 명령어를 사용하여 wordcount_dataset를 만듭니다.

bq mk wordcount_dataset

gsutil 명령어를 사용하여 BigQuery로 내보내는 데 사용할 Cloud Storage 버킷을 만듭니다.

gsutil mb gs://[bucket]

Scala

  1. 코드를 검토하고 이전에 만든 Cloud Storage 버킷으로 [bucket] 자리표시자를 바꿉니다.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
    
    
  2. 클러스터에서 코드 실행
    1. SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
      1. Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 후 클러스터 이름을 클릭합니다.
        Cloud 콘솔의 Dataproc 클러스터 페이지
      2. >클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 다음 클러스터 마스터 노드 이름 오른쪽에 있는 SSH를 클릭합니다.
        Cloud 콘솔의 Dataproc 클러스터 세부정보 페이지

        마스터 노드의 홈 디렉터리에서 브라우저 창이 열립니다.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 사전 설치된 vi, vim, nano 텍스트 편집기로 wordcount.scala을 만든 다음 Scala 코드 목록에서 Scala 코드에 붙여넣습니다.
      nano wordcount.scala
        
    3. spark-shell REPL을 실행합니다.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. :load wordcount.scala 명령으로 wordcount.scala를 실행하여 BigQuery wordcount_output 테이블을 만듭니다. wordcount 출력의 20줄이 출력 목록에 표시됩니다.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      출력 테이블을 미리 보려면BigQuery 페이지에서 wordcount_output 테이블을 선택한 후 미리보기를 클릭합니다.
      Cloud 콘솔의 BigQuery 탐색기 페이지에서 테이블 미리보기

PySpark

  1. 코드를 검토하고 이전에 만든 Cloud Storage 버킷으로 [bucket] 자리표시자를 바꿉니다.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. 클러스터에서 코드 실행
    1. SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
      1. Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 후 클러스터 이름을 클릭합니다.
        Cloud 콘솔의 클러스터 페이지
      2. 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 다음 클러스터 마스터 노드 이름 오른쪽에 있는 SSH를 클릭합니다.
        Cloud 콘솔의 클러스터 세부정보 페이지에 있는 클러스터 이름 행에서 SSH를 선택합니다.

        마스터 노드의 홈 디렉터리에서 브라우저 창이 열립니다.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 사전 설치된 vi , vim, nano 텍스트 편집기로 wordcount.py을 만든 다음 PySpark 코드 목록에서 PySpark 코드를 붙여넣습니다.
      nano wordcount.py
      
    3. spark-submit로 wordcount를 실행하여 BigQuery wordcount_output 테이블을 만듭니다. wordcount 출력의 20줄이 출력 목록에 표시됩니다.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      출력 테이블을 미리 보려면BigQuery 페이지에서 wordcount_output 테이블을 선택한 후 미리보기를 클릭합니다.
      Cloud 콘솔의 BigQuery 탐색기 페이지에서 테이블 미리보기

추가 정보