Apache Spark로 Cloud Storage 커넥터 사용


이 튜토리얼은 Apache Spark에서 Cloud Storage 커넥터를 사용하는 예시 코드를 실행하는 방법을 보여줍니다.

목표

자바, Scala, Python에서 간단한 WordCount Spark 작업을 작성한 다음 Dataproc 클러스터에서 작업을 실행합니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Compute Engine
  • Dataproc
  • Cloud Storage

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

아래 단계에 따라 이 튜토리얼에서 코드 실행을 준비하세요.

  1. 프로젝트를 설정합니다. 필요한 경우 로컬 머신에 Dataproc, Compute Engine 및 Cloud Storage API가 사용 설정되고 Google Cloud CLI가 설치된 상태로 프로젝트를 설정합니다.

    1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
    2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

      프로젝트 선택기로 이동

    3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

    4. API Dataproc, Compute Engine, and Cloud Storage 사용 설정

      API 사용 설정

    5. 서비스 계정을 만듭니다.

      1. Google Cloud Console에서 서비스 계정 만들기 페이지로 이동합니다.

        서비스 계정 만들기로 이동
      2. 프로젝트를 선택합니다.
      3. 서비스 계정 이름 필드에 이름을 입력합니다. Google Cloud 콘솔은 이 이름을 기반으로 서비스 계정 ID 필드를 채웁니다.

        서비스 계정 설명 필드에 설명을 입력합니다. 예를 들면 Service account for quickstart입니다.

      4. 만들고 계속하기를 클릭합니다.
      5. 서비스 계정에 Project > Owner 역할을 부여합니다.

        역할을 부여하려면 역할 선택 목록을 찾은 후 Project > Owner을 선택합니다.

      6. 계속을 클릭합니다.
      7. 완료를 클릭하여 서비스 계정 만들기를 마칩니다.

        브라우저 창을 닫지 마세요. 다음 단계에서 사용합니다.

    6. 서비스 계정 키 만들기

      1. Google Cloud Console에서 만든 서비스 계정의 이메일 주소를 클릭합니다.
      2. 를 클릭합니다.
      3. 키 추가를 클릭한 후 새 키 만들기를 클릭합니다.
      4. 만들기를 클릭합니다. JSON 키 파일이 컴퓨터에 다운로드됩니다.
      5. 닫기를 클릭합니다.
    7. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 사용자 인증 정보가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로 새 세션을 열면 변수를 다시 설정합니다.

    8. Google Cloud CLI를 설치합니다.
    9. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

      gcloud init
    10. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

      프로젝트 선택기로 이동

    11. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

    12. API Dataproc, Compute Engine, and Cloud Storage 사용 설정

      API 사용 설정

    13. 서비스 계정을 만듭니다.

      1. Google Cloud Console에서 서비스 계정 만들기 페이지로 이동합니다.

        서비스 계정 만들기로 이동
      2. 프로젝트를 선택합니다.
      3. 서비스 계정 이름 필드에 이름을 입력합니다. Google Cloud 콘솔은 이 이름을 기반으로 서비스 계정 ID 필드를 채웁니다.

        서비스 계정 설명 필드에 설명을 입력합니다. 예를 들면 Service account for quickstart입니다.

      4. 만들고 계속하기를 클릭합니다.
      5. 서비스 계정에 Project > Owner 역할을 부여합니다.

        역할을 부여하려면 역할 선택 목록을 찾은 후 Project > Owner을 선택합니다.

      6. 계속을 클릭합니다.
      7. 완료를 클릭하여 서비스 계정 만들기를 마칩니다.

        브라우저 창을 닫지 마세요. 다음 단계에서 사용합니다.

    14. 서비스 계정 키 만들기

      1. Google Cloud Console에서 만든 서비스 계정의 이메일 주소를 클릭합니다.
      2. 를 클릭합니다.
      3. 키 추가를 클릭한 후 새 키 만들기를 클릭합니다.
      4. 만들기를 클릭합니다. JSON 키 파일이 컴퓨터에 다운로드됩니다.
      5. 닫기를 클릭합니다.
    15. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 사용자 인증 정보가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로 새 세션을 열면 변수를 다시 설정합니다.

    16. Google Cloud CLI를 설치합니다.
    17. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

      gcloud init

  2. Cloud Storage 버킷을 만듭니다. 튜토리얼 데이터를 보관하려면 Cloud Storage가 필요합니다. 사용할 준비가 되지 않은 경우 프로젝트에서 새 버킷을 만듭니다.

    1. Google Cloud Console에서 Cloud Storage 버킷 페이지로 이동합니다.

      버킷 페이지로 이동

    2. 버킷 만들기를 클릭합니다.
    3. 버킷 만들기 페이지에서 버킷 정보를 입력합니다. 다음 단계로 이동하려면 계속을 클릭합니다.
      • 버킷 이름 지정에서 버킷 이름 지정 요구사항을 충족하는 이름을 입력합니다.
      • 데이터를 저장할 위치 선택에서 다음을 수행합니다.
        • 위치 유형 옵션을 선택합니다.
        • 위치 옵션을 선택합니다.
      • 데이터의 기본 스토리지 클래스 선택에서 스토리지 클래스를 선택합니다.
      • 객체 액세스를 제어하는 방식 선택에서 액세스 제어 옵션을 선택합니다.
      • 고급 설정(선택사항)에서 암호화 방법, 보관 정책 또는 버킷 라벨을 지정합니다.
    4. 만들기를 클릭합니다.

  3. 로컬 환경 변수를 설정합니다. 로컬 머신에 환경 변수를 설정합니다. 이 튜토리얼에 사용할 Google Cloud 프로젝트 ID와 Cloud Storage 버킷의 이름을 설정합니다. 또한 기존 또는 새 Dataproc 클러스터의 이름과 리전을 입력합니다. 다음 단계에서 이 튜토리얼에서 사용할 클러스터를 만들 수 있습니다.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Dataproc 클러스터를 만듭니다. 아래 명령어를 실행하여 지정된 Compute Engine 영역에서 단일 노드 Dataproc 클러스터를 만듭니다.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Cloud Storage 버킷에 공개 데이터를 복사합니다. 공개 데이터 Shakespeare 텍스트 스니펫을 Cloud Storage 버킷의 input 폴더에 복사합니다.

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. 자바(Apache Maven), Scala(SBT), Python 개발 환경을 설정합니다.

