Dataproc Container for Spark サービスを実行する

GDC は Spark 用の Dataproc コンテナを提供します。これは、データ処理用の Apache Spark 環境です。Apache Spark の詳細については、https://spark.apache.org/ をご覧ください。Dataproc Container for Spark のコンテナを使用すると、GDC Kubernetes クラスタ内で新しい Spark アプリケーションまたは既存の Spark アプリケーションを最小限の変更で実行できます。Spark ツールに精通している場合は、引き続き使用できます。

YAML ファイルで Spark アプリケーションを定義すると、GDC がリソースを割り当てます。Dataproc Container for Spark コンテナは数秒で起動します。Spark エグゼキュータは、必要に応じてスケールアップまたはシャットダウンします。

GDC の Dataproc Container for Spark からコンテナを構成して、専用のハードウェア(専用のハードウェア ノードや GPU など)を使用します。

Spark アプリケーションを実行するための前提条件

Spark アプリケーションを実行する前に、プラットフォーム管理者(PA)に mkt-system Namespace の Spark Operator(mkt-spark-operator)ロールへのアクセス権を付与するよう依頼してください。

Spark 3 サンプル アプリケーションを実行する

Spark アプリケーションをコンテナ化すると、GDC を使用してオンプレミスでビッグデータ アプリケーションを簡単に実行できます。アプリケーション オペレーター(AO)として、SparkApplication カスタム リソースタイプの GKE オブジェクトで指定された Spark アプリケーションを実行します。

GDC で Apache Spark 3 アプリケーションを実行して使用するには、次の操作を行います。

  1. プロジェクトの spark-operator イメージを調べて、Spark アプリケーションで参照する $DATAPROC_IMAGE を見つけます。

    export DATAPROC_IMAGE=$(kubectl get pod --kubeconfig INFRA_CLUSTER_KUBECONFIG \
    --selector app.kubernetes.io/name=spark-operator -n mkt-system \
    -o=jsonpath='{.items[*].spec.containers[0].image}' \
    | sed 's/spark-operator/dataproc/')
    

    次に例を示します。

    export DATAPROC_IMAGE=10.200.8.2:10443/dataproc-service/private-cloud-devel/dataproc:3.1-dataproc-17
    
  2. SparkApplication 仕様を記述し、YAML ファイルに保存します。詳細については、Spark アプリケーション仕様を記述するをご覧ください。

  3. kubectl コマンドを使用して、GKE クラスタの SparkApplication 仕様で構成された Spark アプリケーションを送信、実行、モニタリングします。詳細については、アプリケーションの例をご覧ください。

  4. アプリケーションのステータスを確認します。

  5. 省略可: アプリケーション ログを確認します。詳細については、Spark 3 アプリケーションのログを表示するをご覧ください。

  6. Spark アプリケーションを使用して、ドライバとエグゼキュータのステータスを収集し、ユーザーに表示します。

Spark アプリケーション仕様を作成する

SparkApplication 仕様には次のコンポーネントが含まれます。

  • apiVersion フィールド。
  • kind フィールド。
  • metadata フィールド。
  • spec セクション。

詳細については、GitHub の SparkApplication 仕様の作成https://github.com/kubeflow/spark-operator/blob/gh-pages/docs/user-guide.md#writing-a-sparkapplication-spec)をご覧ください。

用途例

このセクションでは、Spark アプリケーションを実行するための SparkApplication 仕様の例をいくつか紹介します。

Spark Pi

このセクションでは、円にダーツを投げて 𝛑(円周率)を推定するコンピューティング負荷の高い Spark Pi アプリケーションを実行する例を示します。

次の手順に沿って Spark Pi を実行します。

  1. 組織のインフラストラクチャ クラスタに次の SparkApplication 仕様の例を適用します。

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: mkt-system
    spec:
      type: Python
      pythonVersion: "3"
      mode: cluster
      image: "${DATAPROC_IMAGE?}"
      imagePullPolicy: IfNotPresent
      mainApplicationFile: "local:///usr/lib/spark/examples/src/main/python/pi.py"
      sparkVersion: "3.1.3"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        coreLimit: "1000m"
        memory: "512m"
        serviceAccount: dataproc-addon-spark
      executor:
        cores: 1
        instances: 1
        memory: "512m"
    
  2. 次のコマンドを使用して、SparkApplication 仕様の例が実行され、1 ~ 2 分で完了することを確認します。

    kubectl --kubeconfig INFRA_CLUSTER_KUBECONFIG get SparkApplication spark-pi -n mkt-system
    
  3. ドライバログを表示して結果を確認します。

    kubectl --kubeconfig INFRA_CLUSTER_KUBECONFIG logs spark-pi-driver -n mkt-system | grep "Pi is roughly"
    

    出力は次のようになります。

    Pi is roughly 3.1407357036785184
    

詳しくは、次のリソースをご覧ください。

  • アプリケーション コードについては、Apache Spark ドキュメントの Pi の推定https://spark.apache.org/examples.html)をご覧ください。
  • Spark Pi YAML ファイルの例については、Spark アプリケーション仕様を記述するをご覧ください。

Spark SQL

