使用 Cloud Dataproc 和 Apache Spark 实现蒙特卡罗方法

Cloud DataprocApache Spark 为您提供了可用于运行以 Java、Python 或 Scala 编写的蒙特卡罗模拟的基础架构和容量。

蒙特卡罗方法有助于解答商业、工程、科学、数学和其他领域的各种问题。蒙特卡罗模拟通过使用重复随机采样为变量创建概率分布,从而能够为一些无法用其他方法解答的问题提供答案。例如,在金融领域,为股票期权定价需要分析股票价格随时间变化的数千种方式。蒙特卡罗方法可以根据大量可能的结果模拟股票价格的这些变化,同时控制相关问题可以接受的输入域。

在过去,运行数千次模拟可能需要很长时间并且费用高昂。借助 Cloud Dataproc,您可以按需预配容量并按分钟付费。Apache Spark 允许您使用数十、数百或数千台服务器的集群,以直观的方式运行模拟,并按需扩缩。这意味着您能够以更快的速度运行更多模拟,从而帮助您的企业加速创新并更好地管理风险。

处理金融数据时,安全性至关重要。Cloud Dataproc 在 Google Cloud Platform (GCP) 上运行,能够以多种方式帮助您确保数据的安全性、可靠性和私密性。例如,所有传输中的数据和静态数据均会加密,并且 GCP 符合 ISO 27001、SOC3 和 PCI 标准

目标

  • 创建托管式 Cloud Dataproc 集群(预安装 Apache Spark)。
  • 使用 Python 运行一个蒙特卡罗模拟,以预估股票投资组合随时间的增长情况。
  • 使用 Scala 运行一个蒙特卡罗模拟,以模拟赌场如何盈利。

费用

本教程使用 Google Cloud Platform 的以下收费组件:

您可以使用价格计算器根据您的预计使用情况来估算费用。 GCP 新用户可能有资格申请免费试用

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

创建 Cloud Dataproc 集群

按照步骤从 Google Cloud Platform Console 创建 Cloud Dataproc 集群。包含两个工作器节点的默认集群设置足以满足本教程的需求。

停用针对警告的日志记录

默认情况下,Apache Spark 会在控制台窗口中显示详细的日志记录。在按本教程操作时,请将日志记录级别更改为仅记录错误。请执行以下操作:

使用 ssh 连接到 Cloud Dataproc 集群的主节点

  1. 在 GCP Console 中,转到“虚拟机实例”页面。

    转到“虚拟机实例”页面

  2. 在虚拟机实例列表中,点击要连接到的实例行中的 SSH

此时会打开一个浏览器窗口并显示主节点上的主目录。

Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$

更改日志记录设置

  1. 在主节点的主目录中,请修改 /etc/spark/conf/log4j.properties

    sudo nano /etc/spark/conf/log4j.properties
    
  2. log4j.rootCategory 设置为 ERROR

    # Set only errors to be logged to the console
    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
  3. 保存更改并退出编辑器。如果要再次启用详细日志记录,请通过将 .rootCategory 的值恢复为其原始 (INFO) 值来撤消更改。

Spark 编程语言

Spark 支持 Python、Scala 和 Java 作为独立应用的编程语言,并为 Python 和 Scala 提供交互式解释器。您可以根据个人偏好选择编程语言。本教程使用交互式解释器,因为您可以通过它们更改代码、尝试不同的输入值,然后查看结果,从而方便地开展实验。

预估投资组合增长情况

在金融领域,蒙特卡罗方法有时用于运行预测投资表现的模拟。通过在一系列可能的市场条件下生成随机的结果样本,蒙特卡罗模拟可以解答关于投资组合平均表现和最坏表现的问题。

