이 페이지에서는 Spark Spanner 커넥터를 사용하여 Apache Spark를 통해 Spanner에서 데이터를 읽는 방법을 보여줍니다.
비용 계산
이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.
- Dataproc
- Spanner
- Cloud Storage
프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요.
시작하기 전에
튜토리얼을 실행하기 전에 커넥터 버전을 확인하고 커넥터 URI를 가져옵니다.
커넥터 JAR 파일 URI 지정 방법
Spark Spanner 커넥터 버전은 GitHub GoogleCloudDataproc/spark-spanner-connector 저장소에 나열되어 있습니다.
다음 URI 문자열에서 커넥터 버전 정보를 대체하여 커넥터 JAR 파일을 지정합니다.
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
커넥터는 Spark 버전 3.1+
에서 사용 가능합니다.
gcloud CLI 예시:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
Spanner 데이터베이스 준비
Spanner 테이블이 없으면 튜토리얼에 따라 Spanner 테이블을 만들 수 있습니다. 그러면 인스턴스 ID, 데이터베이스 ID, 테이블 Singers
가 포함됩니다.
Dataproc 클러스터 만들기
커넥터를 사용하는 모든 Dataproc 클러스터에는 spanner
또는 cloud-platform
범위가 필요합니다. Dataproc 클러스터에는 이미지 2.1 이상에 대한 기본 범위 cloud-platform
이 포함됩니다. 이전 버전을 사용하는 경우 Google Cloud 콘솔, Google Cloud CLI, Dataproc API를 사용하여 Dataproc 클러스터를 만들 수 있습니다.
Console
- Google Cloud 콘솔에서 Dataproc 클러스터 만들기 페이지를 엽니다.
- '보안 관리' 탭의 '프로젝트 액세스' 섹션에서 '이 클러스터에 cloud-platform 범위 사용 설정'을 클릭합니다.
- 다른 클러스터 만들기 필드를 채우거나 확인한 후 '만들기'를 클릭합니다.
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
clusters.create 요청의 일부로 GceClusterConfig.serviceAccountScopes를 지정할 수 있습니다. 예를 들면 다음과 같습니다."serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
해당 Spanner 권한이 Dataproc VM 서비스 계정에 할당되어 있는지 확인해야 합니다. 튜토리얼에서 Data Boost를 사용하는 경우 Data Boost IAM 권한을 참조하세요.
Spanner에서 데이터 읽기
Scala 및 Python을 사용하여 Spark 데이터 소스 API를 통해 Spanner의 데이터를 Spark Dataframe에서 읽을 수 있습니다.
Scala
- 코드를 검사하고 [projectId], [instanceId], [databaseId], [table] 자리표시자를 이전에 만든 프로젝트 ID, 인스턴스 ID, 데이터베이스 ID, 테이블로 바꿉니다. enableDataBoost 옵션에서 기본 Spanner 인스턴스에 거의 영향을 주지 않는 Spanner Data Boost 기능을 사용 설정합니다.
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- 클러스터에서 코드 실행
- SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
- Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 후 클러스터 이름을 클릭합니다.
- >클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 다음 클러스터 마스터 노드 이름 오른쪽에 있는
SSH
를 클릭합니다.
마스터 노드의 홈 디렉터리에서 브라우저 창이 열립니다.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 사전 설치된
vi
,vim
,nano
텍스트 편집기로singers.scala
을 만든 다음 Scala 코드 목록에서 Scala 코드에 붙여넣습니다.nano singers.scala
spark-shell
REPL을 실행합니다.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
:load singers.scala
명령어로 singers.scala를 실행하여 SpannerSingers
테이블을 만듭니다. 출력 목록에는 Singers 출력의 예시가 표시됩니다.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- 코드를 검사하고 [projectId], [instanceId], [databaseId], [table] 자리표시자를 이전에 만든 프로젝트 ID, 인스턴스 ID, 데이터베이스 ID, 테이블로 바꿉니다. enableDataBoost 옵션에서 기본 Spanner 인스턴스에 거의 영향을 주지 않는 Spanner Data Boost 기능을 사용 설정합니다.
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- 클러스터에서 코드 실행
- SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
- Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 후 클러스터 이름을 클릭합니다.
- 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 다음 클러스터 마스터 노드 이름 오른쪽에 있는
SSH
를 클릭합니다.
기본 노드의 홈 디렉터리에서 브라우저 창이 열립니다.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 사전 설치된
vi
,vim
,nano
텍스트 편집기로singers.py
을 만든 다음 PySpark 코드 목록에서 PySpark 코드를 붙여넣습니다.nano singers.py
spark-submit
으로 singers.py를 실행하여 SpannerSingers
테이블을 만듭니다. 출력은 다음과 같습니다.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
삭제
이 둘러보기에서 생성한 리소스에 대한 비용이 Google Cloud 계정에 청구되지 않게 하려면 다음 단계를 수행합니다.
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME