이 페이지에서는 Dataproc 클러스터를 만드는 방법을 보여줍니다. 이 클러스터는 Spark Spanner 커넥터를 활용해 Apache Spark로 Spanner에서 데이터를 읽습니다.
Spanner 커넥터는 Spark와 함께 Spanner Java 라이브러리를 사용하여 Spanner 데이터베이스에서 데이터를 읽습니다. Spanner 커넥터는 Spanner 테이블 및 그래프를 Spark DataFrames 및 GraphFrames로 읽을 수 있도록 지원합니다.
비용
이 문서에서는 비용이 청구될 수 있는 Google Cloud구성요소( )를 사용합니다.
- Dataproc
- Spanner
- Cloud Storage
프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용합니다.
시작하기 전에
이 튜토리얼에서 Spanner 커넥터를 사용하기 전에 Dataproc 클러스터 및 Spanner 인스턴스와 데이터베이스를 설정하세요.
Dataproc 클러스터 설정
Dataproc 클러스터를 만들거나 다음 설정이 포함된 기존 Dataproc 클러스터를 사용합니다.
VM 서비스 계정 권한. 클러스터 VM 서비스 계정에 적합한 Spanner 권한이 설정되어 있어야 합니다. 또한 Data Boost(Data Boost는 Spanner 테이블 내보내기의 예시 코드에서 사용 설정되어 있음)를 사용할 경우에는 VM 서비스 계정에 필요한 Data Boost IAM 권한이 있어야 합니다.
액세스 범위.
cloud-platform
범위 또는 적절한spanner
범위를 사용 설정하여 클러스터를 만들어야 합니다.cloud-platform
범위는 이미지 버전 2.1 이상으로 생성된 클러스터에 대해 기본적으로 사용 설정됩니다.다음 안내에서는 Google Cloud 콘솔, gcloud CLI, Dataproc API를 사용하여 클러스터 만들기 요청을 수행할 때
cloud-platform
범위를 설정하는 방법을 보여줍니다. 더 자세한 클러스터 만들기 안내를 보려면 클러스터 만들기를 참조하세요.Google Cloud 콘솔
- Google Cloud 콘솔에서 Dataproc 클러스터 만들기 페이지를 엽니다.
- 프로젝트 액세스 섹션의 보안 관리 섹션에서 "이 클러스터의 클라우드 플랫폼 범위 사용 설정"을 클릭합니다.
- 다른 클러스터 만들기 필드를 채우거나 확인한 후 만들기를 클릭합니다.
gcloud CLI
다음
gcloud dataproc clusters create
명령어를 실행하여cloud-platform
범위가 사용 설정된 클러스터를 만듭니다.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
clusters.create 요청의 일부로 GceClusterConfig.serviceAccountScopes를 지정할 수 있습니다.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Singers 데이터베이스 테이블이 있는 Spanner 인스턴스 설정
Singers
테이블을 포함하는 데이터베이스가 있는 Spanner 인스턴스를 만듭니다. Spanner 인스턴스 ID와 데이터베이스 ID를 기록해 둡니다.
Spark에서 Spanner 커넥터 사용
Spanner 커넥터는 Spark 버전 3.1+
에서 사용 가능합니다.
Dataproc 클러스터에 작업을 제출할 때 Cloud Storage 커넥터 JAR 파일 사양에 따라 커넥터 버전을 지정합니다.
예시: Spanner 커넥터를 사용하는 gcloud CLI Spark 작업 제출
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
다음을 바꿉니다.
CONNECTOR_VERSION: Spanner 커넥터 버전.
GitHub GoogleCloudDataproc/spark-spanner-connector
저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.
Spanner 테이블 읽기
Python 또는 Scala를 사용해서 Spark 데이터 소스 API를 통해 Spanner 테이블 데이터를 Spark Dataframe으로 읽어올 수 있습니다.
PySpark
Dataproc 서비스에 작업을 제출하거나 클러스터 마스터 노드의 spark-submit
REPL에서 작업을 실행하여 클러스터에서 이 섹션의 PySpark 코드 예시를 실행할 수 있습니다.
Dataproc 작업
- 로컬 텍스트 편집기를 사용하거나 Cloud Shell에서 미리 설치된
vi
,vim
,nano
텍스트 편집기를 사용하여singers.py
파일을 만듭니다. - 자리표시자 변수를 채운 후 다음 코드를
singers.py
파일에 붙여넣습니다. Spanner Data Boost 기능이 사용 설정되어 기본 Spanner 인스턴스에 거의 0에 가까운 영향을 줍니다.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
다음을 바꿉니다.
- PROJECT_ID: Google Cloud 프로젝트 ID. 프로젝트 ID는 Google Cloud 콘솔 대시보드의 프로젝트 정보 섹션에 나열됩니다.
- INSTANCE_ID, DATABASE_ID, TABLE_NAME :
Singers
데이터베이스 테이블이 있는 Spanner 인스턴스 설정을 참고하세요.
singers.py
파일을 저장합니다.- Google Cloud 콘솔, gcloud CLI, Dataproc API를 사용하여 Dataproc 서비스에 작업을 제출합니다.
예시: Spanner 커넥터를 사용하는 gcloud CLI 작업 제출
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
다음을 바꿉니다.
- CLUSTER_NAME: 새 클러스터의 이름
- REGION: 워크로드를 실행하는 데 사용할 수 있는 Compute Engine 리전
- CONNECTOR_VERSION: Spanner 커넥터 버전.
GitHub
GoogleCloudDataproc/spark-spanner-connector
저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.
spark-submit 작업
- SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
- Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 다음 클러스터 이름을 클릭합니다.
- 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 후 클러스터 마스터 노드 이름 오른쪽에 있는
SSH
를 클릭합니다.마스터 노드의 홈 디렉터리에 브라우저 창이 열립니다.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 사전 설치된
vi
,vim
,nano
텍스트 편집기를 사용하여 마스터 노드에서singers.py
파일을 만듭니다.- 다음 코드를
singers.py
파일에 붙여넣습니다. Spanner Data Boost 기능이 사용 설정되어 기본 Spanner 인스턴스에 거의 0에 가까운 영향을 줍니다.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
다음을 바꿉니다.
- PROJECT_ID: Google Cloud 프로젝트 ID. 프로젝트 ID는 Google Cloud 콘솔 대시보드의 프로젝트 정보 섹션에 나열됩니다.
- INSTANCE_ID, DATABASE_ID, TABLE_NAME:
Singers
데이터베이스 테이블이 있는 Spanner 인스턴스 설정을 참고하세요.
singers.py
파일을 저장합니다.
- 다음 코드를
spark-submit
을 사용해서singers.py
를 실행하여 SpannerSingers
테이블을 만듭니다.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
다음을 바꿉니다.
- CONNECTOR_VERSION: Spanner 커넥터 버전.
GitHub
GoogleCloudDataproc/spark-spanner-connector
저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.
출력은 다음과 같습니다.
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: Spanner 커넥터 버전.
GitHub
Scala
클러스터에서 Scala 코드 예시를 실행하려면 다음 단계를 수행합니다.
- SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
- Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 다음 클러스터 이름을 클릭합니다.
- 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 후 클러스터 마스터 노드 이름 오른쪽에 있는
SSH
를 클릭합니다.마스터 노드의 홈 디렉터리에 브라우저 창이 열립니다.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 사전 설치된
vi
,vim
,nano
텍스트 편집기를 사용하여 마스터 노드에서singers.scala
파일을 만듭니다.- 다음 코드를
singers.scala
파일에 붙여넣습니다. Spanner Data Boost 기능이 사용 설정되어 기본 Spanner 인스턴스에 거의 0에 가까운 영향을 줍니다.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
다음을 바꿉니다.
- PROJECT_ID: Google Cloud 프로젝트 ID. 프로젝트 ID는 Google Cloud 콘솔 대시보드의 프로젝트 정보 섹션에 나열됩니다.
- INSTANCE_ID, DATABASE_ID, TABLE_NAME:
Singers
데이터베이스 테이블이 있는 Spanner 인스턴스 설정을 참고하세요.
singers.scala
파일을 저장합니다.
- 다음 코드를
spark-shell
REPL을 실행합니다.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
다음을 바꿉니다.
CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub
GoogleCloudDataproc/spark-spanner-connector
저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.:load singers.scala
명령어로singers.scala
를 실행하여 SpannerSingers
테이블을 만듭니다. 출력 목록에는 Singers 출력의 예시가 표시됩니다.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Spanner 그래프 읽기
Spanner 커넥터에서는 GraphFrames
로 직접 내보내기는 물론 개별 노드 및 에지 DataFrames로 그래프 내보내기가 지원됩니다.
다음 예시에서는 Spanner를 GraphFrame
으로 내보냅니다.
여기에서는 Spanner 커넥터 jar에 포함된 Python SpannerGraphConnector
클래스를 사용하여 Spanner Graph를 읽습니다.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
다음을 바꿉니다.
- CONNECTOR_VERSION: Spanner 커넥터 버전.
GitHub
GoogleCloudDataproc/spark-spanner-connector
저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다. - PROJECT_ID: Google Cloud 프로젝트 ID. 프로젝트 ID는 Google Cloud 콘솔 대시보드의 프로젝트 정보 섹션에 나열됩니다.
- INSTANCE_ID, DATABASE_ID, TABLE_NAME: 인스턴스, 데이터베이스, 그래프 ID를 삽입합니다.
GraphFrames 대신 노드 및 에지 DataFrames
를 내보내려면 load_dfs
를 사용하세요.
df_vertices, df_edges, df_id_map = connector.load_dfs()
삭제
Google Cloud 계정에 비용이 계속 청구되지 않도록 하려면 Dataproc 클러스터를 중지하거나 삭제하고 Spanner 인스턴스를 삭제하면 됩니다.
다음 단계
pyspark.sql.DataFrame
예시 참고하기- Spark DataFrame 언어 지원은 다음을 참고하기
- GitHub의 Spark Spanner Connector 저장소 참고하기
- Spark 작업 미세 조정 팁 참고하기