Spark를 위한 서버리스 Dataproc로 BigQuery 커넥터 사용

spark-bigquery-connectorApache Spark와 함께 사용하여 BigQuery에서 데이터를 읽고 씁니다. 이 튜토리얼에서는 spark-bigquery-connector를 사용하는 PySpark 애플리케이션을 보여줍니다.

워크로드에 커넥터 제공

다음 방법 중 하나로 애플리케이션에서 커넥터를 사용할 수 있습니다.

  • Spark 배치 워크로드를 위해 서버리스 Dataproc를 제출할 때 jars 매개변수를 사용하여 커넥터 jar 파일을 가리킵니다. 다음 예시에서는 커넥터 jar 파일을 지정합니다. GitHub의 GoogleCloudDataproc/spark-bigquery-connector 저장소에서 사용 가능한 커넥터 jar 파일 목록을 참조하세요.
    • SDK 예시:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-version.jar \
          ... other args
      
  • Spark 애플리케이션에 커넥터 jar 파일을 종속 항목으로 포함합니다(커넥터에 대한 컴파일 참조).

비용 계산

이 가이드에서는 다음과 같은 비용이 청구될 수 있는 Google Cloud 구성요소를 사용합니다.

  • 서버리스 Dataproc
  • BigQuery
  • Cloud Storage

가격 계산기를 사용하여 예상 사용량을 토대로 예상 비용을 산출합니다. Cloud Platform 신규 사용자는 무료 체험판을 사용할 수 있습니다.

BigQuery I/O

이 예에서는 BigQuery에서 Spark DataFrame으로 데이터를 읽어 들이고 표준 데이터 소스 API를 사용하여 단어 수를 계산합니다.

커넥터는 다음과 같이 워드카운트 출력을 BigQuery에 씁니다.

  1. Cloud Storage 버킷의 임시 파일로 데이터를 버퍼링합니다.

  2. 한 번의 작업으로 Cloud Storage 버킷에서 BigQuery로 데이터를 복사합니다.

  3. BigQuery 로드 작업이 완료된 후 Cloud Storage에서 임시 파일을 삭제합니다(Spark 애플리케이션이 종료되면 임시 파일도 삭제됨). 삭제에 실패하면 원치 않는 임시 Cloud Storage 파일을 삭제해야 하며, 이 파일은 보통 gs://your-bucket/.spark-bigquery-jobid-UUID에 있습니다.

결제 구성

기본적으로 사용자 인증 정보나 서비스 계정과 연결된 프로젝트에는 API 사용 요금이 청구됩니다. 다른 프로젝트에 요금을 청구하려면 spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>") 구성을 설정합니다.

.option("parentProject", "<BILLED-GCP-PROJECT>")처럼 읽기/쓰기 작업에 추가할 수도 있습니다.

PySpark 워드카운트 배치 워크로드 제출

  1. 로컬 터미널 또는 Cloud Shell에서 bq 명령줄 도구를 사용하여 wordcount_dataset를 만듭니다.
    bq mk wordcount_dataset
    
  2. 로컬 터미널 또는 Cloud Shell에서 gsutil 명령줄 도구'를 사용하여 Cloud Storage 버킷을 만듭니다.
    gsutil mb gs://your-bucket
    
  3. 코드 검사
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
    
  4. PySpark 코드 목록에서 PySpark 코드를 복사하여 텍스트 편집기에서 로컬로 wordcount.py를 만들고 [your-bucket] 자리표시자를 생성된 Cloud Storage 버킷으로 바꿉니다.
  5. PySpark 배치 워크로드를 제출합니다.
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    샘플 터미널 출력:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Console에서 출력 테이블을 미리 보려면 프로젝트의 BigQuery 페이지를 열고 wordcount_output 테이블을 선택한 다음 미리보기를 클릭합니다.

추가 정보