Google Cloud Dataproc と Apache Spark を使用した Monte Carlo メソッド

Cloud DataprocApache Spark を組み合わせると、Java、Python、Scala でプログラミングされた Monte Carlo シミュレーションを実行するのに十分なインフラストラクチャと処理能力が得られます。

Monte Carlo メソッドは、ビジネス、エンジニアリング、科学、数学、およびその他の分野の幅広い範囲の質問に答えることができます。繰り返しのランダム サンプリングを使用して変数の確率分布を作成することで、Monte Carlo シミュレーションは、他の方法では答えることが不可能な質問に対する回答を提供することができます。たとえば金融では、株式オプションの価格設定には、時間の経過とともに変化する可能性がある何千通りもの株式の価格を分析する必要があります。Monte Carlo メソッドは、問題への可能なインプットのドメインに対する制御を維持しつつ、これらの株価の変化を、考えられる幅広い結果にわたってシミュレートする方法を提供します。

以前は、何千ものシミュレーションを実行すると、非常に長い時間がかかり、高い費用が発生する可能性がありました。Cloud Dataproc は、オンデマンドでのプロビジョニングを可能にし、分単位での支払いができます。Apache Spark を使用すると、数十から数千のサーバーのクラスタを利用して、直感的な方法でシミュレーションを実行し、ニーズに合わせて拡張することができます。これは、より多くのシミュレーションをよりすばやく実行できることを意味し、ビジネスのイノベーションを促進し、リスクをよりよく管理することにつながります。

財務データを扱う場合、セキュリティは常に重要です。Cloud Dataproc は、GCP 上で実行されるため、さまざまな方法でデータを安全、セキュアかつ非公開に保持することに役立ちます。たとえば、すべてのデータが伝送時も保存時も暗号化され、Google Cloud Platform は ISO 27001SOC3、FINRA、PCI に準拠しています。

目標

  • マネージド Cloud Dataproc クラスタを作成する(プレインストールされた Apache Spark を使用)。
  • Python を使用して、長期間にわたって株式ポートフォリオの成長を予測するための Monte Carlo シミュレーションを実行する。
  • Scala を使用して、カジノがどのように利益を得ているかをシミュレートする Monte Carlo シミュレーションを実行する。

要件

  • プロジェクトを設定する

    1. Google アカウントにログインします。

      Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

    2. GCP プロジェクトを選択または作成します。

      [リソースの管理] ページに移動

    3. プロジェクトに対して課金が有効になっていることを確認します。

      課金を有効にする方法について

    4. {% dynamic if "no_credentials" in setvar.task_params %}{% dynamic setvar credential_type %}NO_AUTH{% dynamic endsetvar %}{% dynamic if not setvar.redirect_url %}{% dynamic setvar redirect_url %}https://console.cloud.google.com{% dynamic endsetvar %}{% dynamic endif %}{% dynamic endif %}{% dynamic if setvar.in_henhouse_no_auth_whitelist %}{% dynamic if not setvar.credential_type %}{% dynamic setvar credential_type %}NO_AUTH{% dynamic endsetvar %}{% dynamic endif %}{% dynamic elif setvar.in_henhouse_service_account_whitelist %}{% dynamic if not setvar.credential_type %}{% dynamic setvar credential_type %}SERVICE_ACCOUNT{% dynamic endsetvar %}{% dynamic endif %}{% dynamic endif %}{% dynamic if not setvar.service_account_roles and setvar.credential_type == "SERVICE_ACCOUNT" %}{% dynamic setvar service_account_roles %}{% dynamic endsetvar %}{% dynamic endif %}{% dynamic setvar console %}{% dynamic if "no_steps" not in setvar.task_params %}
    5. {% dynamic endif %}{% dynamic if setvar.api_list %}{% dynamic if setvar.in_henhouse_no_auth_whitelist or setvar.in_henhouse_service_account_whitelist %}GCP Console プロジェクトをセットアップします。

      プロジェクトをセットアップする

      クリックして、以下を行います。

      • プロジェクトを作成または選択します。
      • プロジェクトに{% dynamic if setvar.api_names %}{% dynamic print setvar.api_names %}{% dynamic else %}必要な{% dynamic endif %}{% dynamic if "," in setvar.api_list %} API{% dynamic elif "API" in setvar.api_names %}{% dynamic else %} API{% dynamic endif %} を有効にします。
      • {% dynamic if setvar.credential_type == 'SERVICE_ACCOUNT' %}
      • サービス アカウントを作成します。
      • JSON として秘密鍵をダウンロードします。
      • {% dynamic endif %}

      これらのリソースは、GCP Console でいつでも表示および管理できます。

      {% dynamic else %}{% dynamic if "no_text" not in setvar.task_params %}{% dynamic if setvar.api_names %}{% dynamic print setvar.api_names %}{% dynamic else %}必要な{% dynamic endif %}{% dynamic if "," in setvar.api_list %} API{% dynamic elif "API" in setvar.api_names %}{% dynamic else %} API{% dynamic endif %} を有効にします。 {% dynamic endif %}

      {% dynamic if "," in setvar.api_list %}API{% dynamic else %}API{% dynamic endif %}を有効にする

      {% dynamic endif %}{% dynamic endif %}{% dynamic if "no_steps" not in setvar.task_params %}
    6. {% dynamic endif %}{% dynamic endsetvar %}{% dynamic print setvar.console %}
    7. Cloud SDK をインストールして初期化します。

  • Cloud Storage バケットをプロジェクト内に作成する

    1. GCP Console で、Cloud Storage ブラウザに移動します。

      Cloud Storage ブラウザに移動

    2. [バケットを作成] をクリックします。
    3. [バケットを作成] ダイアログ内で、以下の属性を指定します。
    4. [作成] をクリックします。

