Dataproc サーバーレスで GPU を使用する

Dataproc サーバーレス バッチ ワークロードに GPU アクセラレータを接続して、次の結果を得ることができます。

  • 大規模なデータ分析ワークロードの処理を高速化します。

  • GPU 機械学習ライブラリを使用して、大規模なデータセットでのモデル トレーニングを高速化します。

  • 動画や自然言語処理などの高度なデータ分析を実行します。

サポートされているすべての Dataproc サーバーレス Spark ランタイムでは、各ワークロード ノードに Spark RAPIDS ライブラリが追加されます。Dataproc Serverless for Spark ランタイム バージョン 1.1 では、XGBoost ライブラリもワークロード ノードに追加されます。これらのライブラリは、GPU 高速化ワークロードで使用できる強力なデータ変換ツールと機械学習ツールを提供します。

GPU の利点

Dataproc Serverless Spark ワークロードで GPU を使用する場合の利点は次のとおりです。

  • パフォーマンスの向上: GPU アクセラレーションにより、特に機械学習とディープ ラーニング、グラフ処理、複雑な分析などのコンピューティング負荷の高いタスクを行う場合に、Spark ワークロードのパフォーマンスを大幅に向上させることができます。

  • モデルのトレーニングの高速化: 機械学習のタスクでは、GPU を接続することでモデルのトレーニングに必要な時間を大幅に短縮し、データ サイエンティストやエンジニアがすばやく反復処理とテストを行うことができます。

  • スケーラビリティ: ノードに対する GPU ノードやより強力な GPU を追加して、より複雑な処理のニーズに対応できます。

  • コスト効率: GPU は初期投資を必要としますが、処理時間が短縮され、リソース利用の効率が高いため、時間の経過とともにコストを削減できます。

  • 強化されたデータ分析: GPU アクセラレーションにより、大規模なデータセットに対して、画像や動画の分析や自然言語処理などの高度な分析を実行できます。

  • プロダクトの改善: より高速な処理により、迅速な意思決定とより応答性の高いアプリケーションが可能になります。

制限事項と考慮事項

料金

アクセラレータの料金については、Dataproc サーバーレスの料金をご覧ください。

準備

GPU アクセラレータが接続されたサーバーレス バッチ ワークロードを作成する前に、次の操作を行います。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Dataproc, Compute Engine, and Cloud Storage API を有効にします。

    API を有効にする

  5. Google Cloud CLI をインストールします。
  6. gcloud CLI を初期化するには:

    gcloud init
  7. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  8. Google Cloud プロジェクトで課金が有効になっていることを確認します

  9. Dataproc, Compute Engine, and Cloud Storage API を有効にします。

    API を有効にする

  10. Google Cloud CLI をインストールします。
  11. gcloud CLI を初期化するには:

    gcloud init
  12. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets page

  13. Click Create bucket.
  14. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    • For Name your bucket, enter a name that meets the bucket naming requirements.
    • For Choose where to store your data, do the following:
      • Select a Location type option.
      • Select a Location option.
    • For Choose a default storage class for your data, select a storage class.
    • For Choose how to control access to objects, select an Access control option.
    • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
  15. Click Create.

GPU アクセラレータを使用してサーバーレス バッチ ワークロードを作成する

NVIDIA L4 GPU を使用する Dataproc サーバーレス バッチ ワークロードを送信して、並列化された PySpark タスクを実行します。gcloud CLI を使用する手順は次のとおりです。

  1. [展開する] をクリックし、テキストまたはコードエディタを使用して、一覧表示された PySpark コードを作成し、ローカルマシンの test-py-spark-gpu.py ファイルに保存します。

    #!/usr/bin/env python
    
    """S8s Accelerators Example."""
    
    import subprocess
    from typing import Any
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import IntegerType
    from pyspark.sql.types import StructField
    from pyspark.sql.types import StructType
    
    spark = SparkSession.builder.appName("joindemo").getOrCreate()
    
    def get_num_gpus(_: Any) -> int:
      """Returns the number of GPUs."""
      p_nvidia_smi = subprocess.Popen(
          ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE
      )
      p_wc = subprocess.Popen(
          ["wc", "-l"],
          stdin=p_nvidia_smi.stdout,
          stdout=subprocess.PIPE,
          stderr=subprocess.PIPE,
          universal_newlines=True,
      )
      [out, _] = p_wc.communicate()
      return int(out)
    
    num_workers = 5
    result = (
        spark.sparkContext.range(0, num_workers, 1, num_workers)
        .map(get_num_gpus)
        .collect()
    )
    num_gpus = sum(result)
    print(f"Total accelerators: {num_gpus}")
    
    # Run the join example
    schema = StructType([StructField("value", IntegerType(), True)])
    df = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    df2 = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    joined_df = (
        df.select(col("value").alias("a"))
        .join(df2.select(col("value").alias("b")), col("a") == col("b"))
        .explain()
    )
    
    
  2. ローカルマシンで gcloud CLI を使用して、各ワーカーを L4 GPU で高速化した 5 ワーカーで、Dataproc Serverless サーバーレス バッチジョブを送信します。

    gcloud dataproc batches submit pyspark test-py-spark-gpu.py \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_NAME \
        --version=1.1 \
        --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
    

