採用 Dataproc 和 Apache Spark 的蒙地卡羅方法


DataprocApache Spark 的基礎架構與容量可讓您執行以 Java、Python 或 Scala 編寫的 Monte Carlo 模擬。

Monte Carlo 方法有助於回答商業、工程、科學、數學和其他領域的許多問題。Monte Carlo 模擬透過重複的隨機取樣,為變數建立機率分佈,來回答其他方法無法解決的問題。例如在金融領域,如要為股票選擇權訂出價格,將需要分析股票價格隨著時間可能變化的數千種方式。Monte Carlo 方法提供了一種方法,針對多種可能的結果模擬股票價格的變化,同時針對問題的可能輸入領域保持控制權。

過去執行數千次模擬需要很長的時間,產生高昂的費用。Dataproc 可讓您根據需求佈建容量,並以分鐘為單位計費。Apache Spark 則可讓您使用由數十部、數百部甚至是數千部伺服器組成的叢集,以直覺的方式執行模擬,並能根據您的需求進行擴充。這表示您可以更快地執行更多模擬,讓您的企業加快創新的腳步,並且更妥善地管理風險。

在處理財務資料時,安全性始終是重要的考量。Dataproc 在 Google Cloud上執行,可透過數種方法協助您保障資料的安全性和隱私性。舉例來說,所有資料在傳輸過程中,以及在靜態存放時都會經過加密, Google Cloud 也符合 ISO 27001、SOC3 和 PCI 標準

目標

  • 建立代管 Dataproc 叢集 (需預先安裝 Apache Spark)。
  • 使用 Python 執行 Monte Carlo 模擬,預估股票組合隨著時間的成長情形。
  • 使用 Scala 執行 Monte Carlo 模擬,模擬賭場的賺錢之道。

費用

In this document, you use the following billable components of Google Cloud:

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

事前準備

  • 設定 Google Cloud 專案
    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Make sure that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc and Compute Engine APIs.

      Enable the APIs

    5. Install the Google Cloud CLI.

    6. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

    7. To initialize the gcloud CLI, run the following command:

      gcloud init
    8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    9. Make sure that billing is enabled for your Google Cloud project.

    10. Enable the Dataproc and Compute Engine APIs.

      Enable the APIs

    11. Install the Google Cloud CLI.

    12. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

    13. To initialize the gcloud CLI, run the following command:

      gcloud init

建立 Dataproc 叢集

請按照相關步驟,從 Google Cloud 控制台建立 Dataproc 叢集。預設的叢集設定包含兩個工作站節點,能滿足本教學課程的需求。

停用警告的記錄功能

根據預設,Apache Spark 會在主控台視窗中輸出詳細記錄。基於本教學課程的目的,請將記錄層級變更為僅記錄錯誤。步驟如下:

使用 ssh 連線至 Dataproc 叢集的主要節點

Dataproc 叢集的主要節點 VM 名稱會加上 -m 尾碼。

  1. In the Google Cloud console, go to the VM instances page.

    Go to VM instances

  2. In the list of virtual machine instances, click SSH in the row of the instance that you want to connect to.

    SSH button next to instance name.

系統會開啟 SSH 視窗,並連線至主要節點。

Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$

變更記錄設定

  1. 從主要節點的主目錄編輯 /etc/spark/conf/log4j.properties

    sudo nano /etc/spark/conf/log4j.properties
    
  2. log4j.rootCategory 設為等於 ERROR

    # Set only errors to be logged to the console
    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
  3. 儲存變更並結束編輯器。如果您要再次啟用詳細記錄,只需將 .rootCategory 的值還原成原來的 (INFO) 值,即可取消變更。

Spark 程式設計語言

Spark 在獨立應用程式支援 Python、Scala 和 Java 程式設計語言,並為 Python 和 Scala 提供互動式直譯器。您可以依據個人喜好選擇語言。本教學課程使用互動式直譯器,因為您可以透過變更程式碼、嘗試不同的輸入值,然後查看結果的方式進行實驗。

預估投資組合的成長情形

