spark-bigquery-connector를 Apache Spark와 함께 사용하여 BigQuery에서 데이터를 읽고 쓸 수 있습니다. 이 튜토리얼에서는 Spark 애플리케이션 내에서 Spark-bigquery-connector를 사용하는 예시 코드를 제공합니다. 클러스터를 만드는 방법은 Dataproc 빠른 시작을 참조하세요.
애플리케이션에서 커넥터를 사용할 수 있도록 설정
다음 방법 중 하나를 사용하여 애플리케이션에서 spark-bigquery-connector를 사용할 수 있습니다.
클러스터를 만들 때 Dataproc 커넥터 초기화 작업을 사용하여 모든 노드의 Spark jars 디렉터리에 spark-bigquery-connector를 설치합니다.
작업을 제출할 때 커넥터 URI를 제공합니다.
- Google Cloud 콘솔: Dataproc 작업 제출 페이지에서 Spark 작업
Jars files
항목을 사용합니다. - gcloud CLI:
gcloud dataproc jobs submit spark --jars
플래그를 사용합니다. - Dataproc API:
SparkJob.jarFileUris
필드를 사용합니다.
- Google Cloud 콘솔: Dataproc 작업 제출 페이지에서 Spark 작업
Scala 또는 자바 Spark 애플리케이션에 있는 jar를 종속 항목으로 포함합니다(커넥터에 대한 컴파일 참조).
커넥터 jar URI를 지정하는 방법
Spark-BigQuery 커넥터 버전은 GitHub GoogleCloudDataproc/spark-bigquery-connector 저장소에 나열되어 있습니다.
다음 URI 문자열에서 Scala 및 커넥터 버전 정보를 대체하여 커넥터 jar을 지정합니다.
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Dataproc 이미지 버전
1.5+
와 함께 Scala2.12
를 사용합니다.gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud CLI 예시:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Dataproc 이미지 버전
1.4
이하에서는 Scala2.11
을 사용합니다.gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud CLI 예시:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
비용 계산
이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.
- Dataproc
- BigQuery
- Cloud Storage
프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요.
BigQuery에서 데이터 읽기 및 쓰기
이 예에서는 BigQuery에서 Spark DataFrame으로 데이터를 읽어 들이고 표준 데이터 소스 API를 사용하여 단어 수를 계산합니다.
커넥터는 먼저 모든 데이터를 Cloud Storage 임시 테이블에 버퍼링하여 데이터를 BigQuery에 씁니다. 그런 다음 한 번의 작업으로 모든 데이터를 BigQuery에 복사합니다. 커넥터는 BigQuery 로드 작업이 성공하면 임시 파일을 삭제하고 Spark 애플리케이션이 종료될 때 다시 한 번 삭제합니다.
작업이 실패하면 남아 있는 임시 Cloud Storage 파일을 모두 삭제합니다. 일반적으로 임시 BigQuery 파일은 gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
에 있습니다.
결제 구성
기본적으로 사용자 인증 정보나 서비스 계정과 연결된 프로젝트에는 API 사용 요금이 청구됩니다. 다른 프로젝트에 요금을 청구하려면 spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
구성을 설정합니다.
.option("parentProject", "<BILLED-GCP-PROJECT>")
처럼 읽기/쓰기 작업에 추가할 수도 있습니다.
코드 실행
이 예를 실행하기 전에 'wordcount_dataset'라는 데이터 세트를 만들거나 코드의 출력 데이터 세트를 Google Cloud 프로젝트의 기존 BigQuery 데이터 세트로 변경합니다.
bq 명령어를 사용하여 wordcount_dataset
를 만듭니다.
bq mk wordcount_dataset
Google Cloud CLI 명령어를 사용하여 BigQuery로 내보내는 데 사용할 Cloud Storage 버킷을 만듭니다.
gcloud storage buckets create gs://[bucket]
Scala
- 코드를 검토하고 이전에 만든 Cloud Storage 버킷으로 [bucket] 자리표시자를 바꿉니다.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- 클러스터에서 코드 실행
- SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
- Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 후 클러스터 이름을 클릭합니다.
- >클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 다음 클러스터 마스터 노드 이름 오른쪽에 있는
SSH
를 클릭합니다.
마스터 노드의 홈 디렉터리에서 브라우저 창이 열립니다.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 사전 설치된
vi
,vim
,nano
텍스트 편집기로wordcount.scala
을 만든 다음 Scala 코드 목록에서 Scala 코드에 붙여넣습니다.nano wordcount.scala
spark-shell
REPL을 실행합니다.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
:load wordcount.scala
명령으로 wordcount.scala를 실행하여 BigQuerywordcount_output
테이블을 만듭니다. wordcount 출력의 20줄이 출력 목록에 표시됩니다.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
출력 테이블을 미리 보려면BigQuery
페이지를 열고wordcount_output
테이블을 선택한 다음 미리보기를 클릭하세요.
- SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
PySpark
- 코드를 검토하고 이전에 만든 Cloud Storage 버킷으로 [bucket] 자리표시자를 바꿉니다.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- 클러스터에서 코드 실행
- SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
- Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 후 클러스터 이름을 클릭합니다.
- 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 다음 클러스터 마스터 노드 이름 오른쪽에 있는
SSH
를 클릭합니다.
마스터 노드의 홈 디렉터리에서 브라우저 창이 열립니다.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 사전 설치된
vi
,vim
,nano
텍스트 편집기로wordcount.py
을 만든 다음 PySpark 코드 목록에서 PySpark 코드를 붙여넣습니다.nano wordcount.py
spark-submit
로 wordcount를 실행하여 BigQuerywordcount_output
테이블을 만듭니다. wordcount 출력의 20줄이 출력 목록에 표시됩니다.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
출력 테이블을 미리 보려면BigQuery
페이지를 열고wordcount_output
테이블을 선택한 다음 미리보기를 클릭하세요.
- SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.