Spark Spanner 커넥터 사용

이 페이지에서는 Dataproc 클러스터를 만드는 방법을 보여줍니다. 이 클러스터는 Spark Spanner 커넥터를 활용해 Apache SparkSpanner에서 데이터를 읽습니다.

Spanner 커넥터는 Spark와 함께 Spanner Java 라이브러리를 사용하여 Spanner 데이터베이스에서 데이터를 읽습니다. Spanner 커넥터는 Spanner 테이블그래프를 Spark DataFramesGraphFrames로 읽을 수 있도록 지원합니다.

비용

이 문서에서는 비용이 청구될 수 있는 Google Cloud구성요소( )를 사용합니다.

  • Dataproc
  • Spanner
  • Cloud Storage

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용합니다.

Google Cloud 신규 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

이 튜토리얼에서 Spanner 커넥터를 사용하기 전에 Dataproc 클러스터Spanner 인스턴스와 데이터베이스를 설정하세요.

Dataproc 클러스터 설정

Dataproc 클러스터를 만들거나 다음 설정이 포함된 기존 Dataproc 클러스터를 사용합니다.

  • VM 서비스 계정 권한. 클러스터 VM 서비스 계정에 적합한 Spanner 권한이 설정되어 있어야 합니다. 또한 Data Boost(Data Boost는 Spanner 테이블 내보내기의 예시 코드에서 사용 설정되어 있음)를 사용할 경우에는 VM 서비스 계정에 필요한 Data Boost IAM 권한이 있어야 합니다.

  • 액세스 범위. cloud-platform 범위 또는 적절한 spanner 범위를 사용 설정하여 클러스터를 만들어야 합니다. cloud-platform 범위는 이미지 버전 2.1 이상으로 생성된 클러스터에 대해 기본적으로 사용 설정됩니다.

    다음 안내에서는 Google Cloud 콘솔, gcloud CLI, Dataproc API를 사용하여 클러스터 만들기 요청을 수행할 때 cloud-platform 범위를 설정하는 방법을 보여줍니다. 더 자세한 클러스터 만들기 안내를 보려면 클러스터 만들기를 참조하세요.

    Google Cloud 콘솔

    1. Google Cloud 콘솔에서 Dataproc 클러스터 만들기 페이지를 엽니다.
    2. 프로젝트 액세스 섹션의 보안 관리 섹션에서 "이 클러스터의 클라우드 플랫폼 범위 사용 설정"을 클릭합니다.
    3. 다른 클러스터 만들기 필드를 채우거나 확인한 후 만들기를 클릭합니다.

    gcloud CLI

    다음 gcloud dataproc clusters create 명령어를 실행하여 cloud-platform 범위가 사용 설정된 클러스터를 만듭니다.

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    clusters.create 요청의 일부로 GceClusterConfig.serviceAccountScopes를 지정할 수 있습니다.

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

Singers 데이터베이스 테이블이 있는 Spanner 인스턴스 설정

Singers 테이블을 포함하는 데이터베이스가 있는 Spanner 인스턴스를 만듭니다. Spanner 인스턴스 ID와 데이터베이스 ID를 기록해 둡니다.

Spark에서 Spanner 커넥터 사용

Spanner 커넥터는 Spark 버전 3.1+에서 사용 가능합니다. Dataproc 클러스터에 작업을 제출할 때 Cloud Storage 커넥터 JAR 파일 사양에 따라 커넥터 버전을 지정합니다.

예시: Spanner 커넥터를 사용하는 gcloud CLI Spark 작업 제출

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

다음을 바꿉니다.

CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.

Spanner 테이블 읽기

Python 또는 Scala를 사용해서 Spark 데이터 소스 API를 통해 Spanner 테이블 데이터를 Spark Dataframe으로 읽어올 수 있습니다.

PySpark

Dataproc 서비스에 작업을 제출하거나 클러스터 마스터 노드의 spark-submit REPL에서 작업을 실행하여 클러스터에서 이 섹션의 PySpark 코드 예시를 실행할 수 있습니다.

Dataproc 작업

  1. 로컬 텍스트 편집기를 사용하거나 Cloud Shell에서 미리 설치된 vi, vim, nano 텍스트 편집기를 사용하여 singers.py 파일을 만듭니다.
    1. 자리표시자 변수를 채운 후 다음 코드를 singers.py 파일에 붙여넣습니다. Spanner Data Boost 기능이 사용 설정되어 기본 Spanner 인스턴스에 거의 0에 가까운 영향을 줍니다.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      다음을 바꿉니다.

      1. PROJECT_ID: Google Cloud 프로젝트 ID. 프로젝트 ID는 Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열됩니다.
      2. INSTANCE_ID, DATABASE_ID, TABLE_NAME : Singers 데이터베이스 테이블이 있는 Spanner 인스턴스 설정을 참고하세요.
    2. singers.py 파일을 저장합니다.
  2. Google Cloud 콘솔, gcloud CLI, Dataproc API를 사용하여 Dataproc 서비스에 작업을 제출합니다.

    예시: Spanner 커넥터를 사용하는 gcloud CLI 작업 제출

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    다음을 바꿉니다.

    1. CLUSTER_NAME: 새 클러스터의 이름
    2. REGION: 워크로드를 실행하는 데 사용할 수 있는 Compute Engine 리전
    3. CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.

