Dataproc と Apache Spark を使用した Monte Carlo メソッド


DataprocApache Spark を組み合わせると、Java、Python、Scala でプログラミングされた Monte Carlo シミュレーションを実行するために使用できる、インフラストラクチャと容量を得ることができます。

Monte Carlo メソッドは、ビジネス、エンジニアリング、科学、数学、およびその他の分野の幅広い範囲の質問に答えることができます。繰り返しのランダム サンプリングを使用して変数の確率分布を作成することで、Monte Carlo シミュレーションは、他の方法では答えることが不可能な質問にも回答を提供できます。たとえば金融では、株式オプションの価格設定には、時間の経過とともに変化する可能性がある何千通りもの株式の価格を分析する必要があります。Monte Carlo メソッドは、問題への可能なインプットのドメインに対する制御を維持しつつ、これらの株価の変化を、考えられる幅広い結果にわたってシミュレートする方法を提供します。

以前は、何千ものシミュレーションを実行すると、非常に長い時間がかかり、高い費用が発生する可能性がありました。Dataproc では、オンデマンドで容量をプロビジョニングでき、分単位で支払うことできます。Apache Spark を使用すると、数十から数千のサーバーのクラスタを使用して、直感的で、かつニーズに合わせてスケーラブルな方法でシミュレーションを実行できます。これにより、より多くのシミュレーションをよりすばやく実行できるため、ビジネスのイノベーションを促進し、リスクをより適切に管理できます。

財務データを扱う場合、セキュリティは常に重要です。Dataproc は Google Cloud 上で動作するため、いくつかの方法でデータの安全性、セキュリティ、プライバシーを維持するのに役立ちます。たとえば、すべてのデータは伝送中および保存時に暗号化され、Google Cloud は ISO 27001、SOC3、および PCI に準拠しています。

目標

  • マネージド Dataproc クラスタを作成する(プレインストールされた Apache Spark を使用)。
  • Python を使用して、長期間にわたって株式ポートフォリオの成長を予測するための Monte Carlo シミュレーションを実行する。
  • Scala を使用して、カジノがどのように利益を得ているかをシミュレートする Monte Carlo シミュレーションを実行する。

料金

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  • Google Cloud プロジェクトの設定
    1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
    2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

      プロジェクト セレクタに移動

    3. Google Cloud プロジェクトで課金が有効になっていることを確認します

    4. Dataproc and Compute Engine API を有効にします。

      API を有効にする

    5. Google Cloud CLI をインストールします。
    6. gcloud CLI を初期化するには:

      gcloud init
    7. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

      プロジェクト セレクタに移動

    8. Google Cloud プロジェクトで課金が有効になっていることを確認します

    9. Dataproc and Compute Engine API を有効にします。

      API を有効にする

    10. Google Cloud CLI をインストールします。
    11. gcloud CLI を初期化するには:

      gcloud init

Dataproc クラスタの作成

手順に沿って、Google Cloud Console から Dataproc クラスタを作成します。このチュートリアルでは、デフォルトのクラスタ設定(2 つのワーカーノードを含む)で十分です。

警告のロギングの無効化

デフォルトでは、Apache Spark はコンソール ウィンドウに詳細ロギングを出力します。このチュートリアルでは、ロギングレベルを変更して、エラーのみがロギングされるようにします。手順は次のとおりです。

ssh を使用して Dataproc クラスタのプライマリ ノードに接続する

Dataproc クラスタのプライマリ ノードには、VM 名に -m 接尾辞が付いています。

  1. Google Cloud コンソールで、[VM インスタンス] ページに移動します。

    [VM インスタンス] に移動

  2. 仮想マシン インスタンスのリストで、接続するインスタンスの行にある [SSH] をクリックします。

    インスタンス名の横にある SSH ボタン。

プライマリ ノードに接続された SSH ウィンドウが開きます。

Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$

ロギングの設定を変更する

  1. プライマリ ノードのホーム ディレクトリから、/etc/spark/conf/log4j.properties を編集します。

    sudo nano /etc/spark/conf/log4j.properties
    
  2. log4j.rootCategoryERROR に等しくなるように設定します。

    # Set only errors to be logged to the console
    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
  3. 変更を保存し、エディタを終了します。詳細ログをもう一度有効にするには、.rootCategory の値を元の値(INFO)に復元することで、変更を元に戻します。

Spark プログラミング言語

Spark は、スタンドアロン アプリケーション向けのプログラミング言語として Python、Scala、Java をサポートし、Python と Scala 用のインタラクティブなインタープリタを提供します。どの言語を選択するかは、個人の好みの問題です。このチュートリアルでは、コードを変更し、別の入力値を試して、結果を表示できるため、インタラクティブなインタープリタを使用しています。