在金融領域,Monte Carlo 方法有時會用來執行模擬,試著預測投資績效。針對一系列可能的市場條件,Monte Carlo 模擬會產生隨機的結果樣本,來回答「投資組合在一般或最壞情況下的預期績效」的問題。

請按照以下步驟建立一個使用 Monte Carlo 方法的模擬,試著根據一些常見的市場因素來預估金融投資的成長情形。

  1. 從 Dataproc 主要節點啟動 Python 直譯器。

    pyspark
    

    等待 Spark 提示 >>>

  2. 輸入以下程式碼。請務必保持函式定義中的縮排。

    import random
    import time
    from operator import add
    
    def grow(seed):
        random.seed(seed)
        portfolio_value = INVESTMENT_INIT
        for i in range(TERM):
            growth = random.normalvariate(MKT_AVG_RETURN, MKT_STD_DEV)
            portfolio_value += portfolio_value * growth + INVESTMENT_ANN
        return portfolio_value
    
  3. 按下 return 鍵,直到您再次看到 Spark 提示。

    上述程式碼定義了一個函式,針對當投資者有一個用來投資股市的退休帳戶,且每年都會投入額外資金到這個帳戶時,可能發生的情形建立模型。該函式會產生隨機的投資報酬率,在指定期間內每年以百分比表示。該函式會將一個初始值當做參數,用來重新植入隨機號碼產生器,以確保該函式不會在每次執行時取得相同的隨機號碼清單。random.normalvariate 函式可確保在指定平均值和標準差的常態分佈中出現隨機值。該函式會根據成長金額 (可能是正數或負數) 來增加投資組合的值,並加入代表進一步投資的年度總和。

    您將在接下來的步驟中定義所需的常數。

  4. 建立許多初始值以動態饋給該函式。當 Spark 提示時,輸入以下程式碼。此程式碼會產生 10,000 個初始值:

    seeds = sc.parallelize([time.time() + i for i in range(10000)])
    

    parallelize 作業的結果是一個彈性分散式資料集 (RDD)。這是最適合平行處理的元素集合。在這個範例中,彈性分散式資料集 (RDD) 包含以目前的系統時間為基礎的初始值。

    建立 RDD 時,Spark 會根據可用的工作站和核心數量分割資料。在這個範例中,Spark 選擇使用八個配量,每個核心一個配量。這對這個包含 10,000 個資料項目的模擬來說沒有什麼問題,但對較大的模擬來說,可能會使每個配量大於預設的限制。在這種情形下,指定用來進行 parallelize 的第二個參數可以增加配量的數量,能讓每個配量的大小在可管理的範圍之內,同時讓 Spark 利用所有核心。

  5. 將包含初始值的彈性分散式資料集 (RDD) 動態饋給成長函式。

    results = seeds.map(grow)
    

    map 方法會將彈性分散式資料集 (RDD) 中的每個初始值傳送至 grow 函式,並將每個結果附加至儲存在 results 中的新彈性分散式資料集 (RDD)。請注意,這項執行「轉換」的作業,不會立即產生作業結果。Spark 只會在您需要結果時,才會執行這項工作。這種「惰性求值」是您可以輸入程式碼,而無需定義常數的原因。

  6. 為函式指定一些值。

    INVESTMENT_INIT = 100000  # starting amount
    INVESTMENT_ANN = 10000  # yearly new investment
    TERM = 30  # number of years
    MKT_AVG_RETURN = 0.11 # percentage
    MKT_STD_DEV = 0.18  # standard deviation
    
  7. 呼叫 reduce 以匯總 RDD 中的值。請輸入以下程式碼來加總 RDD 中的結果:

    sum = results.reduce(add)
    
  8. 預估並顯示平均報酬:

    print (sum / 10000.)
    

    請務必在結尾處加上點 (.) 字元,以表示浮點運算。

  9. 改變一項假設,看看結果會如何變化。例如,您可以針對市場的平均報酬輸入新的值:

    MKT_AVG_RETURN = 0.07
    
  10. 再次執行模擬。

    print (sc.parallelize([time.time() + i for i in range(10000)]) \
            .map(grow).reduce(add)/10000.)
    
  11. 實驗結束後,請按下 CTRL+D 鍵以結束 Python 直譯器。

