GPU アクセラレータを Dataproc サーバーレス バッチ ワークロードに接続すると、次の結果が得られます。
大規模なデータ分析ワークロードの処理を高速化します。
GPU 機械学習ライブラリを使用して、大規模なデータセットでのモデルのトレーニングを高速化します。
動画や自然言語処理などの高度なデータ分析を実行します。
サポートされている Dataproc サーバーレス Spark ランタイムはすべて、各ワークロード ノードに Spark RAPIDS ライブラリを追加します。Dataproc サーバーレス Spark ランタイム バージョン 1.1 では、ワークロード ノードに XGBoost ライブラリも追加されます。これらのライブラリは、GPU 高速化ワークロードで使用できる強力なデータ変換ツールと機械学習ツールを提供します。
GPU のメリット
Dataproc Serverless Spark ワークロードで GPU を使用すると、次のような利点があります。
パフォーマンスの向上: GPU アクセラレーションは、Spark ワークロードのパフォーマンスを大幅に向上させることができます。特に、機械学習やディープ ラーニング、グラフ処理、複雑な分析など、コンピューティング負荷の高いタスクに適しています。
モデルのトレーニングの高速化: 機械学習タスクの場合、GPU を接続すると、モデルのトレーニングに必要な時間が大幅に短縮され、データ サイエンティストやエンジニアは迅速なイテレーションとテストが可能になります。
スケーラビリティ: ますます複雑化する処理ニーズに対処するために、より多くの GPU ノードや強力な GPU をノードに追加できます。
コスト効率: GPU には初期投資が必要ですが、処理時間の短縮とリソース使用率の効率化により、時間の経過とともにコストを節約できます。
強化されたデータ分析: GPU アクセラレーションにより、大規模なデータセットに対して画像と動画の分析や自然言語処理などの高度な分析を実行できます。
改善されたプロダクト: より高速な処理により、迅速な意思決定とより応答性の高いアプリケーションを実現します。
制限事項と考慮事項
NVIDIA A100 または NVIDIA L4 の GPU を Dataproc サーバーレス バッチ ワークロードに接続できます。A100 および L4 アクセラレータは、Compute Engine GPU を利用できるリージョンの対象です。
XGBoost ライブラリは、Dataproc サーバーレス Spark ランタイム バージョン 1.x を使用しているときに、Dataproc サーバーレス GPU 高速化ワークロードにのみ提供されます。
XGBoost を使用した Dataproc サーバーレス GPU 高速化バッチでは、Compute Engine 割り当ての増加が使用されます。たとえば、NVIDIA L4 GPU を使用するサーバーレス バッチ ワークロードを実行するには、NVIDIA_L4_GPUS 割り当てを割り当てる必要があります。
アクセラレータが有効になっているジョブには、
constraints/compute.requireShieldedVm
組織のポリシーとの互換性がありません。組織でこのポリシーが適用されると、アクセラレータが有効になっているジョブは正常に実行されません。バージョン
2.2
より前の Dataproc サーバーレス ランタイムで RAPIDS GPU アクセラレーションを使用する場合は、デフォルトの文字セットを UTF-8 に設定する必要があります。詳細については、GPU アクセラレータを使用してサーバーレス バッチ ワークロードを作成するをご覧ください。
料金
アクセラレータの料金については、Dataproc サーバーレスの料金をご覧ください。
準備
GPU アクセラレータが接続されたサーバーレス バッチ ワークロードを作成する前に、次の操作を行います。
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
GPU アクセラレータを使用してサーバーレス バッチ ワークロードを作成する
NVIDIA L4 GPU を使用する Dataproc サーバーレス バッチ ワークロードを送信して、並列化された PySpark タスクを実行します。gcloud CLI を使用して、次の手順を行います。
[展開する] をクリックし、テキストまたはコードエディタを使用して、一覧表示された PySpark コードを作成し、ローカルマシンの
test-py-spark-gpu.py
ファイルに保存します。#!/usr/bin/env python """S8s Accelerators Example.""" import subprocess from typing import Any from pyspark.sql import SparkSession from pyspark.sql.functions import col from pyspark.sql.types import IntegerType from pyspark.sql.types import StructField from pyspark.sql.types import StructType spark = SparkSession.builder.appName("joindemo").getOrCreate() def get_num_gpus(_: Any) -> int: """Returns the number of GPUs.""" p_nvidia_smi = subprocess.Popen( ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE ) p_wc = subprocess.Popen( ["wc", "-l"], stdin=p_nvidia_smi.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ) [out, _] = p_wc.communicate() return int(out) num_workers = 5 result = ( spark.sparkContext.range(0, num_workers, 1, num_workers) .map(get_num_gpus) .collect() ) num_gpus = sum(result) print(f"Total accelerators: {num_gpus}") # Run the join example schema = StructType([StructField("value", IntegerType(), True)]) df = ( spark.sparkContext.parallelize(range(1, 10000001), 6) .map(lambda x: (x,)) .toDF(schema) ) df2 = ( spark.sparkContext.parallelize(range(1, 10000001), 6) .map(lambda x: (x,)) .toDF(schema) ) joined_df = ( df.select(col("value").alias("a")) .join(df2.select(col("value").alias("b")), col("a") == col("b")) .explain() )
ローカルマシンで gcloud CLI を使用して、各ワーカーを L4 GPU で高速化した 5 ワーカーで、Dataproc Serverless サーバーレス バッチジョブを送信します。
gcloud dataproc batches submit pyspark test-py-spark-gpu.py \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_NAME \ --version=1.1 \ --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
注:
- PROJECT_ID: Google Cloud プロジェクト ID
- REGION: ワークロードを実行する利用可能な Compute Engine リージョン。
- BUCKET_NAME: Cloud Storage バケットの名前。Spark は、バッチ ワークロードを実行する前に、このバケットの
/dependencies
フォルダにワークロードの依存関係をアップロードします。 - --version: すべてのサポートされている Dataproc サーバーレス ランタイムで、GPU 高速化ワークロードの各ノードに RAPIDS ライブラリが追加されます。現時点では、GPU 高速化ワークロードの各ノードに XGBoost ライブラリが追加されるのは、ランタイム バージョン 1.1 のみです。
--properties(Spark のリソース割り当てプロパティを参照):
spark.dataproc.driverEnv.LANG=C.UTF-8
とspark.executorEnv.LANG=C.UTF-8
(2.2
より前のランタイム バージョンで必要): これらのプロパティは、デフォルトの文字セットを C.UTF-8 に設定します。spark.dataproc.executor.compute.tier=premium
(必須): GPU 高速化ワークロードは、プレミアム データ コンピューティング ユニット(DCU)を使用して課金されます。Dataproc サーバーレスのアクセラレータの料金をご覧ください。spark.dataproc.executor.disk.tier=premium
(必須): A100-40、A100-80、または L4 アクセラレータを搭載したノードでは、プレミアム ディスク階層を使用する必要があります。spark.dataproc.executor.resource.accelerator.type=l4
(必須): GPU タイプは 1 つだけ指定する必要があります。この例のジョブでは、L4 GPU を選択します。次のアクセラレータ タイプでは、次の引数名を指定できます。GPU のタイプ 引数名 A100 40GB a100-40
A100 80GB a100-80
spark.executor.instances=5
(必須): 2 つ以上にする必要があります。この例では 5 に設定します。spark.executor.cores
(省略可): このプロパティを設定して、コア vCPU の数を指定できます。L4 GPU の有効な値は、4
、デフォルト、または8
、12
、16
です。A100 GPU の有効かつデフォルトの値は12
のみです。spark.dataproc.executor.disk.size
(必須): L4 GPU には 375 GB のサイズが必要です。L4 高速化ワークロードの送信時に、このプロパティを別の値に設定すると、エラーが発生します。A100 40 または A100 80 GPU を選択した場合、有効なサイズは 375g、750g、1500g、3000g、6000g、9000g です。spark.executor.memory
(省略可)とspark.executor.memoryOverhead
(省略可): これらのプロパティのいずれかを設定できますが、両方は設定できません。設定されたプロパティで使用されていない使用可能なメモリの量は、設定されていないプロパティに適用されます。 デフォルトでは、spark.executor.memoryOverhead
は PySpark バッチ ワークロードの使用可能なメモリの 40% に設定され、他のワークロードの場合は 10% に設定されます(Spark のリソース割り当てプロパティをご覧ください)。次の表に、さまざまな A100 GPU 構成と L4 GPU 構成に設定できる最大メモリ容量を示します。どちらのプロパティの最小値も
1024
MB です。A100 (40 GB) A100 (80 GB) L4(4 コア) L4(8 コア) L4(12 コア) L4(16 コア) 最大合計メモリ(MB) 78040 165080 13384 26768 40152 53536 Spark RAPIDS のプロパティ(省略可): デフォルトでは、Dataproc サーバーレスは次の Spark RAPIDS プロパティ値を設定します。
spark.plugins
=com.nvidia.spark.SQLPluginspark.executor.resource.gpu.amount
=1spark.task.resource.gpu.amount
=1/$spark_executor_coresspark.shuffle.manager
=''。デフォルトでは、このプロパティは設定されていません。 ただし、パフォーマンス向上のために GPU を使用する場合には、RAPIDS シャッフル マネージャーを有効にすることが NVIDIA により推奨されています。これを行うには、ワークロードを送信するときにspark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
を設定します。
Spark RAPIDS プロパティを設定するには、Apache Spark 構成用の RAPIDS アクセラレータをご覧ください。また、Spark 詳細プロパティを設定するには、Apache Spark 詳細構成用の RAPIDS アクセラレータをご覧ください。