Dataproc 및 Apache Spark를 사용하는 Monte Carlo 방식


DataprocApache Spark는 자바, Python 또는 Scala로 작성된 Monte Carlo 시뮬레이션을 실행하는 데 사용할 수 있는 인프라와 용량을 제공합니다.

Monte Carlo 방법은 비즈니스, 엔지니어링, 과학, 수학, 기타 분야에서 광범위한 질문에 답하는 데 도움이 됩니다. Monte Carlo 시뮬레이션은 반복되는 무작위 샘플링을 사용하여 변수의 확률 분포를 작성함으로써 다른 방식으로는 답을 얻을 수 없는 질문에 답을 제공할 수 있습니다. 예를 들어 금융 분야에서 주식 옵션의 가격을 책정하려면 시간의 경과에 따라 주식 가격이 달라지는 수천 가지 방식을 분석해야 합니다. Monte Carlo 방법은 광범위한 예상 결과를 기반으로 이러한 주가 변동을 시뮬레이션하는 방법을 제공하는 동시에 문제의 가능한 입력값 영역을 통제할 수 있습니다.

과거에는 수천 번의 시뮬레이션을 실행하는 데 오랜 시간이 걸리고 많은 비용이 발생했습니다. 하지만 이제 Dataproc을 사용하면 필요에 따라 용량을 프로비저닝하고 분 단위로 비용을 지불할 수 있습니다. 또한 Apache Spark를 사용하면 수십, 수백 또는 수천 개의 서버 클러스터를 사용하여 직관적이고 필요에 맞게 확장 가능한 방식으로 시뮬레이션을 실행할 수 있습니다. 즉, 더 많은 시뮬레이션을 보다 신속하게 실행할 수 있으므로 비즈니스를 더욱 빠르게 혁신하고 위험을 보다 효과적으로 관리할 수 있습니다.

금융 데이터를 다룰 때는 항상 보안이 중요합니다. Dataproc은 Google Cloud에서 실행되며 여러 가지 방식으로 데이터를 안전하게 보호하고 비공개로 유지합니다. 예를 들어 모든 데이터는 전송 및 미사용 시 암호화되며 Google Cloud는 ISO 27001, SOC3, PCI 규정을 준수합니다.

목표

  • 사전 설치된 Apache Spark로 관리형 Dataproc 클러스터를 만듭니다.
  • Python을 사용하여 시간 경과에 따라 주식 포트폴리오 성장을 예측하는 Monte Carlo 시뮬레이션을 실행합니다.
  • Scala를 사용하여 카지노에서 돈을 버는 방법을 시뮬레이션하는 Monte Carlo 시뮬레이션을 실행합니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

  • Google Cloud 프로젝트 설정
    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

    4. Enable the Dataproc and Compute Engine APIs.

      Enable the APIs

    5. Install the Google Cloud CLI.
    6. To initialize the gcloud CLI, run the following command:

      gcloud init
    7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    8. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

    9. Enable the Dataproc and Compute Engine APIs.

      Enable the APIs

    10. Install the Google Cloud CLI.
    11. To initialize the gcloud CLI, run the following command:

      gcloud init

Dataproc 클러스터 만들기

단계에 따라 Google Cloud Console에서 Dataproc 클러스터를 만듭니다. 이 가이드에는 워커 노드 두 개가 포함된 기본 클러스터 설정이면 충분합니다.

경고 로깅 사용 중지

기본적으로 Apache Spark는 콘솔 창에 자세한 로깅을 표시합니다. 이 가이드에서는 오류만 로깅하도록 로깅 수준을 변경합니다. 다음 단계를 따르세요.

ssh를 사용하여 Dataproc 클러스터의 기본 노드에 연결