ポートフォリオの成長を予測する

金融では、Monte Carlo メソッドは、投資の先行きを予測するシミュレーションを実行するために使用されることがあります。Monte Carlo シミュレーションは、さまざまな有望な市場条件にわたって結果のランダムなサンプルを生成することで、ポートフォリオが平均的なシナリオまたは最悪のケースのシナリオでどのように推移するかという質問に答えることができます。

Monte Carlo メソッドを使用して、いくつかの一般的な市場要因に基づいて金融投資の成長を予測するシミュレーションを作成するには、次の手順を実行します。

  1. Dataproc プライマリ ノードから、Python インタープリタを起動します。

    pyspark
    

    Spark のプロンプト >>> が表示されるまで待ちます。

  2. 次のコードを入力します。関数定義内のインデントを維持するように注意します。

    import random
    import time
    from operator import add
    
    def grow(seed):
        random.seed(seed)
        portfolio_value = INVESTMENT_INIT
        for i in range(TERM):
            growth = random.normalvariate(MKT_AVG_RETURN, MKT_STD_DEV)
            portfolio_value += portfolio_value * growth + INVESTMENT_ANN
        return portfolio_value
    
  3. Spark のプロンプトが再度表示されるまで、return を押します。

    上記のコードは、投資家が株式市場に投資されている既存の退職金口座を持っていて、毎年この口座にお金を追加すると、どのようになるかをモデル化する関数を定義しています。この関数は、指定した期間内で毎年のランダムな ROI をパーセンテージで生成します。関数は、パラメータとしてシード値をとります。この値は、関数が実行されるたびに同じ乱数のリストを取得しないようにする、乱数生成ツールを再シードするために使用されます。random.normalvariate 関数は、指定した平均と標準偏差の正規分布全体でランダムな値を発生させます。この関数は、成長量(正または負)によるポートフォリオの値を増やし、追加投資を表す毎年の合計額を加算します。

    この後の手順で、必要な定数を定義します。

  4. 関数にフィードするための多くのシードを作成します。Spark プロンプトで次のコードを入力し、10,000 のシードを生成します。

    seeds = sc.parallelize([time.time() + i for i in range(10000)])
    

    parallelize オペレーションの結果は耐障害性分散データセット(RDD)となり、これは、並列処理に最適化された要素の集合です。この場合、RDD には、現在のシステム時刻に基づいたシードが含まれています。

    RDD を作成する際に、Spark は利用可能なワーカーとコアの数に基づいてデータをスライスします。このケースでは、Spark は各コアに 1 つずつ、合計 8 つのスライスを使用することを選択しています。データ項目 10,000 のこのシミュレーションにはこれで十分です。さらに大きなシミュレーションでは、各スライスがデフォルトの制限値よりも大きくなる場合があります。その場合、2 番目のパラメータに parallelize を指定することで、スライスの数を増やすことができます。こうすることで、Spark が 8 つのすべてのコアを活用しながら、各スライスを管理しやすいサイズに維持できます。

  5. シードが含まれている RDD を成長関数にフィードします。

    results = seeds.map(grow)
    

    map メソッドは、RDD 内の各シードを grow 関数に渡し、それぞれの結果を results に格納されている新しい RDD に付加します。変換を実行するこの操作では、その結果がすぐに生成されないことに注意してください。Spark は結果が必要になるまでこの作業を行いません。定数を定義せずにコードを入力できるのは、この遅延評価のおかげです。

  6. 関数にいくつかの値を指定します。

    INVESTMENT_INIT = 100000  # starting amount
    INVESTMENT_ANN = 10000  # yearly new investment
    TERM = 30  # number of years
    MKT_AVG_RETURN = 0.11 # percentage
    MKT_STD_DEV = 0.18  # standard deviation
    
  7. reduce を呼び出して、RDD 内の値を集計します。RDD 内の結果を合計するには、次のコードを入力します。

    sum = results.reduce(add)
    
  8. 平均利益を見積もって表示します。

    print (sum / 10000.)
    

    末尾にピリオド(.)文字を必ず含めてください。これは浮動小数点演算を意味します。

  9. 次に、前提を変更し、結果がどのように変化するかを確認します。たとえば、市場の平均利益に新しい値を入力してみます。

    MKT_AVG_RETURN = 0.07
    
  10. 再度シミュレーションを実行します。

    print (sc.parallelize([time.time() + i for i in range(10000)]) \
            .map(grow).reduce(add)/10000.)
    
  11. テストが完了したら、CTRL+D を押して Python インタープリタを終了します。

