Google Cloud Dataproc 및 Apache Spark를 사용하는 Monte Carlo 방식

Cloud DataprocApache Spark는 자바, Python 또는 Scala로 작성된 Monte Carlo 시뮬레이션을 실행하는 데 사용할 수 있는 인프라와 용량을 제공합니다.

Monte Carlo 방법은 비즈니스, 엔지니어링, 과학, 수학, 기타 분야에서 광범위한 질문에 답하는 데 도움이 됩니다. Monte Carlo 시뮬레이션은 반복되는 무작위 샘플링을 사용하여 변수의 확률 분포를 작성함으로써 다른 방식으로는 답을 얻을 수 없는 질문에 답을 제공할 수 있습니다. 예를 들어 금융 분야에서 주식 옵션의 가격을 책정하려면 시간의 경과에 따라 주식 가격이 달라지는 수천 가지 방식을 분석해야 합니다. Monte Carlo 방법은 광범위한 예상 결과를 기반으로 이러한 주가 변동을 시뮬레이션하는 방법을 제공하는 동시에 문제의 가능한 입력값 영역을 통제할 수 있습니다.

과거에는 수천 번의 시뮬레이션을 실행하는 데 오랜 시간이 걸리고 많은 비용이 발생했습니다. 하지만 이제 Cloud Dataproc을 사용하면 필요에 따라 용량을 프로비저닝하고 분 단위로 비용을 지불할 수 있습니다. 또한 Apache Spark를 사용하면 수십, 수백 또는 수천 개의 서버 클러스터를 활용하여 직관적이고 필요에 맞게 확장 가능한 방식으로 시뮬레이션을 실행할 수 있습니다. 즉, 더 많은 시뮬레이션을 보다 신속하게 실행할 수 있으므로 비즈니스 혁신의 속도를 높이고 위험을 보다 효과적으로 관리할 수 있습니다.

재무 데이터를 다룰 때는 항상 보안이 중요합니다. Cloud Dataproc은 GCP에서 실행되므로 데이터를 안전한 비공개 상태로 유지하는 데 여러 가지 면에서 도움이 됩니다. 예를 들어 모든 데이터는 전송 중에 그리고 사용되지 않을 때도 암호화되며, Google Cloud Platform은 ISO 27001, SOC3, FINRA, PCI 규정을 준수합니다.

목표

  • 관리되는 Cloud Dataproc 클러스터를 만듭니다(Apache Spark가 사전 설치됨).
  • Python을 사용하여 시간 경과에 따라 주식 포트폴리오의 성장을 예측하는 Monte Carlo 시뮬레이션을 실행합니다.
  • Scala를 사용하여 카지노에서 돈을 버는 방법을 시뮬레이션하는 Monte Carlo 시뮬레이션을 실행합니다.

기본 요건

  • 프로젝트 설정

    1. Google 계정에 로그인합니다.

      아직 계정이 없으면 새 계정을 등록하세요.

    2. Google Cloud Platform 프로젝트를 선택하거나 만듭니다.

      리소스 관리 페이지로 이동

    3. Google Cloud Platform 프로젝트에 결제가 사용 설정되어 있는지 확인하세요.

      결제 사용 설정 방법 알아보기

    4. Cloud Dataproc, Compute Engine APIs를 사용 설정합니다.

      APIs 사용 설정

    5. Cloud SDK 설치 및 초기화.

  • 프로젝트에 Cloud Storage 버킷 생성

    1. GCP Console에서 Cloud Storage 브라우저로 이동합니다.

      Cloud Storage 브라우저로 이동

    2. 버킷 만들기를 클릭합니다.
    3. 버킷 만들기 대화상자에서 다음 속성을 지정합니다.
    4. 만들기를 클릭합니다.

Cloud Dataproc 클러스터 만들기

다음 단계에 따라 Google Cloud Platform 콘솔에서 Cloud Dataproc 클러스터를 만드세요. 이 가이드에는 2개의 작업자 노드를 포함하는 기본 클러스터 설정이면 충분합니다.

경고 로깅 사용 중지

