始める前に
- クラスタで Workload Identity が有効になっている自動パイロットではない標準的なGoogle Kubernetes Engine(GKE)ゾーンクラスタまたはリージョン クラスタを作成済みである必要があります。
Dataproc-on-GKE 仮想クラスタを作成する
Dataproc on GKE 仮想クラスタは、Dataproc コンポーネントのデプロイ プラットフォームとして作成されます。これは仮想リソースであり、Dataproc on Compute Engine クラスタとは異なり、個別の Dataproc マスター VM とワーカー VM が含まれません。
Dataproc on GKE では、Dataproc on GKE 仮想クラスタを作成するときに、GKE クラスタ内にノードプールが作成されます。
Dataproc on GKE ジョブは、こうしたノードプールで Pod として実行されます。ノードプールとノードプール上の Pod のスケジューリングは、GKE が管理します。
複数の仮想クラスタを作成します。GKE クラスタで複数の仮想クラスタを作成して実行し、仮想クラスタ間でノードプールを共有することで、リソース使用率を改善できます。
- 各仮想クラスタ:
- Spark エンジンのバージョンやワークロード ID などの個別のプロパティで作成されます。
- GKE クラスタの個別の GKE Namespace 内で分離されています。
- 各仮想クラスタ:
Console
Google Cloud コンソールで、Dataproc の [クラスタ] ページに移動します。
[作成] をクリックします。
[Dataproc クラスタの作成] ダイアログで、[Cluster on GKE] 行の [作成] をクリックします。
[クラスタの設定] パネルで次の操作を行います。
- [クラスタ名] フィールドに、クラスタの名前を入力します。
- [リージョン] リストで、Dataproc on GKE 仮想クラスタのリージョンを選択します。このリージョンは、既存の GKE クラスタが配置されている同じリージョン(次の項目で選択)に存在する必要があります。
- [Kubernetes クラスタ] フィールドで、[参照] をクリックして、既存の GKE クラスタが配置されているリージョンを選択します。
- 省略可: [Cloud Storage ステージング バケット] フィールドで [参照] をクリックして、既存の Cloud Storage バケットを選択できます。Dataproc on GKE は、バケットにアーティファクトをステージングします。Dataproc on GKE でステージング バケットを作成するには、このフィールドを省略します。
左側のパネルで [ノードプールの構成] をクリックし、[ノードプール] パネルで [プールを追加] をクリックします。
- GKE のノードプールで既存の Dataproc を再利用するには:
- [既存のノードプールを再利用] をクリックします。
- 既存のノードプールの名前を入力して、[ロール] を選択します。少なくとも 1 つのノードプールで DEFAULT のロールを持つ必要があります。
- [完了] をクリックします。
- 新しい Dataproc on GKE ノードプールを作成するには:
- [プールを追加] をクリックしてノードプールをさらに追加します。すべてのノードプールにはロケーションが必要です。合計 4 つのノードプールを追加できます。
- GKE のノードプールで既存の Dataproc を再利用するには:
(オプション)アクティブおよび削除された Dataproc on GKE クラスタで Spark ジョブ履歴を表示するために使用する Dataproc Persistent History Server(PHS)を設定した場合は、[クラスタのカスタマイズ] をクリックします。次に、[History サーバー クラスタ] フィールドで、PHS クラスタを参照して選択します。PHS クラスタは、Dataproc on GKE 仮想クラスタと同じリージョンに配置する必要があります。
[作成] をクリックして Dataproc クラスタを作成します。Dataproc on GKE クラスタが、[クラスタ] ページのリストに表示されます。クラスタが使用できるようになるまでのステータスは [プロビジョニング] で、その後 [実行中] に変わります。
gcloud
環境変数を設定し、ローカルまたは Cloud Shell で gcloud dataproc clusters gke create
コマンドを実行して GKE クラスタ上に Dataproc を作成します。
環境変数を設定します。
注:DP_CLUSTER=Dataproc on GKE cluster-name \ REGION=region \ GKE_CLUSTER=GKE cluster-name \ BUCKET=Cloud Storage bucket-name \ DP_POOLNAME=node pool-name PHS_CLUSTER=Dataproc PHS server name
DP_CLUSTER
: Dataproc の仮想クラスタ名は、小文字で始まり、54 文字以下の小文字、数字、ハイフンで構成される必要があります。末尾をハイフンにはできません。REGION
: region は、GKE クラスタが配置されているリージョンと同じである必要があります。GKE_CLUSTER
: 既存の GKE クラスタの名前。BUCKET
: (省略可)Cloud Storage バケットの名前を指定できます。Dataproc は、この名前を使用してアーティファクトをステージングします。バケットを指定しない場合は、Dataproc on GKE がステージング バケットを作成します。DP_POOLNAME
: GKE クラスタに作成するノードプールの名前。PHS_CLUSTER
: (省略可)アクティブな Dataproc on GKE クラスタと削除された Dataproc on GKE クラスタの Spark ジョブ履歴の表示に使用する Dataproc PHS サーバーPHS クラスタは、Dataproc on GKE 仮想クラスタと同じリージョンに配置する必要があります。
次のコマンドを実行します。
注:gcloud dataproc clusters gke create ${DP_CLUSTER} \ --region=${REGION} \ --gke-cluster=${GKE_CLUSTER} \ --spark-engine-version=latest \ --staging-bucket=${BUCKET} \ --pools="name=${DP_POOLNAME},roles=default" \ --setup-workload-identity \ --history-server-cluster=${PHS_CLUSTER}
--spark-engine-version
: Dataproc クラスタで使用される Spark イメージ バージョン。3
、3.1
、latest
などの識別子を使用することも、3.1-dataproc-5
などの完全なサブマイナー バージョンを指定することもできます。--staging-bucket
: Dataproc on GKE にステージング バケットを作成させるには、このフラグを削除します。--pools
: Dataproc が、ワークロードの作成やワークロードを実行するために使用する、新規または既存のノードプールの指定に使用します。Dataproc on GKE ノードプールの設定をカンマで区切って列挙します(例: )。 ノードプールの--pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
name
とrole
を指定する必要があります。その他のノードプールの設定は省略可能です。複数の--pools
フラグを使用して、複数のノードプールを指定できます。少なくとも 1 つのノードプールでdefault
のロールを持つ必要があります。すべてのノードプールには同じロケーションが必要です。--setup-workload-identity
: このフラグは、Workload Identity バインディングを有効にします。これらのバインディングにより、Kubernetes サービス アカウント(KSA)が仮想クラスタのデフォルトの Dataproc VM サービス アカウント(データプレーン ID)として機能できるようになります。
REST
Dataproc API の cluster.create
リクエストの一部として virtualClusterConfig を作成します。
リクエストのデータを使用する前に、次のように置き換えます。
- PROJECT: Google Cloud プロジェクト ID
- REGION: Dataproc 仮想クラスタのリージョン(既存の GKE クラスタ リージョンと同じリージョン)
- DP_CLUSTER: Dataproc クラスタ名
- GKE_CLUSTER: GKE クラスタ名。
- NODE_POOL: ノードプール名
- PHS_CLUSTER: 永続履歴サーバー(PHS)のクラスタ名
- BUCKET: (省略可)ステージング バケット名。GKE on Dataproc でステージング バケットを作成する場合は、これを空のままにします。
HTTP メソッドと URL:
POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters
リクエストの本文(JSON):
{ "clusterName":"DP_CLUSTER", "projectId":"PROJECT", "virtualClusterConfig":{ "auxiliaryServicesConfig":{ "sparkHistoryServerConfig":{ "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER" } }, "kubernetesClusterConfig":{ "gkeClusterConfig":{ "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER", "nodePoolTarget":[ { "nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL", "roles":[ "DEFAULT" ] } ] }, "kubernetesSoftwareConfig":{ "componentVersion":{ "SPARK":"latest" } } }, "stagingBucket":"BUCKET" } }
リクエストを送信するには、次のいずれかのオプションを展開します。
次のような JSON レスポンスが返されます。
{ "projectId":"PROJECT", "clusterName":"DP_CLUSTER", "status":{ "state":"RUNNING", "stateStartTime":"2022-04-01T19:16:39.865716Z" }, "clusterUuid":"98060b77-...", "statusHistory":[ { "state":"CREATING", "stateStartTime":"2022-04-01T19:14:27.340544Z" } ], "labels":{ "goog-dataproc-cluster-name":"DP_CLUSTER", "goog-dataproc-cluster-uuid":"98060b77-...", "goog-dataproc-location":"REGION", "goog-dataproc-environment":"prod" }, "virtualClusterConfig":{ "stagingBucket":"BUCKET", "kubernetesClusterConfig":{ "kubernetesNamespace":"dp-cluster", "gkeClusterConfig":{ "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER", "nodePoolTarget":[ { "nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL", "roles":[ "DEFAULT" ] } ] }, "kubernetesSoftwareConfig":{ "componentVersion":{ "SPARK":"3.1-..." }, "properties":{ "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...", "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history", "spark:spark.eventLog.enabled":"true" } } }, "auxiliaryServicesConfig":{ "sparkHistoryServerConfig":{ "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER" } } }
Spark ジョブの送信
Dataproc on GKE 仮想クラスタが実行されたら、Google Cloud コンソール、gcloud CLI、または jobs.submit
API を使用して(直接 HTTP リクエストまたは Cloud クライアント ライブラリを使用)Spark ジョブを送信します。
gcloud CLI Spark ジョブの例:
gcloud dataproc jobs submit spark \ --region=${REGION} \ --cluster=${DP_CLUSTER} \ --class=org.apache.spark.examples.SparkPi \ --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \ -- 1000
gcloud CLI PySpark ジョブの例:
gcloud dataproc jobs submit pyspark \ --region=${REGION} \ --cluster=${DP_CLUSTER} \ local:///usr/lib/spark/examples/src/main/python/pi.py \ -- 10
gcloud CLI SparkR ジョブの例:
gcloud dataproc jobs submit spark-r \ --region=${REGION} \ --cluster=${DP_CLUSTER} \ local:///usr/lib/spark/examples/src/main/r/dataframe.R
クリーンアップ
このクイックスタートで使用した以下のリソースのうち、今後使用しないリソースは削除します。
Dataproc on GKE クラスタによって使用されるノードプールを削除します。