Cloud Dataproc クラスタの作成

手順に従って、Google Cloud Platform Console から Cloud Dataproc クラスタを作成します。このチュートリアルには、2 つのワーカーノードを含むデフォルトのクラスタ設定で十分です。

警告のロギングを無効化

デフォルトでは、Apache Spark はコンソール ウィンドウに詳細ロギングを出力します。エラーのみを記録するようにロギングレベルを変更する場合は、このチュートリアルに従うと簡単にできます。手順は次のとおりです。

Cloud Dataproc クラスタのマスターノードに SSH を挿入する

  1. GCP Console の [VM インスタンス] ページに移動します。

    [VM インスタンス] ページに移動

  2. 仮想マシン インスタンスのリストで、接続するインスタンスの行の SSH をクリックします。

マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。

Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$

ロギングの設定を変更する

/etc/spark/conf/log4j.properties を編集します。

sudo nano /etc/spark/conf/log4j.properties

log4j.rootCategory=ERROR に設定します。

# Set only errors to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

変更を保存し、エディタを終了します。再度詳細ロギングを有効にする場合は、.rootCategory の値を元の値(INFO)に復元することで、変更を元に戻します。

Spark プログラミング言語

Spark は、スタンドアロン アプリケーション向けのプログラミング言語として Python、Scala、Java をサポートし、Python と Scala 用のインタラクティブなインタープリタを提供します。どの言語を選択するかは、個人の好みの問題です。このチュートリアルでは、コードを変更し、別の入力値を試して、結果を表示して、すばやく簡単に実験するため、インタラクティブなインタープリタを使用しています。

ポートフォリオの成長を予測する

金融では、Monte Carlo メソッドは、投資の先行きを予測するシミュレーションを実行するために使用されることがあります。Monte Carlo シミュレーションは、さまざまな有望な市場条件にわたって結果のランダムなサンプルを生成することで、ポートフォリオが平均的なシナリオまたは最悪のケースのシナリオでどのように推移するかという質問に答えることができます。

