spark-bigquery-connector를 Apache Spark와 함께 사용하여 BigQuery에서 데이터를 읽고 씁니다.
이 튜토리얼에서는 spark-bigquery-connector
를 사용하는 PySpark 애플리케이션을 보여줍니다.
워크로드에 커넥터 제공
다음 방법 중 하나로 애플리케이션에서 커넥터를 사용할 수 있습니다.
- Spark 배치 워크로드를 위해 서버리스 Dataproc를 제출할 때
jars
매개변수를 사용하여 커넥터 jar 파일을 가리킵니다. 다음 예시에서는 커넥터 jar 파일을 지정합니다. GitHub의 GoogleCloudDataproc/spark-bigquery-connector 저장소에서 사용 가능한 커넥터 jar 파일 목록을 참조하세요.- SDK 예시:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-version.jar \ ... other args
- SDK 예시:
- Spark 애플리케이션에 커넥터 jar 파일을 종속 항목으로 포함합니다(커넥터에 대한 컴파일 참조).
비용 계산
이 가이드에서는 다음과 같은 비용이 청구될 수 있는 Google Cloud 구성요소를 사용합니다.
- 서버리스 Dataproc
- BigQuery
- Cloud Storage
가격 계산기를 사용하여 예상 사용량을 토대로 예상 비용을 산출합니다. Cloud Platform 신규 사용자는 무료 체험판을 사용할 수 있습니다.
BigQuery I/O
이 예에서는 BigQuery에서 Spark DataFrame으로 데이터를 읽어 들이고 표준 데이터 소스 API를 사용하여 단어 수를 계산합니다.
커넥터는 다음과 같이 워드카운트 출력을 BigQuery에 씁니다.
Cloud Storage 버킷의 임시 파일로 데이터를 버퍼링합니다.
한 번의 작업으로 Cloud Storage 버킷에서 BigQuery로 데이터를 복사합니다.
BigQuery 로드 작업이 완료된 후 Cloud Storage에서 임시 파일을 삭제합니다(Spark 애플리케이션이 종료되면 임시 파일도 삭제됨). 삭제에 실패하면 원치 않는 임시 Cloud Storage 파일을 삭제해야 하며, 이 파일은 보통
gs://your-bucket/.spark-bigquery-jobid-UUID
에 있습니다.
결제 구성
기본적으로 사용자 인증 정보나 서비스 계정과 연결된 프로젝트에는 API 사용 요금이 청구됩니다. 다른 프로젝트에 요금을 청구하려면 spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
구성을 설정합니다.
.option("parentProject", "<BILLED-GCP-PROJECT>")
처럼 읽기/쓰기 작업에 추가할 수도 있습니다.
PySpark 워드카운트 배치 워크로드 제출
- 로컬 터미널 또는 Cloud Shell에서 bq 명령줄 도구를 사용하여
wordcount_dataset
를 만듭니다.bq mk wordcount_dataset
- 로컬 터미널 또는 Cloud Shell에서 gsutil 명령줄 도구'를 사용하여 Cloud Storage 버킷을 만듭니다.
gsutil mb gs://your-bucket
- 코드 검사
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[your-bucket-name]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- PySpark 코드 목록에서 PySpark 코드를 복사하여 텍스트 편집기에서 로컬로
wordcount.py
를 만들고 [your-bucket] 자리표시자를 생성된 Cloud Storage 버킷으로 바꿉니다. - PySpark 배치 워크로드를 제출합니다.
gcloud dataproc batches submit pyspark wordcount.py \ --region=region \ --deps-bucket=your-bucket
샘플 터미널 출력:... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Console에서 출력 테이블을 미리 보려면 프로젝트의 BigQuery 페이지를 열고wordcount_output
테이블을 선택한 다음 미리보기를 클릭합니다.