기본적으로 Apache Spark는 콘솔 창에 자세한 로깅을 표시합니다. 이 가이드를 따를 때는 오류만 로깅하도록 로깅 수준을 변경하는 게 더 편합니다. 방법은 다음과 같습니다.

Cloud Dataproc 클러스터의 마스터 노드에 SSH로 연결

마스터 노드의 홈 디렉토리에서 브라우저 창이 열립니다.

Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$

로깅 설정 변경

/etc/spark/conf/log4j.properties를 편집합니다.

sudo nano /etc/spark/conf/log4j.properties

log4j.rootCategoryERROR로 설정합니다.

# Set only errors to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

변경사항을 저장하고 편집기를 종료합니다. 상세한 로깅을 다시 사용하도록 설정하려면 .rootCategory의 값을 원래의 (INFO) 값으로 복원하여 변경사항을 되돌립니다.

Spark 프로그래밍 언어

Spark는 Python, Scala, 자바를 독립 실행형 애플리케이션의 프로그래밍 언어로 지원하고, Python 및 Scala를 위한 대화형 인터프리터를 제공합니다. 언어는 개인적 선호에 따라 선택할 수 있습니다. 이 가이드에서는 대화형 인터프리터를 사용합니다. 코드를 변경하고 다른 입력 값을 시도한 다음 결과를 보는 방식으로 빠르고 쉽게 테스트할 수 있기 때문입니다.

포트폴리오 성장 추정

금융 분야에서 Monte Carlo 방법은 때때로 투자 성과가 어떻게 될지 예측해보는 시뮬레이션을 실행하는 데 사용됩니다. Monte Carlo 시뮬레이션은 가능성이 있는 일정 범위의 시장 조건에 대해 무작위로 결과 표본을 산출함으로써 평균적인 또는 최악의 시나리오에서 포트폴리오 성과가 어떻게 나올 수 있는지에 대한 답을 제공합니다.

