Dataproc on Google Kubernetes Engine で Spark ジョブを実行する

始める前に

  1. クラスタで Workload Identity が有効になっている自動パイロットではない標準的なGoogle Kubernetes Engine(GKE)ゾーンクラスタまたはリージョン クラスタを作成済みである必要があります。

Dataproc-on-GKE 仮想クラスタを作成する

Dataproc on GKE 仮想クラスタは、Dataproc コンポーネントのデプロイ プラットフォームとして作成されます。これは仮想リソースであり、Dataproc on Compute Engine クラスタとは異なり、個別の Dataproc マスター VM とワーカー VM が含まれません。

  • Dataproc on GKE では、Dataproc on GKE 仮想クラスタを作成するときに、GKE クラスタ内にノードプールが作成されます。

  • Dataproc on GKE ジョブは、こうしたノードプールで Pod として実行されます。ノードプールとノードプール上の Pod のスケジューリングは、GKE が管理します。

  • 複数の仮想クラスタを作成します。GKE クラスタで複数の仮想クラスタを作成して実行し、仮想クラスタ間でノードプールを共有することで、リソース使用率を改善できます。

    • 各仮想クラスタ:
      • Spark エンジンのバージョンやワークロード ID など、別々のプロパティで作成されます
      • GKE クラスタの個別の GKE 名前空間内で分離されている

Console

  1. Google Cloud コンソールで、Dataproc の [クラスタ] ページに移動します。

    クラスタに移動

  2. [作成] をクリックします。

  3. [Dataproc クラスタの作成] ダイアログで、[Cluster on GKE] 行の [作成] をクリックします。

  4. [クラスタの設定] パネルで次の操作を行います。

    1. [クラスタ名] フィールドに、クラスタの名前を入力します。
    2. [リージョン] リストで、Dataproc on GKE 仮想クラスタのリージョンを選択します。このリージョンは、既存の GKE クラスタが配置されている同じリージョン(次の項目で選択)に存在する必要があります。
    3. [Kubernetes クラスタ] フィールドで、[参照] をクリックして、既存の GKE クラスタが配置されているリージョンを選択します。
    4. 省略可: [Cloud Storage ステージング バケット] フィールドで [参照] をクリックして、既存の Cloud Storage バケットを選択できます。Dataproc on GKE は、バケット内のアーティファクトをステージングします。Dataproc on GKE でステージング バケットを作成するには、このフィールドを省略します。
  5. 左側のパネルで [ノードプールの構成] をクリックし、[ノードプール] パネルで [プールを追加] をクリックします。

    1. GKE のノードプールで既存の Dataproc を再利用するには:
      1. [既存のノードプールを再利用] をクリックします。
      2. 既存のノードプールの名前を入力して、[ロール] を選択します。少なくとも 1 つのノードプールで DEFAULT のロールを持つ必要があります。
      3. [完了] をクリックします。
    2. 新しい Dataproc on GKE ノードプールを作成するには:
      1. [新しいノードプールを作成] をクリックします。
      2. 次のノードプール値を入力します。
    3. [プールを追加] をクリックしてノードプールをさらに追加します。すべてのノードプールにはロケーションが必要です。合計 4 つのノードプールを追加できます。
  6. (オプション)アクティブおよび削除された Dataproc on GKE クラスタで Spark ジョブ履歴を表示するために使用する Dataproc Persistent History Server(PHS)を設定した場合は、[クラスタのカスタマイズ] をクリックします。次に、[History サーバー クラスタ] フィールドで、PHS クラスタを参照して選択します。PHS クラスタは、Dataproc on GKE 仮想クラスタと同じリージョンに配置する必要があります。

  7. [作成] をクリックして Dataproc クラスタを作成します。Dataproc on GKE クラスタが、[クラスタ] ページのリストに表示されます。クラスタが使用できるようになるまでのステータスは [プロビジョニング] で、その後 [実行中] に変わります。

gcloud

