使用 Cloud Dataproc 和 Apache Spark 的 Monte Carlo 方法

Cloud DataprocApache Spark 的基礎架構與容量可讓您執行以 Java、Python 或 Scala 編寫的 Monte Carlo 模擬。

Monte Carlo 方法有助於回答商業、工程、科學、數學和其他領域的許多問題。Monte Carlo 模擬透過重複的隨機取樣,為變數建立機率分佈,來回答其他方法無法解決的問題。例如在金融領域,如要為股票選擇權訂出價格,將需要分析股票價格隨著時間可能變化的數千種方式。Monte Carlo 方法提供了一種方法,針對多種可能的結果模擬股票價格的變化,同時針對問題的可能輸入領域保持控制權。

過去執行數千次模擬需要很長的時間,產生高昂的費用。Cloud Dataproc 可讓您根據需求佈建容量,並以分鐘為單位計費。Apache Spark 則可讓您使用由數十部、數百部甚至是數千部伺服器組成的叢集,以直覺的方式執行模擬,並能根據您的需求進行擴充。這表示您可以更快地執行更多模擬,讓您的企業加快創新的腳步,並且更妥善地管理風險。

在處理財務資料時,安全性始終是相當重要的一環。Cloud Dataproc 在 Google Cloud Platform (GCP) 上執行,可透過數種方法協助您保障資料的安全性和隱私性。舉例來說,所有資料在傳輸過程中,以及在靜態存放時都已進行加密,且 GCP 符合 ISO 27001、SOC3 和 PCI 的標準

目標

  • 建立代管 Cloud Dataproc 叢集 (需預先安裝 Apache Spark)。
  • 使用 Python 執行 Monte Carlo 模擬,預估股票組合隨著時間的成長情形。
  • 使用 Scala 執行 Monte Carlo 模擬,模擬賭場的賺錢之道。

費用

本教學課程使用下列 Google Cloud Platform 計費元件:

您可以使用 Pricing Calculator,根據您的預測使用量來產生預估費用。 初次使用 GCP 的使用者可能符合申請免費試用的資格。

完成此教學課程後,您可刪除已建立的資源以免繼續計費。詳情請參閱清除所用資源一節。

事前準備

建立 Cloud Dataproc 叢集

請按照相關步驟,從 Google Cloud Platform 主控台建立 Cloud Dataproc 叢集。預設的叢集設定包含兩個工作站節點,能滿足本教學課程的需求。

停用警告的記錄功能

根據預設,Apache Spark 會在主控台視窗中輸出詳細記錄。基於本教學課程的目的,請將記錄層級變更為僅記錄錯誤。步驟如下:

使用 ssh 連線至 Cloud Dataproc 叢集的主要節點

  1. 在 GCP 主控台中,前往「VM Instances」(VM 執行個體) 頁面。

    前往「VM Instances」(VM 執行個體) 頁面

  2. 在虛擬機器執行個體清單中,找到您要建立連線的執行個體,然後在該列中按一下 [SSH]

瀏覽器視窗會開啟主要節點上的主目錄。

Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$

變更記錄設定

  1. 從主要節點的主目錄,編輯 /etc/spark/conf/log4j.properties

    sudo nano /etc/spark/conf/log4j.properties
    
  2. log4j.rootCategory 設為「等於 ERROR」。

    # Set only errors to be logged to the console
    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
  3. 儲存變更並結束編輯器。如果您要再次啟用詳細記錄,只需將 .rootCategory 的值還原成原來的 (INFO) 值,即可取消變更。

Spark 程式設計語言

Spark 在獨立應用程式支援 Python、Scala 和 Java 程式設計語言,並為 Python 和 Scala 提供互動式直譯器。您可以依據個人喜好選擇語言。本教學課程使用互動式直譯器,因為您可以透過變更程式碼、嘗試不同的輸入值,然後查看結果的方式進行實驗。

預估投資組合的成長情形

在金融領域,Monte Carlo 方法有時會用來執行模擬,試著預測投資績效。針對一系列可能的市場條件,Monte Carlo 模擬會產生隨機的結果樣本,來回答「投資組合在一般或最壞情況下的預期績效」的問題。