Monte Carlo メソッドを使用して、いくつかの一般的な市場要因に基づいて金融投資の成長を予測するシミュレーションを作成するには、次の手順を実行します。

  1. Cloud Dataproc マスターノードから Python インタープリタを起動します。

    pyspark
    

    Spark のプロンプト(>>>)を待ちます。

  2. 次のコードを入力します。Python でプログラミングする場合は常に、関数定義内のインデントを維持するように注意します。

    >>> import random
    >>> import time
    >>> from operator import add
    
    >>> def grow(seed):
            random.seed(seed)
            portfolio_value = INVESTMENT_INIT
            for i in range(TERM):
                growth = random.normalvariate(MKT_AVG_RETURN, MKT_STD_DEV)
                portfolio_value += portfolio_value * growth + INVESTMENT_ANN
            return portfolio_value
    
  3. Spark のプロンプトが再度表示されるまで Return キーを押します。

    上記のコードは、投資家が株式市場に投資されている既存の退職金口座を持っていて、毎年この口座にお金を追加すると、どのようになるかをモデル化する関数を定義しています。この関数は、指定した期間内で毎年のランダムな ROI をパーセンテージで生成します。関数は、パラメータとしてシード値をとります。この値は、関数が実行するたびに同じ乱数のリストを取得しないようにする、乱数生成ツールを再シードするために使用されます。random.normalvariate 関数は、指定した平均と標準偏差の正規分布全体でランダムな値を発生させます。この関数は、成長量(正または負)によるポートフォリオの値を増やし、追加投資を表す毎年の合計額を加算します。

    この後の手順で、必要な定数を定義します。

  4. 次の手順では、関数にフィードするための多くのシードを作成します。Spark プロンプトで次のコードを入力し、10,000 のシードを生成します。

    >>> seeds = sc.parallelize([time.time() + i for i in xrange(10000)])
    

    parallelize オペレーションの結果は、並列処理に最適化された要素の集合である Resilient Distributed ataset(RDD)です。この場合、RDD には、現在のシステム時刻に基づいたシードが含まれています。

    RDD を作成する際に、Spark は利用可能なワーカーとコアの数に基づいてデータをスライスします。このケースでは、Spark は各コアに 1 つずつ、合計 8 つのスライスを使用することを選択しています。データ項目 10,000 のこのシミュレーションにはこれで十分です。さらに大きなシミュレーションでは、各スライスがデフォルトの制限値よりも大きくなる場合があります。その場合、2 番目のパラメータに parallelize を指定することで、スライスの数を増やすことができます。こうすることで、Spark が 8 つのすべてのコアを活用しながら、各スライスを管理しやすいサイズに維持することができます。

  5. シードが含まれている RDD を成長関数にフィードします。次のコードを入力します。

    >>> results = seeds.map(grow)
    

    map メソッドは、RDD 内の各シードを grow 関数に渡し、それぞれの結果を results に格納されている新しい RDD に付加します。変換を実行するこの操作では、その結果がすぐに生成されないことに注意してください。Spark は結果が必要になるまでこの作業を行いません。定数を定義せずにコードを入力できるのは、この遅延評価のおかげです。

  6. 使用する関数の値をいくつか指定します。次のコードを入力します。

    >>> INVESTMENT_INIT = 100000  # starting amount
    >>> INVESTMENT_ANN = 10000  # yearly new investment
    >>> TERM = 30  # number of years
    >>> MKT_AVG_RETURN = 0.11 # percentage
    >>> MKT_STD_DEV = 0.18  # standard deviation
    
  7. reduce を呼び出して RDD 内の値を集計します。RDD 内の結果を合計するには、次のコードを入力します。

    >>>  sum = results.reduce(add)
    
  8. 平均利益を見積もって表示するには、次のコードを入力します。

    >>> print sum / 10000.
    

    末尾にピリオド(.)文字を必ず含めてください。これは浮動小数点演算を意味します。

  9. 次に、前提を変更し、結果がどのように変化するかを確認します。たとえば、市場の平均利益に新しい値を入力してみます。

    >>> MKT_AVG_RETURN = 0.07
    
  10. 再度シミュレーションを実行します。次のコードですべてを実行することができます。

    >>> print sc.parallelize([time.time() + i for i in xrange(10000)]) \
                .map(grow).reduce(add)/10000.
    
  11. 実験が終わったら、Ctrl+D キーを押して Python インタープリタを終了します。

Scala での Monte Carlo シミュレーションのプログラミング