Monte Carlo 방법을 사용하여 몇 가지 일반적인 시장 요인을 기반으로 금융 투자의 성장을 예측하는 시뮬레이션을 작성하려면 다음 단계를 따르세요.

  1. Cloud Dataproc 마스터 노드에서 Python 인터프리터를 시작합니다.

    pyspark
    

    Spark 프롬프트를 기다립니다(>>>).

  2. 다음 코드를 입력합니다. Python으로 프로그래밍할 때는 항상 함수 정의에서 들여쓰기를 유지해야 합니다.

    >>> import random
    >>> import time
    >>> from operator import add
    
    >>> def grow(seed):
            random.seed(seed)
            portfolio_value = INVESTMENT_INIT
            for i in range(TERM):
                growth = random.normalvariate(MKT_AVG_RETURN, MKT_STD_DEV)
                portfolio_value += portfolio_value * growth + INVESTMENT_ANN
            return portfolio_value
    
  3. Spark 프롬프트가 다시 나타날 때까지 Return 키를 누릅니다.

    앞의 코드는 주식 시장에 투자하는 은퇴 계좌를 보유한 투자자가 이 계좌에 매년 자금을 추가하는 경우에 발생 가능한 결과를 모델링하는 함수를 정의합니다. 이 함수는 지정한 기간 동안 매년 무작위의 투자 수익을 백분율로 생성합니다. 이 함수는 시드 값을 매개변수로 사용합니다. 이 값은 난수 생성기의 시드 값을 새로 설정하는 데 사용되며 그러면 난수 생성기가 실행될 때마다 이 함수가 동일한 난수 목록을 받지 않습니다. random.normalvariate 함수는 난수가 지정한 평균 및 표준 편차의 정규 분포 범위에서 골고루 발생하도록 합니다. 이 함수는 포트폴리오의 값을 양수 또는 음수의 증감액만큼 늘리고, 추가 투자를 나타내는 연간 합계를 더합니다.

    다음 단계에서 필요한 상수를 정의합니다.

  4. 다음 단계는 함수에 대입할 많은 수의 시드를 만드는 것입니다. Spark 프롬프트에서 다음 코드를 입력하면 10,000개의 시드가 생성됩니다.

    >>> seeds = sc.parallelize([time.time() + i for i in xrange(10000)])
    

    parallelize 연산의 결과는 탄력적 분산 데이터세트(RDD)로, 이는 병렬 처리에 최적화된 요소의 집합입니다. 이 경우 RDD에는 현재 시스템 시간을 기반으로 하는 시드가 포함됩니다.

    RDD를 만들 때 Spark는 사용 가능한 작업자 및 코어의 수에 따라 데이터를 분할합니다. 이 경우 Spark는 각 코어에 하나씩 총 8개의 슬라이스를 사용하기로 선택합니다. 데이터 항목이 10,000개인 이 시뮬레이션에서는 문제가 없습니다. 더 큰 시뮬레이션의 경우 각 슬라이스가 기본 제한보다 클 수 있습니다. 이 경우 두 번째 매개 변수를 parallelize로 지정하면 슬라이스 수가 증가하여 각 슬라이스의 크기를 관리하기 쉽게 유지하는 데 도움이 되는 한편 Spark는 여전히 8개의 코어를 모두 활용할 수 있습니다.

  5. 시드가 포함된 RDD를 증가 함수에 대입합니다. 다음 코드를 입력합니다.

    >>> results = seeds.map(grow)
    

    map 메소드는 RDD의 각 시드를 grow 함수에 전달하고 각 결과를 새로운 RDD에 추가하며, 이 결과는 results에 저장됩니다. 변환을 수행하는 이 연산은 결과를 바로 생성하지 않습니다. Spark는 결과가 필요할 때까지 이 작업을 수행하지 않습니다. 이러한 지연 계산(lazy evaluation) 때문에 상수를 정의하지 않고 코드를 입력할 수 있습니다.

  6. 함수에 사용할 값을 지정합니다. 다음 코드를 입력합니다.

    >>> INVESTMENT_INIT = 100000  # starting amount
    >>> INVESTMENT_ANN = 10000  # yearly new investment
    >>> TERM = 30  # number of years
    >>> MKT_AVG_RETURN = 0.11 # percentage
    >>> MKT_STD_DEV = 0.18  # standard deviation
    
  7. reduce를 호출하여 RDD의 값을 집계합니다. 다음 코드를 입력하여 RDD의 결과를 합산합니다.

    >>>  sum = results.reduce(add)
    
  8. 다음 코드를 입력하여 평균 수익을 추정하고 표시합니다.

    >>> print sum / 10000.
    

    끝에 점(.) 문자를 포함해야 합니다. 이 점은 부동 소수점 연산을 나타냅니다.

  9. 이제 가정을 변경하고 결과가 어떻게 변하는지 확인합니다. 예를 들면 시장의 평균 수익에 새 값을 입력할 수 있습니다.

    >>> MKT_AVG_RETURN = 0.07
    
  10. 시뮬레이션을 다시 실행합니다. 다음 코드를 사용하여 모든 경우를 실행할 수 있습니다.

    >>> print sc.parallelize([time.time() + i for i in xrange(10000)]) \
                .map(grow).reduce(add)/10000.
    
  11. 실험을 마치면 CTRL+D를 눌러 Python 인터프리터를 종료합니다.

Scala에서 Monte Carlo 시뮬레이션 프로그래밍하기

몬테카를로는 도박의 도시로 유명합니다. 이 섹션에서는 Scala를 사용하여 카지노가 확률 게임에서 누리는 수학적 이점을 모델링하는 시뮬레이션을 만듭니다. 실제 카지노에서 '승률'은 게임마다 크게 다릅니다. 예를 들어 키노에서는 20%가 넘을 수 있습니다. 이 가이드에서는 하우스 어드밴티지가 1%에 불과한 간단한 게임을 만듭니다. 게임 방식은 다음과 같습니다.

  • 플레이어가 자금에서 칩을 여러 개 떼어 베팅을 합니다.
  • 플레이어가 100면으로 된 주사위를 굴립니다.
  • 결과가 1에서 49 사이의 숫자이면 플레이어가 베팅한 만큼 상금을 받습니다.
  • 결과가 50에서 100이면 플레이어가 베팅에서 집니다.