Scala での Monte Carlo シミュレーションのプログラミング

Monte Carlo(モンテカルロ)といえば、ギャンブルで有名ですが、このセクションでは、Scala を使用して、カジノがゲームで享受している数学的な優位性をモデル化したシミュレーションを作成します。実際のカジノでの「ハウスエッジ(控除率)」は、ゲームによって大きく異なり、たとえばキノでは、20% を超えることもあります。このチュートリアルでは、ハウス(胴元)のアドバンテージが 1% しかない簡単なゲームを作成します。ゲームは次のようになります。

  • プレーヤーは手持ちの資金から複数のチップを使って賭けをします。
  • プレーヤーは 100 面さいころ(実物はすごいでしょうね)を転がします。
  • 出た目が 1~49 の場合、プレーヤーの勝ちです。
  • 結果が 50~100 の場合、プレーヤーは掛け金を失います。

さいころを転がすたびにプレイヤーが負ける確率は 100 分の 51 となるため、このゲームはプレイヤーにとって 1% 不利になります。

次の手順でゲームを作成して実行します。

  1. Dataproc プライマリ ノードから、Scala インタープリタを起動します。

    spark-shell
    
  2. 次のコードをコピーして貼り付けて、ゲームを作成します。インデントに関して Scala には Python と同じ要件はないため、このコードを scala> プロンプトで単にコピーして貼り付けることができます。

    val STARTING_FUND = 10
    val STAKE = 1   // the amount of the bet
    val NUMBER_OF_GAMES = 25
    
    def rollDie: Int = {
        val r = scala.util.Random
        r.nextInt(99) + 1
    }
    
    def playGame(stake: Int): (Int) = {
        val faceValue = rollDie
        if (faceValue < 50)
            (2*stake)
        else
            (0)
    }
    
    // Function to play the game multiple times
    // Returns the final fund amount
    def playSession(
       startingFund: Int = STARTING_FUND,
       stake: Int = STAKE,
       numberOfGames: Int = NUMBER_OF_GAMES):
       (Int) = {
    
        // Initialize values
        var (currentFund, currentStake, currentGame) = (startingFund, 0, 1)
    
        // Keep playing until number of games is reached or funds run out
        while (currentGame <= numberOfGames && currentFund > 0) {
    
            // Set the current bet and deduct it from the fund
            currentStake = math.min(stake, currentFund)
            currentFund -= currentStake
    
            // Play the game
            val (winnings) = playGame(currentStake)
    
            // Add any winnings
            currentFund += winnings
    
            // Increment the loop counter
            currentGame += 1
        }
        (currentFund)
    }
    
  3. scala> プロンプトが表示されるまで return を押します。

  4. 次のコードを入力して、ゲームを 25 回プレイします。これは NUMBER_OF_GAMES のデフォルト値です。

    playSession()
    

    手持ち資金は 10 単位の値から開始しました。この時点で、手持ち資金は増えていますか?それとも少なくなっていますか?

  5. それでは、10,000 人のプレーヤーがゲームごとに 100 チップ賭けるシミュレーションを行いましょう。1 つのセッションで 10,000 回ゲームをプレイします。この Monte Carlo シミュレーションは、セッションが終了する前に手持ち資金をすべて失う確率を計算します。次のコードを入力します。

    (sc.parallelize(1 to 10000, 500)
      .map(i => playSession(100000, 100, 250000))
      .map(i => if (i == 0) 1 else 0)
      .reduce(_+_)/10000.0)
    

    構文 .reduce(_+_) は、合計関数を使用して集計するための Scala での省略形です。これは、Python のサンプルで紹介した、構文 .reduce(add) と機能的に同じです。

    上記のコードは、次の手順を実行します。

    • セッションのプレイの結果を使用して RDD を作成します。
    • 破産したプレーヤーの結果を数値 1 に置き換え、ゼロ以外の結果を数値 0 に置き換えます。
    • 破産したプレーヤーの数を合計します。
    • 合計をプレーヤーの数で割ります。

    典型的な結果は次のようになります。

    0.998
    

    これは、カジノ側に 1% のアドバンテージしかなくても、ほぼ間違いなくプレイヤーがすべてのお金を失うことを表しています。

クリーンアップ

プロジェクトの削除

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ

  • ssh を使用してクラスタに接続する必要なく Dataproc に Spark ジョブを送信する方法の詳細については、Dataproc のジョブを送信をご覧ください。