按照以下步骤创建一个模拟,该模拟使用蒙特卡罗方法尝试基于一些常见的市场因素来预估金融投资的增长情况。

  1. 从 Cloud Dataproc 主节点启动 Python 解释器。

    pyspark
    

    等待 Spark 提示符 >>> 出现。

  2. 输入以下代码。请务必保留函数定义中的缩进格式。

    import random
    import time
    from operator import add
    
    def grow(seed):
        random.seed(seed)
        portfolio_value = INVESTMENT_INIT
        for i in range(TERM):
            growth = random.normalvariate(MKT_AVG_RETURN, MKT_STD_DEV)
            portfolio_value += portfolio_value * growth + INVESTMENT_ANN
        return portfolio_value
    
  3. return 直到再次看到 Spark 提示符。

    前述代码定义了一个函数,该函数模拟当投资者拥有一个投资于股票市场的现有退休账户,并且每年向其追加资金时可能发生的情况。该函数每年在指定期限内以百分比形式生成随机的投资回报,它将种子值作为参数。此值用于重新设定随机数生成器,这可确保函数在每次运行时不会获得相同的随机数列表。random.normalvariate 函数确保在指定均值和标准差的正态分布中出现随机值。该函数通过增长量(可能是正数或负数)增加投资组合的价值,并增加表示未来投资的年度总和。

    您将在下一步中定义所需的常量。

  4. 创建许多种子以供给该函数。在 Spark 提示符处输入以下代码,即可生成 10000 个种子:

    seeds = sc.parallelize([time.time() + i for i in xrange(10000)])
    

    parallelize 操作的结果是生成弹性分布式数据集 (RDD),这是一个针对并行处理进行优化的元素集合。在本例中,RDD 包含基于当前系统时间的种子。

    创建 RDD 时,Spark 会根据可用的工作器和核心数对数据进行分片。在本例中,Spark 选择使用八个分片,每个核心一个分片。这一分片比例足以完成这个包含 10000 个数据项的模拟。对于较大的模拟,每个分片可能大于默认上限。在这种情况下,指定 parallelize 的第二个参数可以增加分片数量,这有助于让每个分片的大小易于管理,而 Spark 仍能充分利用所有八个核心。

  5. 将包含种子的 RDD 传递给增长函数。

    results = seeds.map(grow)
    

    map 方法将 RDD 中的每个种子传递给 grow 函数,并将每个结果附加到存储在 results 中的新 RDD。请注意,这个执行转换的操作不会立即生成结果。除非您需要结果,否则 Spark 不会执行此操作。这种“惰性求值”就是您可以输入代码而不定义常量的原因。

  6. 为该函数指定一些值。

    INVESTMENT_INIT = 100000  # starting amount
    INVESTMENT_ANN = 10000  # yearly new investment
    TERM = 30  # number of years
    MKT_AVG_RETURN = 0.11 # percentage
    MKT_STD_DEV = 0.18  # standard deviation
    
  7. 调用 reduce 以聚合 RDD 中的值。输入以下代码以对 RDD 中的结果求和:

    sum = results.reduce(add)
    
  8. 估算并显示平均回报:

    print sum / 10000.
    

    请务必在末尾包含点 (.) 字符。它表示浮点运算。

  9. 现在改变一个假设条件,看看结果如何变化。例如,您可以为市场的平均回报输入一个新值:

    MKT_AVG_RETURN = 0.07
    
  10. 再次运行模拟。

    print sc.parallelize([time.time() + i for i in xrange(10000)]) \
            .map(grow).reduce(add)/10000.
    
  11. 完成实验后,按 CTRL+D 退出 Python 解释器。

用 Scala 编写蒙特卡罗模拟

众所周知,蒙特卡罗其实是一个热门的赌博模拟工具。在本部分中,您将使用 Scala 创建一个模拟,模拟赌场在博彩游戏中享有的数学优势。真实赌场的“庄家优势”因赌局而异;例如,对于基诺,这个比例高于 20%。本教程创建了一个简单的赌局,其中庄家只有百分之一的优势。以下是赌局规则:

  • 玩家从投注资金中拿出多个筹码下注。
  • 玩家掷出一个 100 面的骰子(那该有多酷?)。
  • 如果掷骰的结果是 1 到 49 之间的数字,玩家赢得赌注。
  • 如果掷骰结果在 50 到 100 之间,则玩家输掉赌注。