spark-submit 작업

  1. SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
    1. Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 다음 클러스터 이름을 클릭합니다.
    2. 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 후 클러스터 마스터 노드 이름 오른쪽에 있는 SSH를 클릭합니다.
       Google Cloud 콘솔에서 클러스터 마스터 노드에 연결하는 데 사용되는 SSH 버튼이 표시된 Dataproc 클러스터 세부정보 페이지를 보여주는 스크린샷입니다.

      마스터 노드의 홈 디렉터리에 브라우저 창이 열립니다.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 사전 설치된 vi, vim, nano 텍스트 편집기를 사용하여 마스터 노드에서 singers.py 파일을 만듭니다.
    1. 다음 코드를 singers.py 파일에 붙여넣습니다. Spanner Data Boost 기능이 사용 설정되어 기본 Spanner 인스턴스에 거의 0에 가까운 영향을 줍니다.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      다음을 바꿉니다.

      1. PROJECT_ID: Google Cloud 프로젝트 ID. 프로젝트 ID는 Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열됩니다.
      2. INSTANCE_ID, DATABASE_ID, TABLE_NAME: Singers 데이터베이스 테이블이 있는 Spanner 인스턴스 설정을 참고하세요.
    2. singers.py 파일을 저장합니다.
  3. spark-submit을 사용해서 singers.py를 실행하여 Spanner Singers 테이블을 만듭니다.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    다음을 바꿉니다.

    1. CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.

    출력은 다음과 같습니다.

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

클러스터에서 Scala 코드 예시를 실행하려면 다음 단계를 수행합니다.

  1. SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
    1. Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 다음 클러스터 이름을 클릭합니다.
    2. 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 후 클러스터 마스터 노드 이름 오른쪽에 있는 SSH를 클릭합니다.  Google Cloud 콘솔의 Dataproc 클러스터 세부정보 페이지

      마스터 노드의 홈 디렉터리에 브라우저 창이 열립니다.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 사전 설치된 vi, vim, nano 텍스트 편집기를 사용하여 마스터 노드에서 singers.scala 파일을 만듭니다.
    1. 다음 코드를 singers.scala 파일에 붙여넣습니다. Spanner Data Boost 기능이 사용 설정되어 기본 Spanner 인스턴스에 거의 0에 가까운 영향을 줍니다.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      다음을 바꿉니다.

      1. PROJECT_ID: Google Cloud 프로젝트 ID. 프로젝트 ID는 Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열됩니다.
      2. INSTANCE_ID, DATABASE_ID, TABLE_NAME: Singers 데이터베이스 테이블이 있는 Spanner 인스턴스 설정을 참고하세요.
    2. singers.scala 파일을 저장합니다.
  3. spark-shell REPL을 실행합니다.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    다음을 바꿉니다.

    CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.

  4. :load singers.scala 명령어로 singers.scala를 실행하여 Spanner Singers 테이블을 만듭니다. 출력 목록에는 Singers 출력의 예시가 표시됩니다.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Spanner 그래프 읽기

Spanner 커넥터에서는 GraphFrames로 직접 내보내기는 물론 개별 노드 및 에지 DataFrames로 그래프 내보내기가 지원됩니다.

다음 예시에서는 Spanner를 GraphFrame으로 내보냅니다. 여기에서는 Spanner 커넥터 jar에 포함된 Python SpannerGraphConnector 클래스를 사용하여 Spanner Graph를 읽습니다.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

다음을 바꿉니다.

  • CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.
  • PROJECT_ID: Google Cloud 프로젝트 ID. 프로젝트 ID는 Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열됩니다.
  • INSTANCE_ID, DATABASE_ID, TABLE_NAME: 인스턴스, 데이터베이스, 그래프 ID를 삽입합니다.

GraphFrames 대신 노드 및 에지 DataFrames를 내보내려면 load_dfs를 사용하세요.

df_vertices, df_edges, df_id_map = connector.load_dfs()

삭제

Google Cloud 계정에 비용이 계속 청구되지 않도록 하려면 Dataproc 클러스터를 중지하거나 삭제하고 Spanner 인스턴스를 삭제하면 됩니다.

다음 단계