Spark 用 Dataproc コンテナ

Google Distributed Cloud(GDC)エアギャップは、Spark 用の Dataproc コンテナを提供します。これは、データ処理用の Apache Spark 環境です。Apache Spark の詳細については、https://spark.apache.org/ をご覧ください。Dataproc Container for Spark のコンテナを使用して、Distributed Cloud Kubernetes クラスタ内で新しい Spark アプリケーションまたは既存の Spark アプリケーションを最小限の変更で実行します。Spark ツールに精通している場合は、引き続き使用できます。

YAML ファイルで Spark アプリケーションを定義すると、Distributed Cloud がリソースを割り当てます。Dataproc Container for Spark コンテナは数秒で起動します。Spark エグゼキュータは、必要に応じてスケールアップまたはシャットダウンします。

分散クラウド上の Spark 用 Dataproc コンテナから、特殊なハードウェア(特殊なハードウェア ノードや GPU など)を使用するようにコンテナを構成します。

Dataproc Container for Spark サービスをデプロイする

サービスを使用するには、プラットフォーム管理者(PA)が Marketplace サービスをインストールする必要があります。Spark 用 Dataproc コンテナが必要な場合は、PA にお問い合わせください。詳細については、GDC Marketplace ソフトウェア パッケージをインストールするをご覧ください。

Spark アプリケーションを実行するための前提条件

Dataproc Container for Spark サービスを使用するには、ユーザー クラスタにサービス アカウントが必要です。Dataproc Container for Spark は、Spark アプリケーションを実行する Spark ドライバ Pod を作成します。Spark ドライバ Pod には、次の操作を行う権限を持つ Pod の Namespace 内の Kubernetes サービス アカウントが必要です。

  • エグゼキュータ Pod の作成、取得、一覧表示、削除を行います。
  • ドライバ用の Kubernetes ヘッドレス サービスを作成します。

Spark アプリケーションを実行する前に、次の手順を完了して、foo Namespace に上記の権限を持つサービス アカウントがあることを確認します。

  1. foo Namespace で使用する Spark ドライバ Pod のサービス アカウントを作成します。

    kubectl create serviceaccount spark --kubeconfig AO_USER_KUBECONFIG --namespace=foo
    
  2. 実行プログラム Pod の作成、取得、一覧表示、削除の権限を付与するロールを作成し、foo Namespace のドライバ用に Kubernetes ヘッドレス Service を作成します。

    kubectl create role spark-driver --kubeconfig AO_USER_KUBECONFIG --verb=* \
    --resource=pods,services,configmaps,persistentvolumeclaims \
    --namespace=foo
    
  3. foo 名前空間でサービス アカウントのロール アクセス権を付与するロール バインディングを作成します。

    kubectl create --kubeconfig AO_USER_KUBECONFIG \
    rolebinding spark-spark-driver \
    --role=spark-driver --serviceaccount=foo:spark \
    --namespace=foo
    

Spark 3 サンプル アプリケーションを実行する

Spark アプリケーションをコンテナ化すると、Distributed Cloud を使用してオンプレミスでビッグデータ アプリケーションを実行するのが簡単になります。アプリケーション オペレーター(AO)として、SparkApplication カスタム リソースタイプの GKE オブジェクトで指定された Spark アプリケーションを実行します。

Distributed Cloud で Apache Spark 3 アプリケーションを実行して使用する手順は次のとおりです。

  1. プロジェクトの spark-operator イメージを調べて、Spark アプリケーションで参照する $DATAPROC_IMAGE を見つけます。

    export DATAPROC_IMAGE=$(kubectl get pod --kubeconfig AO_USER_KUBECONFIG \
    --selector app.kubernetes.io/name=spark-operator -n foo \
    -o=jsonpath='{.items[*].spec.containers[0].image}' \
    | sed 's/spark-operator/dataproc/')
    

    次に例を示します。

    export DATAPROC_IMAGE=10.200.8.2:10443/dataproc-service/private-cloud-devel/dataproc:3.1-dataproc-17
    
  2. SparkApplication 仕様を記述し、YAML ファイルに保存します。詳細については、Spark アプリケーション仕様を記述するをご覧ください。

  3. kubectl コマンドを使用して、GKE クラスタの SparkApplication 仕様で構成された Spark アプリケーションを送信、実行、モニタリングします。詳細については、アプリケーションの例をご覧ください。

  4. アプリケーションのステータスを確認します。

  5. 省略可: アプリケーション ログを確認します。詳細については、Spark 3 アプリケーションのログを表示するをご覧ください。

  6. Spark アプリケーションを使用して、ドライバとエグゼキュータのステータスを収集し、ユーザーに表示します。

