使用 Dataproc 和 Apache Spark 实现蒙特卡罗方法


DataprocApache Spark 提供可用于运行蒙特卡罗的基础架构和容量 使用 Java、Python 或 Scala 编写的模拟。

蒙特卡罗方法有助于解答商业、工程、科学、数学和其他领域的各种问题。蒙特卡罗模拟通过使用重复随机采样为变量创建概率分布,从而能够为一些无法用其他方法解答的问题提供答案。例如,在金融领域,为股票期权定价需要分析股票价格随时间变化的数千种方式。蒙特卡罗方法可以根据大量可能的结果模拟股票价格的这些变化,同时控制相关问题可以接受的输入域。

在过去,运行数千次模拟可能需要很长时间并且费用高昂。借助 Dataproc,您可以按需预配容量并按分钟付费。Apache Spark 允许您使用数十、数百或数千台服务器的集群,以直观的方式运行模拟,并按需扩缩。这意味着以更快的速度运行更多模拟,从而有助于您的企业加速创新并更好地管理风险。

处理金融数据时,安全性至关重要。Dataproc 在 Google Cloud 上运行,这有助于使用多种方式来确保数据的安全性和私密性。例如,所有传输中的数据和静态数据均会加密,并且 Google Cloud 符合 ISO 27001、SOC3 和 PCI 标准

目标

  • 创建托管式 Dataproc 集群(预安装 Apache Spark)。
  • 使用 Python 运行一个蒙特卡罗模拟来预估股票投资组合随时间的增长情况。
  • 使用 Scala 运行一个蒙特卡罗模拟,以模拟赌场如何盈利。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  • 设置 Google Cloud 项目
    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Make sure that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc and Compute Engine APIs.

      Enable the APIs

    5. Install the Google Cloud CLI.
    6. To initialize the gcloud CLI, run the following command:

      gcloud init
    7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    8. Make sure that billing is enabled for your Google Cloud project.

    9. Enable the Dataproc and Compute Engine APIs.

      Enable the APIs

    10. Install the Google Cloud CLI.
    11. To initialize the gcloud CLI, run the following command:

      gcloud init

创建 Dataproc 集群

按照步骤在 Google Cloud 控制台中创建 Dataproc 集群。包含两个工作器节点的默认集群设置足以满足本教程的需求。

停用针对警告的日志记录

默认情况下,Apache Spark 会在控制台窗口中显示详细的日志记录。在按本教程操作时,请将日志记录级别更改为仅记录错误。请按照以下步骤操作:

使用 ssh 连接到 Dataproc 集群的主节点

Dataproc 集群的主节点在其虚拟机名称中带有 -m 后缀。

  1. In the Google Cloud console, go to the VM instances page.

    Go to VM instances

  2. In the list of virtual machine instances, click SSH in the row of the instance that you want to connect to.

    SSH button next to instance name.

将打开与主节点连接的 SSH 窗口。

Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$

更改日志记录设置

  1. 在主节点的主目录中,请修改 /etc/spark/conf/log4j.properties

    sudo nano /etc/spark/conf/log4j.properties
    
  2. log4j.rootCategory 设置为 ERROR

    # Set only errors to be logged to the console
    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
  3. 保存更改并退出编辑器。如果要再次启用详细日志记录,请通过将 .rootCategory 的值恢复为原始 (INFO) 值来撤消更改。

Spark 编程语言

Spark 支持 Python、Scala 和 Java 作为独立应用的编程语言,并为 Python 和 Scala 提供交互式解释器。您可以根据个人偏好选择编程语言。本教程使用交互式解释器,因为您可以通过它们更改代码、尝试不同的输入值,然后查看结果,从而方便地开展实验。

预估投资组合增长情况

在金融领域,蒙特卡罗方法有时用于运行预测投资表现的模拟。通过在一系列可能的市场条件下生成随机的结果样本,蒙特卡罗模拟可以解答关于投资组合平均表现和最坏表现的问题。