Dataproc 클러스터의 기본 노드는 VM 이름에 -m 서픽스가 있습니다.

  1. Google Cloud 콘솔에서 VM 인스턴스 페이지로 이동합니다.

    VM 인스턴스로 이동

  2. 가상 머신 인스턴스 목록에서 연결할 인스턴스 행의 SSH를 클릭합니다.

    인스턴스 이름 옆에 있는 SSH 버튼

기본 노드에 연결된 SSH 창이 열립니다.

Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$

로깅 설정 변경

  1. 기본 노드의 홈 디렉터리에서 /etc/spark/conf/log4j.properties를 수정합니다.

    sudo nano /etc/spark/conf/log4j.properties
    
  2. log4j.rootCategoryERROR로 설정합니다.

    # Set only errors to be logged to the console
    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
  3. 변경사항을 저장하고 편집기를 종료합니다. 상세 로깅을 다시 사용하도록 설정하려면 .rootCategory의 값을 원래의 (INFO) 값으로 복원하여 변경사항을 되돌립니다.

Spark 프로그래밍 언어

Spark는 Python, Scala, 자바를 독립 실행형 애플리케이션의 프로그래밍 언어로 지원하고, Python 및 Scala를 위한 대화형 인터프리터를 제공합니다. 개인 선호도에 따라 언어를 선택할 수 있습니다. 이 가이드에서는 대화형 인터프리터를 사용합니다. 코드를 변경하고 다양한 입력 값을 사용한 후 결과를 보는 방식으로 실험할 수 있기 때문입니다.

포트폴리오 성장 추정

금융 분야에서 Monte Carlo 방법은 때때로 투자 성과가 어떻게 될지 예측해보는 시뮬레이션을 실행하는 데 사용됩니다. Monte Carlo 시뮬레이션은 가능성이 있는 일정 범위의 시장 조건에 대해 무작위로 결과 표본을 산출함으로써 평균적인 또는 최악의 시나리오에서 포트폴리오 성과가 어떻게 나올 수 있는지에 대한 답을 제공합니다.

