Spark에서 Spanner 커넥터 사용

이 페이지에서는 Spark Spanner 커넥터를 사용하여 Apache Spark를 통해 Spanner에서 데이터를 읽는 방법을 보여줍니다.

비용 계산

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Dataproc
  • Spanner
  • Cloud Storage

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

튜토리얼을 실행하기 전에 커넥터 버전을 확인하고 커넥터 URI를 가져옵니다.

커넥터 JAR 파일 URI 지정 방법

Spark Spanner 커넥터 버전은 GitHub GoogleCloudDataproc/spark-spanner-connector 저장소에 나열되어 있습니다.

다음 URI 문자열에서 커넥터 버전 정보를 대체하여 커넥터 JAR 파일을 지정합니다.

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

커넥터는 Spark 버전 3.1+에서 사용 가능합니다.

gcloud CLI 예시:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Spanner 데이터베이스 준비

Spanner 테이블이 없으면 튜토리얼에 따라 Spanner 테이블을 만들 수 있습니다. 그러면 인스턴스 ID, 데이터베이스 ID, 테이블 Singers가 포함됩니다.

Dataproc 클러스터 만들기

커넥터를 사용하는 모든 Dataproc 클러스터에는 spanner 또는 cloud-platform 범위가 필요합니다. Dataproc 클러스터에는 이미지 2.1 이상에 대한 기본 범위 cloud-platform이 포함됩니다. 이전 버전을 사용하는 경우 Google Cloud 콘솔, Google Cloud CLI, Dataproc API를 사용하여 Dataproc 클러스터를 만들 수 있습니다.

Console

  1. Google Cloud 콘솔에서 Dataproc 클러스터 만들기 페이지를 엽니다.
  2. '보안 관리' 탭의 '프로젝트 액세스' 섹션에서 '이 클러스터에 cloud-platform 범위 사용 설정'을 클릭합니다.
  3. 다른 클러스터 만들기 필드를 채우거나 확인한 후 '만들기'를 클릭합니다.

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

clusters.create 요청의 일부로 GceClusterConfig.serviceAccountScopes를 지정할 수 있습니다. 예를 들면 다음과 같습니다.
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

해당 Spanner 권한Dataproc VM 서비스 계정에 할당되어 있는지 확인해야 합니다. 튜토리얼에서 Data Boost를 사용하는 경우 Data Boost IAM 권한을 참조하세요.

Spanner에서 데이터 읽기

Scala 및 Python을 사용하여 Spark 데이터 소스 API를 통해 Spanner의 데이터를 Spark Dataframe에서 읽을 수 있습니다.

Scala

  1. 코드를 검사하고 [projectId], [instanceId], [databaseId], [table] 자리표시자를 이전에 만든 프로젝트 ID, 인스턴스 ID, 데이터베이스 ID, 테이블로 바꿉니다. enableDataBoost 옵션에서 기본 Spanner 인스턴스에 거의 영향을 주지 않는 Spanner Data Boost 기능을 사용 설정합니다.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
    
    
  2. 클러스터에서 코드 실행
    1. SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
      1. Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 후 클러스터 이름을 클릭합니다.
        Cloud 콘솔의 Dataproc 클러스터 페이지
      2. >클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 다음 클러스터 마스터 노드 이름 오른쪽에 있는 SSH를 클릭합니다.
        Cloud 콘솔의 Dataproc 클러스터 세부정보 페이지

        마스터 노드의 홈 디렉터리에서 브라우저 창이 열립니다.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 사전 설치된 vi, vim, nano 텍스트 편집기로 singers.scala을 만든 다음 Scala 코드 목록에서 Scala 코드에 붙여넣습니다.
      nano singers.scala
        
    3. spark-shell REPL을 실행합니다.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. :load singers.scala 명령어로 singers.scala를 실행하여 Spanner Singers 테이블을 만듭니다. 출력 목록에는 Singers 출력의 예시가 표시됩니다.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. 코드를 검사하고 [projectId], [instanceId], [databaseId], [table] 자리표시자를 이전에 만든 프로젝트 ID, 인스턴스 ID, 데이터베이스 ID, 테이블로 바꿉니다. enableDataBoost 옵션에서 기본 Spanner 인스턴스에 거의 영향을 주지 않는 Spanner Data Boost 기능을 사용 설정합니다.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
    
    
  2. 클러스터에서 코드 실행
    1. SSH를 사용하여 Dataproc 클러스터 마스터 노드에 연결합니다.
      1. Google Cloud 콘솔에서 Dataproc 클러스터 페이지로 이동한 후 클러스터 이름을 클릭합니다.
        Cloud 콘솔의 클러스터 페이지
      2. 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 다음 클러스터 마스터 노드 이름 오른쪽에 있는 SSH를 클릭합니다.
        Cloud 콘솔의 클러스터 세부정보 페이지에 있는 클러스터 이름 행에서 SSH를 선택합니다.

        기본 노드의 홈 디렉터리에서 브라우저 창이 열립니다.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 사전 설치된 vi , vim, nano 텍스트 편집기로 singers.py을 만든 다음 PySpark 코드 목록에서 PySpark 코드를 붙여넣습니다.
      nano singers.py
      
    3. spark-submit으로 singers.py를 실행하여 Spanner Singers 테이블을 만듭니다.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      출력은 다음과 같습니다.
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

삭제

이 둘러보기에서 생성한 리소스에 대한 비용이 Google Cloud 계정에 청구되지 않게 하려면 다음 단계를 수행합니다.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

자연어 처리와 디코더-인코더,