環境変数を設定し、ローカルまたは Cloud Shell で gcloud dataproc clusters gke create コマンドを実行して GKE クラスタ上に Dataproc を作成します。

  1. 環境変数を設定します。

    DP_CLUSTER=Dataproc on GKE  cluster-name \
      REGION=region \
      GKE_CLUSTER=GKE cluster-name \
      BUCKET=Cloud Storage bucket-name \
      DP_POOLNAME=node pool-name
      PHS_CLUSTER=Dataproc PHS server name
    
    メモ:

    • DP_CLUSTER: Dataproc の仮想クラスタ名は、小文字で始まり、54 文字以下の小文字、数字、ハイフンで構成される必要があります。末尾をハイフンにはできません。
    • REGION: region は、GKE クラスタが配置されているリージョンと同じである必要があります。
    • GKE_CLUSTER: 既存の GKE クラスタの名前。
    • BUCKET: (省略可)Cloud Storage バケットの名前を指定できます。Dataproc は、この名前を使用してアーティファクトをステージングします。バケットを指定しない場合は、Dataproc on GKE がステージング バケットを作成します。
    • DP_POOLNAME: GKE クラスタに作成するノードプールの名前。
    • PHS_CLUSTER: (省略可)アクティブな Dataproc on GKE クラスタと削除された Dataproc on GKE クラスタの Spark ジョブ履歴の表示に使用する Dataproc PHS サーバーPHS クラスタは、Dataproc on GKE 仮想クラスタと同じリージョンに配置する必要があります。
  2. 次のコマンドを実行します。

    gcloud dataproc clusters gke create ${DP_CLUSTER} \
        --region=${REGION} \
        --gke-cluster=${GKE_CLUSTER} \
        --spark-engine-version=latest \
        --staging-bucket=${BUCKET} \
        --pools="name=${DP_POOLNAME},roles=default" \
        --setup-workload-identity \
        --history-server-cluster=${PHS_CLUSTER}
    
    メモ:

    • --spark-engine-version: Dataproc クラスタで使用される Spark イメージ バージョン33.1latest などの識別子を使用することも、3.1-dataproc-5 などの完全なサブマイナー バージョンを指定することもできます。
    • --staging-bucket: Dataproc on GKE にステージング バケットを作成させるには、このフラグを削除します。
    • --pools: Dataproc が、ワークロードの作成やワークロードを実行するために使用する、新規または既存のノードプールの指定に使用します。Dataproc on GKE ノードプールの設定をカンマで区切って列挙します(例: )。
      --pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
      
      ノードプールの namerole を指定する必要があります。その他のノードプール設定は省略可能です。複数の --pools フラグを使用して、複数のノードプールを指定できます。少なくとも 1 つのノードプールで default のロールを持つ必要があります。すべてのノードプールには同じロケーションが必要です。
    • --setup-workload-identity: このフラグは、Workload Identity バインディングを有効にします。これらのバインディングにより、Kubernetes サービス アカウント(KSA)が仮想クラスタのデフォルトの Dataproc VM サービス アカウント(データプレーン ID)として機能できるようになります。

REST

Dataproc API の cluster.create リクエストの一部として virtualClusterConfig を作成します。

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT: Google Cloud プロジェクト ID
  • REGION: Dataproc 仮想クラスタのリージョン(既存の GKE クラスタ リージョンと同じリージョン)
  • DP_CLUSTER: Dataproc クラスタ名
  • GKE_CLUSTER: GKE クラスタ名。
  • NODE_POOL: ノードプール名
  • PHS_CLUSTER: 永続履歴サーバー(PHS)のクラスタ名
  • BUCKET: (省略可)ステージング バケット名。GKE on Dataproc でステージング バケットを作成する場合は、これを空のままにします。

HTTP メソッドと URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters

リクエストの本文(JSON):

{
  "clusterName":"DP_CLUSTER",
  "projectId":"PROJECT",
  "virtualClusterConfig":{
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    },
    "kubernetesClusterConfig":{
      "gkeClusterConfig":{
        "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"latest"
        }
      }
    },
    "stagingBucket":"BUCKET"
  }
}

リクエストを送信するには、次のいずれかのオプションを展開します。

次のような JSON レスポンスが返されます。

{
  "projectId":"PROJECT",
  "clusterName":"DP_CLUSTER",
  "status":{
    "state":"RUNNING",
    "stateStartTime":"2022-04-01T19:16:39.865716Z"
  },
  "clusterUuid":"98060b77-...",
  "statusHistory":[
    {
      "state":"CREATING",
      "stateStartTime":"2022-04-01T19:14:27.340544Z"
    }
  ],
  "labels":{
    "goog-dataproc-cluster-name":"DP_CLUSTER",
    "goog-dataproc-cluster-uuid":"98060b77-...",
    "goog-dataproc-location":"REGION",
    "goog-dataproc-environment":"prod"
  },
  "virtualClusterConfig":{
    "stagingBucket":"BUCKET",
    "kubernetesClusterConfig":{
      "kubernetesNamespace":"dp-cluster",
      "gkeClusterConfig":{
"gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"3.1-..."
        },
        "properties":{
          "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...",
          "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history",
          "spark:spark.eventLog.enabled":"true"
        }
      }
    },
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    }
  }

Spark ジョブの送信

Dataproc on GKE 仮想クラスタが実行されたら、Google Cloud コンソール、gcloud CLI、または jobs.submit API を使用して(直接 HTTP リクエストまたは Cloud クライアント ライブラリを使用)Spark ジョブを送信します。

gcloud CLI Spark ジョブの例:

gcloud dataproc jobs submit spark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    --class=org.apache.spark.examples.SparkPi \
    --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \
    -- 1000

gcloud CLI PySpark ジョブの例:

gcloud dataproc jobs submit pyspark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/python/pi.py \
    -- 10

gcloud CLI SparkR ジョブの例:

gcloud dataproc jobs submit spark-r \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/r/dataframe.R

クリーンアップ