Monte Carlo 방법을 사용하여 몇 가지 일반적인 시장 요인을 기반으로 금융 투자의 성장을 예측하는 시뮬레이션을 작성하려면 다음 단계를 따르세요.

  1. Dataproc 기본 노드에서 Python 인터프리터를 시작합니다.

    pyspark
    

    Spark 프롬프트 >>>을 기다립니다.

  2. 다음 코드를 입력합니다. 함수 정의에서 들여쓰기를 유지해야 합니다.

    import random
    import time
    from operator import add
    
    def grow(seed):
        random.seed(seed)
        portfolio_value = INVESTMENT_INIT
        for i in range(TERM):
            growth = random.normalvariate(MKT_AVG_RETURN, MKT_STD_DEV)
            portfolio_value += portfolio_value * growth + INVESTMENT_ANN
        return portfolio_value
    
  3. Spark 프롬프트가 다시 나타날 때까지 return을 누릅니다.

    앞의 코드는 주식 시장에 투자하는 은퇴 계좌를 보유한 투자자가 이 계좌에 매년 자금을 추가하는 경우에 발생 가능한 결과를 모델링하는 함수를 정의합니다. 이 함수는 지정한 기간 동안 매년 무작위의 투자 수익을 백분율로 생성합니다. 이 함수는 시드 값을 매개변수로 사용합니다. 이 값은 난수 생성기의 시드 값을 새로 설정하는 데 사용되며 그러면 난수 생성기가 실행될 때마다 이 함수가 동일한 난수 목록을 받지 않습니다. random.normalvariate 함수는 지정된 평균 및 표준 편차의 정규 분포 범위에서 난수가 골고루 발생하도록 합니다. 이 함수는 포트폴리오 값을 양수 또는 음수의 증감액만큼 늘리고 추가 투자를 나타내는 연간 합계를 더합니다.

    다음 단계에서 필요한 상수를 정의합니다.

  4. 함수에 대입할 시드를 많이 만듭니다. Spark 프롬프트에서 다음 코드를 입력하면 시드 10,000개가 생성됩니다.

    seeds = sc.parallelize([time.time() + i for i in range(10000)])
    

    parallelize 연산 결과는 탄력적 분산 데이터 세트(RDD)로, 이는 병렬 처리에 최적화된 요소 모음입니다. 이 경우 RDD에는 현재 시스템 시간을 기반으로 하는 시드가 포함됩니다.

    RDD를 만들 때 Spark는 사용 가능한 작업자 및 코어의 수에 따라 데이터를 분할합니다. 이 경우 Spark는 각 코어에 하나씩 총 8개의 슬라이스를 사용하기로 선택합니다. 데이터 항목이 10,000개인 이 시뮬레이션에서는 문제가 없습니다. 더 큰 시뮬레이션의 경우 각 슬라이스가 기본 제한보다 클 수 있습니다. 이 경우 두 번째 매개변수를 parallelize로 지정하면 슬라이스 수가 증가하여 각 슬라이스의 크기를 관리하기 쉽게 유지하는 데 도움이 되는 한편 Spark는 여전히 8개의 코어를 모두 활용할 수 있습니다.

  5. 시드가 포함된 RDD를 증가 함수에 대입합니다.

    results = seeds.map(grow)
    

    map 메서드는 RDD의 각 시드를 grow 함수에 전달하고 각 결과를 새로운 RDD에 추가합니다. 결과는 results에 저장됩니다. 변환을 수행하는 이 연산은 결과를 바로 생성하지 않습니다. Spark는 결과가 필요할 때까지 이 작업을 수행하지 않습니다. 이러한 지연 계산(lazy evaluation)으로 인해 상수를 정의하지 않고 코드를 입력할 수 있습니다.

  6. 함수에 사용할 값을 지정합니다.

    INVESTMENT_INIT = 100000  # starting amount
    INVESTMENT_ANN = 10000  # yearly new investment
    TERM = 30  # number of years
    MKT_AVG_RETURN = 0.11 # percentage
    MKT_STD_DEV = 0.18  # standard deviation
    
  7. reduce를 호출하여 RDD 값을 집계합니다. 다음 코드를 입력하여 RDD 결과를 합산합니다.

    sum = results.reduce(add)
    
  8. 평균 수익을 추정하고 표시합니다.

    print (sum / 10000.)
    

    끝에 점(.) 문자를 포함시켜야 합니다. 이 점은 부동 소수점 연산을 나타냅니다.

  9. 이제 가정을 변경하고 결과가 어떻게 변하는지 확인합니다. 예를 들면 시장의 평균 수익에 새 값을 입력할 수 있습니다.

    MKT_AVG_RETURN = 0.07
    
  10. 시뮬레이션을 다시 실행합니다.

    print (sc.parallelize([time.time() + i for i in range(10000)]) \
            .map(grow).reduce(add)/10000.)
    
  11. 실험을 마치면 CTRL+D 키를 눌러 Python 인터프리터를 종료합니다.

Scala에서 Monte Carlo 시뮬레이션 프로그래밍하기

몬테카를로는 도박의 도시로 유명합니다. 이 섹션에서는 Scala를 사용하여 확률 게임에서 카지노가 누리는 수학적 이점을 모델링한 시뮬레이션을 만듭니다. 실제 카지노에서 '승률'은 게임마다 크게 다릅니다. 예를 들어 키노에서는 승률이 20% 이상일 수 있습니다. 이 가이드에서는 하우스 어드밴티지가 1%에 불과한 간단한 게임을 만듭니다. 게임 방식은 다음과 같습니다.

  • 플레이어가 자금에서 칩을 여러 개 떼어 베팅을 합니다.
  • 플레이어가 100면으로 된 주사위를 굴립니다.
  • 굴린 결과가 1~49 사이의 숫자이면 플레이어가 이깁니다.
  • 결과가 50~100 사이이면 플레이어가 베팅에서 집니다.