請按照以下步驟建立一個使用 Monte Carlo 方法的模擬,試著根據一些常見的市場因素來預估金融投資的成長情形。

  1. 從 Cloud Dataproc 主要節點啟動 Python 直譯器。

    pyspark
    

    等待 Spark 提示 >>>

  2. 輸入以下程式碼。請務必保持函式定義中的縮排。

    import random
    import time
    from operator import add
    
    def grow(seed):
        random.seed(seed)
        portfolio_value = INVESTMENT_INIT
        for i in range(TERM):
            growth = random.normalvariate(MKT_AVG_RETURN, MKT_STD_DEV)
            portfolio_value += portfolio_value * growth + INVESTMENT_ANN
        return portfolio_value
    
  3. 按下 return 鍵,直到您再次看到 Spark 提示。

    上述程式碼定義了一個函式,針對當投資者有一個用來投資股市的退休帳戶,且每年都會投入額外資金到這個帳戶時,可能發生的情形建立模型。該函式會產生隨機的投資報酬率,在指定期間內每年以百分比表示。該函式會將一個初始值當做參數,用來重新植入隨機號碼產生器,以確保該函式不會在每次執行時取得相同的隨機號碼清單。random.normalvariate 函式可確保在指定平均值和標準差的常態分佈中出現隨機值。該函式會根據成長金額 (可能是正數或負數) 來增加投資組合的值,並加入代表進一步投資的年度總和。

    您將在接下來的步驟中定義所需的常數。

  4. 建立許多初始值以動態饋給該函式。當 Spark 提示時,輸入以下程式碼。此程式碼會產生 10,000 個初始值:

    seeds = sc.parallelize([time.time() + i for i in xrange(10000)])
    

    parallelize 作業的結果是一個彈性分散式資料集 (RDD)。這是最適合平行處理的元素集合。在這個範例中,彈性分散式資料集 (RDD) 包含以目前的系統時間為基礎的初始值。

    建立 RDD 時,Spark 會根據可用的工作站和核心數量分割資料。在這個範例中,Spark 選擇使用八個配量,每個核心一個配量。這對這個包含 10,000 個資料項目的模擬來說沒有什麼問題,但對較大的模擬來說,可能會使每個配量大於預設的限制。在這種情形下,指定用來進行 parallelize 的第二個參數可以增加配量的數量,能讓每個配量的大小在可管理的範圍之內,同時讓 Spark 利用所有核心。

  5. 將包含初始值的彈性分散式資料集 (RDD) 動態饋給成長函式。

    results = seeds.map(grow)
    

    map 方法會將彈性分散式資料集 (RDD) 中的每個初始值傳送至 grow 函式,並將每個結果附加至儲存在 results 中的新彈性分散式資料集 (RDD)。請注意,這項執行「轉換」的作業,不會立即產生作業結果。Spark 只會在您需要結果時,才會執行這項工作。這種「惰性求值」是您可以輸入程式碼,而無需定義常數的原因。

  6. 為函式指定一些值。

    INVESTMENT_INIT = 100000  # starting amount
    INVESTMENT_ANN = 10000  # yearly new investment
    TERM = 30  # number of years
    MKT_AVG_RETURN = 0.11 # percentage
    MKT_STD_DEV = 0.18  # standard deviation
    
  7. 呼叫 reduce 以匯總彈性分散式資料集 (RDD) 中的值。請輸入以下程式碼來加總彈性分散式資料集 (RDD) 中的結果:

    sum = results.reduce(add)
    
  8. 預估並顯示平均報酬:

    print sum / 10000.
    

    請務必在結尾處加上點 (.) 字元,以表示浮點運算。

  9. 改變一項假設,看看結果會如何變化。例如,您可以針對市場的平均報酬輸入新的值:

    MKT_AVG_RETURN = 0.07
    
  10. 再次執行模擬。

    print sc.parallelize([time.time() + i for i in xrange(10000)]) \
            .map(grow).reduce(add)/10000.
    
  11. 實驗結束後,請按下 CTRL+D 鍵以結束 Python 直譯器。

使用 Scala 進行 Monte Carlo 模擬的程式設計