Spark アプリケーション仕様を作成する

SparkApplication 仕様には次のコンポーネントが含まれます。

  • apiVersion フィールド。
  • kind フィールド。
  • metadata フィールド。
  • spec セクション。

詳細については、GitHub の SparkApplication 仕様の作成https://github.com/kubeflow/spark-operator/blob/gh-pages/docs/user-guide.md#writing-a-sparkapplication-spec)をご覧ください。

用途例

このセクションでは、Spark アプリケーションを実行するための SparkApplication 仕様の例をいくつか紹介します。

Spark Pi

このセクションでは、円にダーツを投げて 𝛑(円周率)を推定するコンピューティング負荷の高い Spark Pi アプリケーションを実行する例を示します。

次の手順に沿って Spark Pi を実行します。

  1. ユーザー クラスタに次の SparkApplication 仕様の例を適用します。

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: foo
    spec:
      type: Python
      pythonVersion: "3"
      mode: cluster
      image: "${DATAPROC_IMAGE?}"
      imagePullPolicy: IfNotPresent
      mainApplicationFile: "local:///usr/lib/spark/examples/src/main/python/pi.py"
      sparkVersion: "3.1.3"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        coreLimit: "1000m"
        memory: "512m"
        serviceAccount: spark
      executor:
        cores: 1
        instances: 1
        memory: "512m"
    
  2. 次のコマンドを使用して、SparkApplication 仕様の例が実行され、1 ~ 2 分で完了することを確認します。

    kubectl --kubeconfig AO_USER_KUBECONFIG get SparkApplication spark-pi -n foo
    
  3. ドライバログを表示して結果を確認します。

    kubectl --kubeconfig AO_USER_KUBECONFIG logs spark-pi-driver -n foo | grep "Pi is roughly"
    

    出力は次のようになります。

    Pi is roughly 3.1407357036785184
    

詳しくは、次のリソースをご覧ください。

  • アプリケーション コードについては、Apache Spark ドキュメントの Pi の推定https://spark.apache.org/examples.html)をご覧ください。
  • Spark Pi YAML ファイルの例については、Spark アプリケーション仕様を記述するをご覧ください。

Spark SQL

次の手順で Spark SQL を実行します。

  1. 1 値を選択する Spark SQL アプリケーションを実行するには、次のクエリを使用します。

    select 1;
    
  2. 次の SparkApplication 仕様の例をユーザー クラスタに適用します。

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: pyspark-sql-arrow
      namespace: foo
    spec:
      type: Python
      mode: cluster
      image: "${DATAPROC_IMAGE?}"
      imagePullPolicy: IfNotPresent
      mainApplicationFile: "local:///usr/lib/spark/examples/src/main/python/sql/arrow.py"
      sparkVersion: "3.1.3"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        coreLimit: "1000m"
        memory: "512m"
        serviceAccount: spark
      executor:
        cores: 1
        instances: 1
        memory: "512m"
    
  3. 次のコマンドを使用して、SparkApplication 仕様の例が 1 分以内に実行されて完了することを確認します。

    kubectl --kubeconfig AO_USER_KUBECONFIG get SparkApplication pyspark-sql-arrow -n foo
    

Spark MLlib