使用 Scala 進行 Monte Carlo 模擬的程式設計

Monte Carlo 以賭場而聞名。在本節中,您將使用 Scala 來建立模擬,針對賭場在博弈遊戲中享有的數學優勢建立模型。真實賭場的「賭場優勢」會因遊戲而有極大的差異,例如在基諾 (keno) 遊戲中,賭場優勢可能會超過 20%。本教學課程會建立一個簡單的遊戲,其中賭場僅擁有 1% 的優勢。以下是這個簡單的遊戲的規則:

  • 玩家下注,包括來自資金的一些籌碼。
  • 玩家擲出一顆有 100 個面的骰子 (很酷吧?)。
  • 如果擲骰的結果是一個 1 到 49 的數字,玩家就贏了。
  • 如果結果是 50 到 100,則玩家會輸掉賭注。

您可以看到這個遊戲讓玩家有百分之一的劣勢:在每次擲骰子的 100 個可能結果中,有 51 個結果是玩家輸。

請按照以下步驟建立並執行這個遊戲:

  1. 從 Dataproc 主要節點啟動 Scala 直譯器。

    spark-shell
    
  2. 複製並貼上以下程式碼以建立遊戲。Scala 不需要和 Python 一樣注意縮排,因此您只需在 scala> 提示複製貼上此程式碼即可。

    val STARTING_FUND = 10
    val STAKE = 1   // the amount of the bet
    val NUMBER_OF_GAMES = 25
    
    def rollDie: Int = {
        val r = scala.util.Random
        r.nextInt(99) + 1
    }
    
    def playGame(stake: Int): (Int) = {
        val faceValue = rollDie
        if (faceValue < 50)
            (2*stake)
        else
            (0)
    }
    
    // Function to play the game multiple times
    // Returns the final fund amount
    def playSession(
       startingFund: Int = STARTING_FUND,
       stake: Int = STAKE,
       numberOfGames: Int = NUMBER_OF_GAMES):
       (Int) = {
    
        // Initialize values
        var (currentFund, currentStake, currentGame) = (startingFund, 0, 1)
    
        // Keep playing until number of games is reached or funds run out
        while (currentGame <= numberOfGames && currentFund > 0) {
    
            // Set the current bet and deduct it from the fund
            currentStake = math.min(stake, currentFund)
            currentFund -= currentStake
    
            // Play the game
            val (winnings) = playGame(currentStake)
    
            // Add any winnings
            currentFund += winnings
    
            // Increment the loop counter
            currentGame += 1
        }
        (currentFund)
    }
    
  3. 按下 return 鍵,直到畫面上顯示 scala> 提示。

  4. 輸入以下程式碼,該遊戲將玩 25 次 (這是 NUMBER_OF_GAMES 的預設值)。

    playSession()
    

    開始時,您的資金值為 10 個單位。現在更高還是更低了?

  5. 現在模擬有 10,000 個玩家,每個玩家每場下注 100 個籌碼。在一個工作階段中玩 10,000 次遊戲。這個 Monte Carlo 模擬會計算您在工作階段結束之前,輸掉所有錢的機率。請輸入以下程式碼:

    (sc.parallelize(1 to 10000, 500)
      .map(i => playSession(100000, 100, 250000))
      .map(i => if (i == 0) 1 else 0)
      .reduce(_+_)/10000.0)
    

    請注意,.reduce(_+_) 語法是 Scala 中使用加總函式進行匯總的簡寫,它的功能等於您在 Python 範例中看到的 .reduce(add) 語法。

    上述程式碼會執行下列步驟:

    • 使用遊戲工作階段的結果建立 RDD。
    • 用數字 1 取代破產玩家的結果,用數字 0 取代非零結果。
    • 加總破產玩家的人數。
    • 將該數字除以所有玩家的人數。

    典型的結果可能是:

    0.998
    

    這代表即使賭場只有百分之一的優勢,您還是會輸掉所有的錢。

清除所用資源

刪除專案

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

後續步驟

  • 如要進一步瞭解如何將 Spark 工作提交至 Dataproc,而不必使用 ssh 連線至叢集,請參閱「Dataproc - 提交工作