Monte Carlo 以賭場而聞名。在本節中,您將使用 Scala 來建立模擬,針對賭場在博弈遊戲中享有的數學優勢建立模型。真實賭場的「賭場優勢」會因遊戲而有極大的差異,例如在基諾 (keno) 遊戲中,賭場優勢可能會超過 20%。本教學課程會建立一個簡單的遊戲,其中賭場僅擁有 1% 的優勢。以下是這個簡單的遊戲的規則:

  • 玩家下注,包括來自資金的一些籌碼。
  • 玩家擲出一顆有 100 個面的骰子 (很酷吧?)。
  • 如果擲骰的結果是一個 1 到 49 的數字,玩家就贏了。
  • 如果結果是 50 到 100,則玩家會輸掉賭注。

您可以看到這個遊戲讓玩家有百分之一的劣勢:在每次擲骰子的 100 個可能結果中,有 51 個結果是玩家輸。

請按照以下步驟建立並執行這個遊戲:

  1. 從 Cloud Dataproc 主要節點啟動 Scala 直譯器。

    spark-shell
    
  2. 複製並貼上以下程式碼以建立遊戲。Scala 不需要和 Python 一樣注意縮排,因此您只需在 scala> 提示複製貼上此程式碼即可。

    val STARTING_FUND = 10
    val STAKE = 1   // the amount of the bet
    val NUMBER_OF_GAMES = 25
    
    def rollDie: Int = {
        val r = scala.util.Random
        r.nextInt(99) + 1
    }
    
    def playGame(stake: Int): (Int) = {
        val faceValue = rollDie
        if (faceValue < 50)
            (2*stake)
        else
            (0)
    }
    
    // Function to play the game multiple times
    // Returns the final fund amount
    def playSession(
       startingFund: Int = STARTING_FUND,
       stake: Int = STAKE,
       numberOfGames: Int = NUMBER_OF_GAMES):
       (Int) = {
    
        // Initialize values
        var (currentFund, currentStake, currentGame) = (startingFund, 0, 1)
    
        // Keep playing until number of games is reached or funds run out
        while (currentGame <= numberOfGames && currentFund > 0) {
    
            // Set the current bet and deduct it from the fund
            currentStake = math.min(stake, currentFund)
            currentFund -= currentStake
    
            // Play the game
            val (winnings) = playGame(currentStake)
    
            // Add any winnings
            currentFund += winnings
    
            // Increment the loop counter
            currentGame += 1
        }
        (currentFund)
    }
    
  3. 按下 return 鍵,直到您看到 scala> 提示。

  4. 輸入以下程式碼,該遊戲將玩 25 次 (這是 NUMBER_OF_GAMES 的預設值)。

    playSession()
    

    開始時,您的資金值為 10 個單位。現在更高還是更低了?

  5. 現在模擬有 10,000 個玩家,每個玩家每場下注 100 個籌碼。在一個工作階段中玩 10,000 次遊戲。這個 Monte Carlo 模擬會計算您在工作階段結束之前,輸掉所有錢的機率。請輸入以下程式碼:

    (sc.parallelize(1 to 10000, 500)
      .map(i => playSession(100000, 100, 250000))
      .map(i => if (i == 0) 1 else 0)
      .reduce(_+_)/10000.0)
    

    請注意,.reduce(_+_) 語法是 Scala 中使用加總函式進行匯總的簡寫,它的功能等於您在 Python 範例中看到的 .reduce(add) 語法。

    上述程式碼會執行下列步驟:

    • 使用遊戲工作階段的結果建立 RDD。
    • 用數字 1 取代破產玩家的結果,用數字 0 取代非零結果。
    • 加總破產玩家的人數。
    • 將該數字除以所有玩家的人數。

    典型的結果可能是:

    0.998
    

    這代表即使賭場只有百分之一的優勢,您還是會輸掉所有的錢。

清除所用資源

如何避免系統向您的 Google Cloud Platform 帳戶收取在本教學課程中使用資源的相關費用:

刪除專案

  1. 前往 GCP 主控台的「Projects」(專案) 頁面。

    前往專案頁面

  2. 在專案清單中選取要刪除的專案,然後按一下 [Delete] (刪除)
  3. 在對話方塊中輸入專案 ID,按一下 [Shut down] (關閉) 即可刪除專案。

後續步驟

  • 如要進一步瞭解如何將 Spark 工作提交至 Cloud Dataproc,而不必使用 ssh 連線至叢集,請參閱 Cloud Dataproc—提交工作的說明

  • 自行試用其他 Google Cloud Platform 功能。請參考我們的教學課程

本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁
解決方案