이 게임에서는 플레이어의 승률이 1% 더 낮습니다. 주사위를 굴릴 때마다 100번 중에 51번의 가능성으로 플레이어가 집니다.

게임을 만들고 실행하려면 다음 단계를 따르세요.

  1. Dataproc 기본 노드에서 Scala 인터프리터를 시작합니다.

    spark-shell
    
  2. 다음 코드를 복사하고 붙여넣어 게임을 만듭니다. 들여쓰기와 관련하여 Scala는 Python과 같은 요구사항이 없으므로 이 코드를 복사하여 scala> 프롬프트에 붙여넣기만 하면 됩니다.

    val STARTING_FUND = 10
    val STAKE = 1   // the amount of the bet
    val NUMBER_OF_GAMES = 25
    
    def rollDie: Int = {
        val r = scala.util.Random
        r.nextInt(99) + 1
    }
    
    def playGame(stake: Int): (Int) = {
        val faceValue = rollDie
        if (faceValue < 50)
            (2*stake)
        else
            (0)
    }
    
    // Function to play the game multiple times
    // Returns the final fund amount
    def playSession(
       startingFund: Int = STARTING_FUND,
       stake: Int = STAKE,
       numberOfGames: Int = NUMBER_OF_GAMES):
       (Int) = {
    
        // Initialize values
        var (currentFund, currentStake, currentGame) = (startingFund, 0, 1)
    
        // Keep playing until number of games is reached or funds run out
        while (currentGame <= numberOfGames && currentFund > 0) {
    
            // Set the current bet and deduct it from the fund
            currentStake = math.min(stake, currentFund)
            currentFund -= currentStake
    
            // Play the game
            val (winnings) = playGame(currentStake)
    
            // Add any winnings
            currentFund += winnings
    
            // Increment the loop counter
            currentGame += 1
        }
        (currentFund)
    }
    
  3. scala> 프롬프트가 표시될 때까지 return을 누릅니다.

  4. 다음 코드를 입력하여 게임을 25번 플레이합니다. 25는 NUMBER_OF_GAMES의 기본값입니다.

    playSession()
    

    자금은 10단위로 시작했습니다. 이제 더 늘어났습니까 아니면 줄어들었습니까?

  5. 이제 게임당 100개의 칩을 베팅하는 10,000명의 플레이어를 시뮬레이션합니다. 한 세션에서 게임을 10,000회 플레이합니다. 이 Monte Carlo 시뮬레이션에서는 세션이 끝나기 전에 모든 돈을 잃을 확률을 계산합니다. 다음 코드를 입력합니다.

    (sc.parallelize(1 to 10000, 500)
      .map(i => playSession(100000, 100, 250000))
      .map(i => if (i == 0) 1 else 0)
      .reduce(_+_)/10000.0)
    

    구문 .reduce(_+_)는 Scala에서 합계 함수를 사용하여 집계하는 데 사용되는 축약형입니다. Python 예시에 나온 .reduce(add) 구문과 기능적으로 동일합니다.

    앞의 코드는 다음 단계를 수행합니다.

    • 세션을 플레이한 결과로 RDD를 만듭니다.
    • 파산한 플레이어의 결과를 숫자 1로 대체하고 0이 아닌 결과는 숫자 0으로 대체합니다.
    • 파산한 플레이어의 수를 합산합니다.
    • 이 숫자를 플레이어 수로 나눕니다.

    일반적인 결과는 다음과 같습니다.

    0.998
    

    카지노의 우위가 1%에 불과한데도 이 값은 모든 돈을 잃을 것이 거의 확실함을 나타냅니다.

삭제

프로젝트 삭제

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

다음 단계

  • Dataproc - 작업 제출에서 ssh를 사용하여 클러스터에 연결하지 않고 Spark 작업을 Cloud Dataproc에 제출하는 방법 알아보기