次の手順で Spark SQL を実行します。

  1. 1 値を選択する Spark SQL アプリケーションを実行するには、次のクエリを使用します。

    select 1;
    
  2. 次の SparkApplication 仕様の例を組織インフラストラクチャ クラスタに適用します。

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: pyspark-sql-arrow
      namespace: mkt-system
    spec:
      type: Python
      mode: cluster
      image: "${DATAPROC_IMAGE?}"
      imagePullPolicy: IfNotPresent
      mainApplicationFile: "local:///usr/lib/spark/examples/src/main/python/sql/arrow.py"
      sparkVersion: "3.1.3"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        coreLimit: "1000m"
        memory: "512m"
        serviceAccount: dataproc-addon-spark
      executor:
        cores: 1
        instances: 1
        memory: "512m"
    
  3. 次のコマンドを使用して、SparkApplication 仕様の例が 1 分以内に実行されて完了することを確認します。

    kubectl --kubeconfig INFRA_CLUSTER_KUBECONFIG get SparkApplication pyspark-sql-arrow -n mkt-system
    

Spark MLlib

Spark MLlib を実行する手順は次のとおりです。

  1. 次の Scala の例を使用して、統計分析を実行し、結果をコンソールに出力する Spark MLlib インスタンスを実行します。

    import org.apache.spark.ml.linalg.{Matrix, Vectors}
    import org.apache.spark.ml.stat.Correlation
    import org.apache.spark.sql.Row
    
    val data = Seq(
      Vectors.sparse(4, Seq((0, 1.0), (3, -2.0))),
      Vectors.dense(4.0, 5.0, 0.0, 3.0),
      Vectors.dense(6.0, 7.0, 0.0, 8.0),
      Vectors.sparse(4, Seq((0, 9.0), (3, 1.0)))
    )
    
    val df = data.map(Tuple1.apply).toDF("features")
    val Row(coeff1: Matrix) = Correlation.corr(df, "features").head
    println(s"Pearson correlation matrix:\n $coeff1")
    
    val Row(coeff2: Matrix) = Correlation.corr(df, "features", "spearman").head
    println(s"Spearman correlation matrix:\n $coeff2")
    
  2. 組織のインフラストラクチャ クラスタに次の SparkApplication 仕様の例を適用します。

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: spark-ml
      namespace: mkt-system
    spec:
      type: Scala
      mode: cluster
      image: "${DATAPROC_IMAGE?}"
      imagePullPolicy: IfNotPresent
      mainClass: org.apache.spark.examples.ml.SummarizerExample
      mainApplicationFile: "local:///usr/lib/spark/examples/jars/spark-examples_2.12-3.1.3.jar"
      sparkVersion: "3.1.3"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        coreLimit: "1000m"
        memory: "512m"
        serviceAccount: dataproc-addon-spark
      executor:
        cores: 1
        instances: 1
        memory: "512m"
    
  3. 次のコマンドを使用して、SparkApplication 仕様の例が 1 分以内に実行されて完了することを確認します。

    kubectl --kubeconfig INFRA_CLUSTER_KUBECONFIG get SparkApplication spark-ml -n mkt-system
    

SparkR

SparkR を実行する手順は次のとおりです。

  1. 次のコード例を使用して、バンドルされたデータセットを読み込み、最初の行を出力する SparkR インスタンスを実行します。

    library(SparkR)
    sparkR.session()
    df <- as.DataFrame(faithful)
    head(df)
    
  2. 次の SparkApplication 仕様の例を組織インフラストラクチャ クラスタに適用します。

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: spark-r-dataframe
      namespace: mkt-system
    spec:
      type: R
      mode: cluster
      image: "${DATAPROC_IMAGE?}"
      imagePullPolicy: Always
      mainApplicationFile: "local:///usr/lib/spark/examples/src/main/r/dataframe.R"
      sparkVersion: "3.1.3"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        coreLimit: "1000m"
        memory: "512m"
        serviceAccount: dataproc-addon-spark
      executor:
        cores: 1
        instances: 1
        memory: "512m"
    
  3. 次のコマンドを使用して、SparkApplication 仕様の例が 1 分以内に実行されて完了することを確認します。

    kubectl --kubeconfig INFRA_CLUSTER_KUBECONFIG get SparkApplication spark-r-dataframe -n mkt-system
    

Spark 3 アプリケーションのログを表示する

Spark には、次の 2 種類のログがあり、可視化できます。

ターミナルを使用してコマンドを実行します。

ドライバログ

次の手順に沿って、Spark アプリケーションのドライバログを表示します。

  1. Spark ドライバ Pod を見つけます。

    kubectl --kubeconfig INFRA_CLUSTER_KUBECONFIG get pods -n mkt-system
    
  2. Spark ドライバ Pod のログを開きます。

    kubectl --kubeconfig INFRA_CLUSTER_KUBECONFIG logs DRIVER_POD -n mkt-system
    

    DRIVER_POD は、前の手順で確認した Spark ドライバ Pod の名前に置き換えます。

イベントログ

イベントログは、SparkApplication 仕様の YAML ファイルで指定されたパスにあります。

次の手順に沿って、Spark アプリケーションのイベントログを表示します。

  1. SparkApplication 仕様の YAML ファイルを開きます。
  2. ファイル内で spec フィールドを探します。
  3. spec フィールドにネストされている sparkConf フィールドを見つけます。
  4. sparkConf セクションにネストされている spark.eventLog.dir フィールドの値を見つけます。
  5. パスを開いてイベントログを表示します。

SparkApplication 仕様の YAML ファイルの例については、Spark アプリケーション仕様を記述するをご覧ください。

詳しくは、アカウント マネージャーにお問い合わせください。