이 게임에서는 플레이어의 승률이 1% 더 낮습니다. 주사위를 굴릴 때마다 100번 중에 51번의 가능성으로 플레이어가 집니다.

게임을 만들고 실행하려면 다음 단계를 따르세요.

  1. Cloud Dataproc 마스터 노드에서 Scala 인터프리터를 시작합니다.

    spark-shell
    
  2. 다음 코드를 복사하고 붙여넣어 게임을 만듭니다. 들여쓰기와 관련하여 Scala는 Python과 같은 요구사항이 없으므로 이 코드를 복사하여 scala> 프롬프트에 붙여넣기만 하면 됩니다.

    val STARTING_FUND = 10
    val STAKE = 1   // the amount of the bet
    val NUMBER_OF_GAMES = 25
    
    def rollDie: Int = {
        val r = scala.util.Random
        r.nextInt(99) + 1
    }
    
    def playGame(stake: Int): (Int) = {
        val faceValue = rollDie
        if (faceValue < 50)
            (2*stake)
        else
            (0)
    }
    
    // Function to play the game multiple times
    // Returns the final fund amount
    def playSession(
       startingFund: Int = STARTING_FUND,
       stake: Int = STAKE,
       numberOfGames: Int = NUMBER_OF_GAMES):
       (Int) = {
    
        // Initialize values
        var (currentFund, currentStake, currentGame) = (startingFund, 0, 1)
    
        // Keep playing until number of games is reached or funds run out
        while (currentGame <= numberOfGames && currentFund > 0) {
    
            // Set the current bet and deduct it from the fund
            currentStake = math.min(stake, currentFund)
            currentFund -= currentStake
    
            // Play the game
            val (winnings) = playGame(currentStake)
    
            // Add any winnings
            currentFund += winnings
    
            // Increment the loop counter
            currentGame += 1
        }
        (currentFund)
    }
    
  3. scala> 프롬프트가 표시될 때까지 Return 키를 누릅니다.

  4. 다음 코드를 입력하여 게임을 25번 플레이합니다. 25는 NUMBER_OF_GAMES의 기본값입니다.

    playSession()
    

    자금은 10단위로 시작했습니다. 이제 더 늘어났습니까 아니면 줄어들었습니까?

  5. 이제 게임당 100개의 칩을 베팅하는 10,000명의 플레이어를 시뮬레이션합니다. 한 세션에서 10,000번의 게임을 플레이합니다. 이 Monte Carlo 시뮬레이션은 세션이 종료되기 전에 모든 자금을 잃을 확률을 계산합니다. 다음 코드를 입력합니다.

    (sc.parallelize(1 to 10000, 500)
      .map(i => playSession(100000, 100, 250000))
      .map(i => if (i == 0) 1 else 0)
      .reduce(_+_)/10000.0)
    

    구문 .reduce(_+_)는 Scala에서 합계 함수를 사용하여 집계하는 데 사용되는 축약형입니다. Python 예제에 나온 .reduce(add) 구문과 기능적으로 동일합니다.

    앞의 코드는 다음 단계를 수행합니다.

    • 세션을 플레이한 결과로 RDD를 만듭니다.
    • 파산한 플레이어의 결과를 숫자 1로 대체하고 0이 아닌 결과는 숫자 0으로 대체합니다.
    • 파산한 플레이어의 수를 합산합니다.
    • 이 숫자를 플레이어 수로 나눕니다.

일반적인 결과는 다음과 같습니다.

0.998

카지노의 우위가 1%에 불과한데도 이 값은 모든 돈을 잃을 것이 거의 확실함을 나타냅니다.

다음 단계

SSH를 사용하여 클러스터에 연결할 필요 없이 Spark 작업을 Cloud Dataproc 서비스에 제출하는 방법에 대한 정보는 Cloud Dataproc—작업 제출을 참조하세요.

  • 다른 Google Cloud Platform 기능을 직접 사용해 보세요. 가이드를 살펴보세요.
  • Google Cloud Platform 제품을 사용하여 엔드 투 엔드 솔루션을 구축하는 방법에 대해 알아보세요.
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...