Spark MLlib を実行する手順は次のとおりです。

  1. 次の Scala の例を使用して、統計分析を実行し、結果をコンソールに出力する Spark MLlib インスタンスを実行します。

    import org.apache.spark.ml.linalg.{Matrix, Vectors}
    import org.apache.spark.ml.stat.Correlation
    import org.apache.spark.sql.Row
    
    val data = Seq(
      Vectors.sparse(4, Seq((0, 1.0), (3, -2.0))),
      Vectors.dense(4.0, 5.0, 0.0, 3.0),
      Vectors.dense(6.0, 7.0, 0.0, 8.0),
      Vectors.sparse(4, Seq((0, 9.0), (3, 1.0)))
    )
    
    val df = data.map(Tuple1.apply).toDF("features")
    val Row(coeff1: Matrix) = Correlation.corr(df, "features").head
    println(s"Pearson correlation matrix:\n $coeff1")
    
    val Row(coeff2: Matrix) = Correlation.corr(df, "features", "spearman").head
    println(s"Spearman correlation matrix:\n $coeff2")
    
  2. ユーザー クラスタに次の SparkApplication 仕様の例を適用します。

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: spark-ml
      namespace: foo
    spec:
      type: Scala
      mode: cluster
      image: "${DATAPROC_IMAGE?}"
      imagePullPolicy: IfNotPresent
      mainClass: org.apache.spark.examples.ml.SummarizerExample
      mainApplicationFile: "local:///usr/lib/spark/examples/jars/spark-examples_2.12-3.1.3.jar"
      sparkVersion: "3.1.3"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        coreLimit: "1000m"
        memory: "512m"
        serviceAccount: spark
      executor:
        cores: 1
        instances: 1
        memory: "512m"
    
  3. 次のコマンドを使用して、SparkApplication 仕様の例が 1 分以内に実行されて完了することを確認します。

    kubectl --kubeconfig AO_USER_KUBECONFIG get SparkApplication spark-ml -n foo
    

SparkR

SparkR を実行する手順は次のとおりです。

  1. 次のコード例を使用して、バンドルされたデータセットを読み込み、最初の行を出力する SparkR インスタンスを実行します。

    library(SparkR)
    sparkR.session()
    df <- as.DataFrame(faithful)
    head(df)
    
  2. 次の SparkApplication 仕様の例をユーザー クラスタに適用します。

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: spark-r-dataframe
      namespace: foo
    spec:
      type: R
      mode: cluster
      image: "${DATAPROC_IMAGE?}"
      imagePullPolicy: Always
      mainApplicationFile: "local:///usr/lib/spark/examples/src/main/r/dataframe.R"
      sparkVersion: "3.1.3"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        coreLimit: "1000m"
        memory: "512m"
        serviceAccount: spark
      executor:
        cores: 1
        instances: 1
        memory: "512m"
    
  3. 次のコマンドを使用して、SparkApplication 仕様の例が 1 分以内に実行されて完了することを確認します。

    kubectl --kubeconfig AO_USER_KUBECONFIG get SparkApplication spark-r-dataframe -n foo
    

Spark 3 アプリケーションのログを表示する

Spark には、次の 2 種類のログがあり、可視化できます。

ターミナルを使用してコマンドを実行します。

ドライバログ

次の手順に沿って、Spark アプリケーションのドライバログを表示します。

  1. Spark ドライバ Pod を見つけます。

    kubectl -n spark get pods
    
  2. Spark ドライバ Pod のログを開きます。

    kubectl -n spark logs DRIVER_POD
    

    DRIVER_POD は、前の手順で確認した Spark ドライバ Pod の名前に置き換えます。

イベントログ

イベントログは、SparkApplication 仕様の YAML ファイルで指定されたパスにあります。

次の手順に沿って、Spark アプリケーションのイベントログを表示します。

  1. SparkApplication 仕様の YAML ファイルを開きます。
  2. ファイル内で spec フィールドを探します。
  3. spec フィールドにネストされている sparkConf フィールドを見つけます。
  4. sparkConf セクションにネストされている spark.eventLog.dir フィールドの値を見つけます。
  5. パスを開いてイベントログを表示します。

SparkApplication 仕様の YAML ファイルの例については、Spark アプリケーション仕様を記述するをご覧ください。

詳しくは、アカウント マネージャーにお問い合わせください。