Monte Carlo(モンテカルロ)といえば、ギャンブルで有名ですが、このセクションでは、Scala を使用して、運が左右するゲームでカジノがもうける数学的な優位性をモデル化するシミュレーションを作成します。実際のカジノでの「ハウスエッジ(控除率)」は、ゲームによって大きく異なり、たとえばキノでは、20% を超えることもあります。このチュートリアルでは、ハウス(胴元)のアドバンテージが 1% しかない簡単なゲームを作成します。ゲームは次のようになります。

  • プレイヤーは手持ちの資金から複数のチップを使って賭けをします。
  • プレイヤーは 100 面さいころ(本物だったらすごいですね)を転がします。
  • 回転の結果が 1~49 の数字の場合、プレイヤーは掛け金と同額をもらえます。
  • 結果が 50~100 の場合、プレイヤーは掛け金を失います。

さいころを転がすたびにプレイヤーが負ける確率は 100 分の 51 となるため、このゲームはプレイヤーにとって 1% 不利になります。

次の手順でゲームを作成して実行します。

  1. Cloud Dataproc マスターノードから、Scala インタープリタを起動します。

    spark-shell
    
  2. 次のコードをコピーして貼り付けて、ゲームを作成します。インデントに関しては、Scala には Python のような要件はないため、このコードをコピーして scala> プロンプトに単純に貼り付けることができます。

    val STARTING_FUND = 10
    val STAKE = 1   // the amount of the bet
    val NUMBER_OF_GAMES = 25
    
    def rollDie: Int = {
        val r = scala.util.Random
        r.nextInt(99) + 1
    }
    
    def playGame(stake: Int): (Int) = {
        val faceValue = rollDie
        if (faceValue < 50)
            (2*stake)
        else
            (0)
    }
    
    // Function to play the game multiple times
    // Returns the final fund amount
    def playSession(
       startingFund: Int = STARTING_FUND,
       stake: Int = STAKE,
       numberOfGames: Int = NUMBER_OF_GAMES):
       (Int) = {
    
        // Initialize values
        var (currentFund, currentStake, currentGame) = (startingFund, 0, 1)
    
        // Keep playing until number of games is reached or funds run out
        while (currentGame <= numberOfGames && currentFund > 0) {
    
            // Set the current bet and deduct it from the fund
            currentStake = math.min(stake, currentFund)
            currentFund -= currentStake
    
            // Play the game
            val (winnings) = playGame(currentStake)
    
            // Add any winnings
            currentFund += winnings
    
            // Increment the loop counter
            currentGame += 1
        }
        (currentFund)
    }
    
  3. scala> プロンプトが表示されるまで Return キーを押します。

  4. 次のコードを入力して、ゲームを 25 回プレイします。これは NUMBER_OF_GAMES のデフォルト値です。

    playSession()
    

    手持ち資金は 10 単位の値から開始しました。この時点で、手持ち資金は増えていますか?それとも少なくなっていますか?

  5. それでは、10,000 人のプレーヤーがゲームごとに 100 チップ賭けるシミュレーションを行いましょう。1 つのセッションで 10,000 回ゲームをプレイします。この Monte Carlo シミュレーションは、セッションが終了する前に手持ち資金をすべて失う確率を計算します。次のコードを入力します。

    (sc.parallelize(1 to 10000, 500)
      .map(i => playSession(100000, 100, 250000))
      .map(i => if (i == 0) 1 else 0)
      .reduce(_+_)/10000.0)
    

    構文 .reduce(_+_) は、合計関数を使用して集計するための Scala での省略形です。これは、Python のサンプルで紹介した、構文 .reduce(add) と機能的に同じです。

    上記のコードは、次の手順を実行します。

    • セッションのプレイの結果を使用して RDD を作成します。
    • 破産したプレーヤーの結果を数値 1 に置き換え、ゼロ以外の結果を数値 0 に置き換えます。
    • 破産したプレーヤーの数を合計します。
    • 合計をプレイヤーの数で割ります。

典型的な結果は次のようになります。

0.998

これは、カジノ側に 1% のアドバンテージしかなくても、ほぼ間違いなくプレイヤーがすべてのお金を失うことを表しています。

次のステップ

SSH を使用してクラスタに接続することなく、Cloud Dataproc サービスに Spark ジョブを送信する方法については、Cloud Dataproc - ジョブの送信をご覧ください。

  • Google Cloud Platform の他の機能を試すには、チュートリアルをご覧ください。
  • Google Cloud Platform プロダクトを使用してエンドツーエンドのソリューションを作成する方法を学習します。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...