メモ:

  • PROJECT_ID: Google Cloud プロジェクト ID
  • REGION: ワークロードを実行するための使用可能な Compute Engine リージョン
  • BUCKET_NAME: Cloud Storage バケットの名前。Spark は、バッチ ワークロードを実行する前に、このバケットの /dependencies フォルダにワークロードの依存関係をアップロードします。
  • --version: すべてのサポートされている Dataproc サーバーレス ランタイムで、GPU 高速化ワークロードの各ノードに RAPIDS ライブラリが追加されます。現在、GPU 高速化ワークロードの各ノードに XGBoost ライブラリが追加されるのは、ランタイム バージョン 1.1 のみです。
  • --propertiesSpark のリソース割り当てプロパティを参照):

    • spark.dataproc.driverEnv.LANG=C.UTF-8spark.executorEnv.LANG=C.UTF-82.2 より前のランタイム バージョンで必須): これらのプロパティでは、デフォルトの文字セットが C.UTF-8 に設定されます。
    • spark.dataproc.executor.compute.tier=premium(必須): GPU 高速化ワークロードは、プレミアム Data Compute Unit(DCU)を使用して課金されます。Dataproc サーバーレスのアクセラレータの料金をご覧ください。

    • spark.dataproc.executor.disk.tier=premium(必須): A100-40、A100-80、または L4 アクセラレータを含むノードは、プレミアム ディスク階層を使用する必要があります。

    • spark.dataproc.executor.resource.accelerator.type=l4(必須): GPU タイプは 1 つだけ指定する必要があります。サンプルジョブでは L4 GPU を選択します。次のアクセラレータ タイプでは、次の引数名を指定できます。

      GPU のタイプ 引数名
      A100 40GB a100-40
      A100 80GB a100-80

    • spark.executor.instances=5(必須): 少なくとも 2 つにする必要があります。この例では 5 に設定します。

    • spark.executor.cores(省略可): このプロパティを設定すると、コア vCPU の数を指定できます。L4 GPU の有効な値は、4、デフォルト、81216 です。A100 GPU の唯一の有効なデフォルト値は 12 です。

    • spark.dataproc.executor.disk.sizeA100-40A100-80 GPU の場合は省略可): Dataproc サーバーレスは、デフォルトの GPU SSD ディスクサイズを 375 GB に設定します。 A100 40 または A100 80 GPU を使用する場合は、サイズを変更できます(Spark のリソース割り当てプロパティをご覧ください)。L4 GPU には 375 GB のサイズが必要です。L4 高速化ワークロードの送信時に、このプロパティを別の値に設定すると、エラーが発生します。

    • spark.executor.memory(省略可)と spark.executor.memoryOverhead(省略可): これらのプロパティはいずれかを設定できますが、両方は設定できません。セットされたプロパティによって消費されないメモリ量は、設定されていないプロパティに適用されます。デフォルトでは、spark.executor.memoryOverhead は PySpark バッチ ワークロードの使用可能なメモリの 40% に設定され、他のワークロードの場合は 10% に設定されます(Spark のリソース割り当てプロパティをご覧ください)。

      次の表は、さまざまな A100 および L4 GPU 構成で設定できるメモリの最大量を示しています。いずれかのプロパティの最小値は 1024 MB です。

      A100 (40 GB) A100 (80 GB) L4(4 コア) L4(8 コア) L4(12 コア) L4(16 コア)
      最大合計メモリ(MB) 78040 165080 13384 26768 40152 53536
    • Spark RAPIDS のプロパティ(省略可): デフォルトでは、Dataproc サーバーレスは次の Spark RAPIDS プロパティ値を設定します。

      • spark.plugins=com.nvidia.spark.SQLPlugin
      • spark.executor.resource.gpu.amount=1
      • spark.task.resource.gpu.amount=1/$spark_executor_cores
      • spark.shuffle.manager=''。デフォルトでは、このプロパティは設定されていません。 ただし、パフォーマンス向上のために GPU を使用する場合には、RAPIDS シャッフル マネージャーを有効にすることが NVIDIA により推奨されています。これを行うには、ワークロードの送信時に spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager を設定します。

      Spark RAPIDS プロパティを設定するには、Apache Spark 構成用の RAPIDS アクセラレータをご覧ください。また、Spark 詳細プロパティを設定するには、Apache Spark 詳細構成用の RAPIDS アクセラレータをご覧ください。