您可以看到,这个赌局为玩家制造了百分之一的劣势,也就是说,在每次投掷的 100 个可能结果中,有 51 个会使玩家输掉赌局。

按照以下步骤创建并运行赌局:

  1. 从 Cloud Dataproc 主节点启动 Scala 解释器。

    spark-shell
    
  2. 复制并粘贴以下代码以创建赌局。在缩进方面,Scala 与 Python 要求不同,因此您只需在 scala> 提示符处复制并粘贴此代码即可。

    val STARTING_FUND = 10
    val STAKE = 1   // the amount of the bet
    val NUMBER_OF_GAMES = 25
    
    def rollDie: Int = {
        val r = scala.util.Random
        r.nextInt(99) + 1
    }
    
    def playGame(stake: Int): (Int) = {
        val faceValue = rollDie
        if (faceValue < 50)
            (2*stake)
        else
            (0)
    }
    
    // Function to play the game multiple times
    // Returns the final fund amount
    def playSession(
       startingFund: Int = STARTING_FUND,
       stake: Int = STAKE,
       numberOfGames: Int = NUMBER_OF_GAMES):
       (Int) = {
    
        // Initialize values
        var (currentFund, currentStake, currentGame) = (startingFund, 0, 1)
    
        // Keep playing until number of games is reached or funds run out
        while (currentGame <= numberOfGames && currentFund > 0) {
    
            // Set the current bet and deduct it from the fund
            currentStake = math.min(stake, currentFund)
            currentFund -= currentStake
    
            // Play the game
            val (winnings) = playGame(currentStake)
    
            // Add any winnings
            currentFund += winnings
    
            // Increment the loop counter
            currentGame += 1
        }
        (currentFund)
    }
    
  3. return,直到看到 scala> 提示符。

  4. 输入以下代码,开 25 场赌局,这是 NUMBER_OF_GAMES 的默认值。

    playSession()
    

    开始时您的投注资金的值为 10 个单位。现在是变高了还是变低了?

  5. 现在模拟 10000 个玩家每场下注 100 个筹码。在一个会话中开 10000 场赌局。此蒙特卡罗模拟将计算在会话结束前输光所有资金的概率。输入以下代码:

    (sc.parallelize(1 to 10000, 500)
      .map(i => playSession(100000, 100, 250000))
      .map(i => if (i == 0) 1 else 0)
      .reduce(_+_)/10000.0)
    

    请注意,语法 .reduce(_+_) 是 Scala 中的简写,其使用求和函数进行合并。它的功能等同于您在 Python 示例中看到的 .reduce(add) 语法。

    上述代码执行以下步骤:

    • 使用赌局会话的结果创建 RDD。
    • 将破产玩家的结果替换为数字 1,将非零结果替换为数字 0
    • 为破产玩家数量求和。
    • 将总和除以玩家数量。

    典型的结果可能为:

    0.998
    

    即使赌场只有百分之一的优势,这也几乎可以保证您会输光所有的钱。

清理

为避免因本教程中使用的资源而导致您的 Google Cloud Platform 帐号产生费用,请执行以下操作:

删除项目

  1. 在 GCP Console 中,转到“项目”页面。

    转到“项目”页面

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤

  • 如需详细了解如何向 Cloud Dataproc 提交 Spark 作业,而不必使用 ssh 连接到集群,请参阅 Cloud Dataproc - 提交作业

  • 亲自试用其他 Google Cloud Platform 功能。查阅我们的教程

此页内容是否有用?请给出您的反馈和评价:

发送以下问题的反馈:

此网页
Solutions