按照以下步骤创建一个模拟,该模拟使用蒙特卡罗方法尝试基于一些常见的市场因素来预估金融投资的增长情况。

  1. 从 Dataproc 主节点启动 Python 解释器。

    pyspark
    

    等待 Spark 提示符 >>> 出现。

  2. 输入以下代码。请务必保留函数定义中的缩进格式。

    import random
    import time
    from operator import add
    
    def grow(seed):
        random.seed(seed)
        portfolio_value = INVESTMENT_INIT
        for i in range(TERM):
            growth = random.normalvariate(MKT_AVG_RETURN, MKT_STD_DEV)
            portfolio_value += portfolio_value * growth + INVESTMENT_ANN
        return portfolio_value
    
  3. return 直到再次看到 Spark 提示符。

    前述代码定义了一个函数,该函数模拟当投资者拥有一个投资于股票市场的现有退休账户,并且每年向其追加资金时可能发生的情况。该函数每年在指定期限内以百分比形式生成随机的投资回报,它将种子值作为参数。此值用于重新设定随机数生成器,这可确保函数在每次运行时不会获得相同的随机数列表。random.normalvariate 函数可确保 值出现在 正态分布 计算给定均值和标准差的值。该函数通过增长量(可能是正数或负数)增加投资组合的价值,并增加表示未来投资的年度总和。

    您将在下一步中定义所需的常量。

  4. 创建许多种子以供给该函数。在 Spark 提示符处输入以下代码,即可生成 10000 个种子:

    seeds = sc.parallelize([time.time() + i for i in range(10000)])
    

    parallelize 操作的结果是生成弹性分布式数据集 (RDD),这是一个针对并行处理进行优化的元素集合。在本例中,RDD 包含基于当前系统时间的种子。

    创建 RDD 时,Spark 会根据可用的工作器和核心数对数据进行分片。在本例中,Spark 选择使用八个分片,每个核心一个分片。这一分片比例足以完成这个包含 10000 个数据项的模拟。对于较大的模拟,每个分片可能大于默认上限。在这种情况下,指定 parallelize 的第二个参数可以增加分片数量,这有助于让每个分片的大小易于管理,而 Spark 仍能充分利用所有八个核心。

  5. 将包含种子的 RDD 传递给增长函数。

    results = seeds.map(grow)
    

    map 方法将 RDD 中的每个种子传递给 grow 函数,并将每个结果附加到存储在 results 中的新 RDD。请注意,这个执行转换的操作不会立即生成结果。除非您需要结果,否则 Spark 不会执行此操作。这种“惰性求值”就是您可以输入代码而不定义常量的原因。

  6. 为该函数指定一些值。

    INVESTMENT_INIT = 100000  # starting amount
    INVESTMENT_ANN = 10000  # yearly new investment
    TERM = 30  # number of years
    MKT_AVG_RETURN = 0.11 # percentage
    MKT_STD_DEV = 0.18  # standard deviation
    
  7. 调用 reduce 以聚合 RDD 中的值。输入以下代码以对 RDD 中的结果求和:

    sum = results.reduce(add)
    
  8. 估算并显示平均回报:

    print (sum / 10000.)
    

    请务必在末尾添加点 (.) 字符。它表示浮点运算。

  9. 现在改变一个假设条件,看看结果如何变化。例如,您可以为市场的平均回报输入一个新值:

    MKT_AVG_RETURN = 0.07
    
  10. 再次运行模拟。

    print (sc.parallelize([time.time() + i for i in range(10000)]) \
            .map(grow).reduce(add)/10000.)
    
  11. 完成实验后,按 CTRL+D 退出 Python 解释器。

用 Scala 编写蒙特卡罗模拟

众所周知,蒙特卡罗其实是一个热门的赌博模拟工具。在本部分中,您将使用 Scala 创建一个模拟,模拟赌场在博彩游戏中享有的数学优势。真实赌场的“庄家优势”因赌局而异;例如,对于基诺,这个比例高于 20%。本教程创建了一个简单的赌局,其中庄家只有百分之一的优势。以下是赌局规则:

  • 玩家从投注资金中拿出多个筹码下注。
  • 玩家掷出一个 100 面的骰子(那该有多酷?)。
  • 如果掷骰的结果是 1 到 49 之间的数字,玩家赢得赌注。
  • 如果掷骰结果在 50 到 100 之间,则玩家输掉赌注。

您可以看到,这个赌局为玩家制造了百分之一的劣势,也就是说,在每次投掷的 100 个可能结果中,有 51 个会使玩家输掉赌局。

按照以下步骤创建并运行赌局:

  1. 从 Dataproc 主节点启动 Scala 解释器。

    spark-shell
    
  2. 复制并粘贴以下代码以创建赌局。在缩进方面,Scala 与 Python 要求不同,因此您只需在 scala> 提示符处复制并粘贴此代码即可。

    val STARTING_FUND = 10
    val STAKE = 1   // the amount of the bet
    val NUMBER_OF_GAMES = 25
    
    def rollDie: Int = {
        val r = scala.util.Random
        r.nextInt(99) + 1
    }
    
    def playGame(stake: Int): (Int) = {
        val faceValue = rollDie
        if (faceValue < 50)
            (2*stake)
        else
            (0)
    }
    
    // Function to play the game multiple times
    // Returns the final fund amount
    def playSession(
       startingFund: Int = STARTING_FUND,
       stake: Int = STAKE,
       numberOfGames: Int = NUMBER_OF_GAMES):
       (Int) = {
    
        // Initialize values
        var (currentFund, currentStake, currentGame) = (startingFund, 0, 1)
    
        // Keep playing until number of games is reached or funds run out
        while (currentGame <= numberOfGames && currentFund > 0) {
    
            // Set the current bet and deduct it from the fund
            currentStake = math.min(stake, currentFund)
            currentFund -= currentStake
    
            // Play the game
            val (winnings) = playGame(currentStake)
    
            // Add any winnings
            currentFund += winnings
    
            // Increment the loop counter
            currentGame += 1
        }
        (currentFund)
    }
    
  3. return,直到看到 scala> 提示符。

  4. 输入以下代码,开 25 场赌局,这是 NUMBER_OF_GAMES 的默认值。

    playSession()
    

    开始时您的投注资金的值为 10 个单位。现在是变高了还是变低了?

  5. 现在模拟 10000 个玩家每场下注 100 个筹码。在一个会话中开 10000 场赌局。此蒙特卡罗模拟将计算在会话结束前输光所有资金的概率。输入以下代码:

    (sc.parallelize(1 to 10000, 500)
      .map(i => playSession(100000, 100, 250000))
      .map(i => if (i == 0) 1 else 0)
      .reduce(_+_)/10000.0)
    

    请注意,语法 .reduce(_+_) 是 Scala 中的简写,其使用求和函数进行合并。它的功能等同于您在 Python 示例中看到的 .reduce(add) 语法。

    上述代码执行以下步骤:

    • 使用赌局会话的结果创建 RDD。
    • 将破产玩家的结果替换为数字 1,将非零结果替换为数字 0
    • 为破产玩家数量求和。
    • 将总和除以玩家数量。

    典型的结果可能为:

    0.998
    

    即使赌场只有百分之一的优势,这也几乎可以保证您会输光所有的钱。

清理

删除项目

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

后续步骤

  • 如需详细了解如何向 Dataproc 提交 Spark 作业,而不必使用 ssh 连接到集群,请参阅提交作业