Spark WordCount 작업 준비

아래의 탭을 선택하여 단계에 따라 클러스터에 제출할 작업 패키지 또는 파일을 준비합니다. 다음 작업 유형 중 하나를 준비할 수 있습니다.

자바

  1. pom.xml 파일을 로컬 머신에 복사합니다. 다음 pom.xml 파일은 Scala 및 Spark 라이브러리 종속 항목을 지정합니다. 이는 Dataproc 클러스터가 런타임 시 이러한 라이브러리를 제공함을 나타내기 위해 provided 범위를 제공합니다. 커넥터가 표준 HDFS 인터페이스를 구현하므로 pom.xml 파일은 Cloud Storage 종속 항목을 지정하지 않습니다. Spark 작업이 Cloud Storage 클러스터 파일(gs://로 시작하는 URI가 있는 파일)에 액세스할 때, 시스템이 자동으로 Cloud Storage 커넥터를 사용하여 Cloud Storage의 파일에 액세스합니다.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. 아래 나열된 WordCount.java 코드를 로컬 머신에 복사합니다.
    1. src/main/java/dataproc/codelab 경로를 사용하여 디렉터리 집합을 만듭니다.
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java를 로컬 머신의 src/main/java/dataproc/codelab에 복사합니다.
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java는 Cloud Storage에서 텍스트 파일을 읽고 단어 수를 계산한 후 텍스트 파일 결과를 Cloud Storage에 쓰는 자바의 간단한 Spark 작업입니다.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. 패키지를 빌드합니다.
    mvn clean package
    
    성공적으로 빌드되면 target/word-count-1.0.jar이 생성됩니다.
  4. 패키지를 Cloud Storage로 스테이징합니다.
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt 파일을 로컬 머신에 복사합니다. 다음 build.sbt 파일은 Scala 및 Spark 라이브러리 종속 항목을 지정합니다. 이는 Dataproc 클러스터가 런타임 시 이러한 라이브러리를 제공함을 나타내기 위해 provided 범위를 제공합니다. 커넥터가 표준 HDFS 인터페이스를 구현하므로 build.sbt 파일은 Cloud Storage 종속 항목을 지정하지 않습니다. Spark 작업이 Cloud Storage 클러스터 파일(gs://로 시작하는 URI가 있는 파일)에 액세스할 때, 시스템이 자동으로 Cloud Storage 커넥터를 사용하여 Cloud Storage의 파일에 액세스합니다.
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. word-count.scala를 로컬 머신에 복사합니다. 이는 Cloud Storage에서 텍스트 파일을 읽고 단어 수를 계산한 후 텍스트 파일 결과를Cloud Storage에 쓰는 자바의 간단한 Spark 작업입니다.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. 패키지를 빌드합니다.
    sbt clean package
    
    성공적으로 빌드되면 target/scala-2.11/word-count_2.11-1.0.jar이 생성됩니다.
  4. 패키지를 Cloud Storage로 스테이징합니다.
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py를 로컬 머신에 복사합니다. 이는 PySpark를 사용하여 Cloud Storage에서 텍스트 파일을 읽고 단어 수를 계산한 후 텍스트 파일 결과를 Cloud Storage에 쓰는 Python의 간단한 Spark 작업입니다.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

작업 제출

다음 gcloud 명령어를 실행하여 WordCount 작업을 Dataproc 클러스터에 제출합니다.

자바

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

출력 보기

작업이 완료되면 다음 gcloud CLI gsutil 명령어를 실행하여 WordCount 출력을 확인합니다.

gsutil cat gs://${BUCKET_NAME}/output/*

WordCount 출력은 다음과 비슷하게 표시됩니다.

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

삭제

튜토리얼을 완료한 후에는 만든 리소스를 삭제하여 할당량 사용을 중지하고 요금이 청구되지 않도록 할 수 있습니다. 다음 섹션은 이러한 리소스를 삭제하거나 사용 중지하는 방법을 설명합니다.

프로젝트 삭제

비용이 청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 프로젝트를 삭제하는 것입니다.

프로젝트를 삭제하는 방법은 다음과 같습니다.

  1. Google Cloud 콘솔에서 리소스 관리 페이지로 이동합니다.

    리소스 관리로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.

Dataproc 클러스터 삭제

프로젝트를 삭제하는 대신 프로젝트 내의 클러스터만 삭제할 수 있습니다.

Cloud Storage 버킷 삭제

Google Cloud Console

  1. Google Cloud 콘솔에서 Cloud Storage 버킷 페이지로 이동합니다.

    버킷으로 이동

  2. 삭제할 버킷의 체크박스를 클릭합니다.
  3. 버킷을 삭제하려면 삭제를 클릭한 후 안내를 따르세요.

명령줄

    버킷을 삭제합니다.
    gcloud storage buckets delete BUCKET_NAME

다음 단계