Apache Spark 用 Google Cloud Serverless で GPU を使用する

GPU アクセラレータを Google Cloud Serverless for Apache Spark バッチ ワークロードに接続すると、次の結果が得られます。

  • 大規模なデータ分析ワークロードの処理を高速化します。

  • GPU ML ライブラリを使用して、大規模なデータセットでのモデル トレーニングを高速化します。

  • 動画や自然言語処理などの高度なデータ分析を実行します。

サポートされているすべての Serverless for Apache Spark Spark ランタイムで、各ワークロード ノードに Spark RAPIDS ライブラリが追加されます。Apache Spark 用 Serverless Spark ランタイム バージョン 1.1 では、ワークロード ノードに XGBoost ライブラリも追加されます。これらのライブラリは、GPU 高速化ワークロードで使用できる強力なデータ変換ツールと機械学習ツールを提供します。

GPU のメリット

Apache Spark 用 Serverless で GPU を使用するメリットは次のとおりです。

  • パフォーマンスの向上: GPU アクセラレーションは、特に機械学習やディープ ラーニング、グラフ処理、複雑な分析などのコンピューティング負荷の高いタスクで、Spark ワークロードのパフォーマンスを大幅に向上させることができます。

  • モデルのトレーニングの高速化: 機械学習タスクでは、GPU を接続することでモデルのトレーニングに必要な時間を大幅に短縮できます。これにより、データ サイエンティストやエンジニアは迅速に反復処理とテストを行うことができます。

  • スケーラビリティ: 複雑化する処理ニーズに対応するため、GPU ノードを追加したり、ノードに強力な GPU を追加したりできます。

  • 費用対効果: GPU には初期投資が必要ですが、処理時間の短縮とリソースの効率的な使用により、長期的にコスト削減を実現できます。

  • 高度なデータ分析: GPU アクセラレーションにより、大規模なデータセットで画像や動画の分析、自然言語処理などの高度な分析を実行できます。

  • プロダクトの改善: 処理速度の向上により、意思決定の迅速化とアプリケーションの応答性の向上を実現します。

制限事項と考慮事項

料金

アクセラレータの料金については、Apache Spark 用サーバーレスの料金をご覧ください。

始める前に

GPU アクセラレータがアタッチされたサーバーレス Batch ワークロードを作成する前に、次の操作を行います。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. 外部 ID プロバイダ(IdP)を使用している場合は、まずフェデレーション ID を使用して gcloud CLI にログインする必要があります。

  7. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. 外部 ID プロバイダ(IdP)を使用している場合は、まずフェデレーション ID を使用して gcloud CLI にログインする必要があります。

  13. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  14. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  15. Click Create.
  16. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    1. In the Get started section, do the following:
      • Enter a globally unique name that meets the bucket naming requirements.
      • To add a bucket label, expand the Labels section (), click Add label, and specify a key and a value for your label.
    2. In the Choose where to store your data section, do the following:
      1. Select a Location type.
      2. Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
      3. To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:

        Set up cross-bucket replication

        1. In the Bucket menu, select a bucket.
        2. In the Replication settings section, click Configure to configure settings for the replication job.

          The Configure cross-bucket replication pane appears.

          • To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
          • To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
          • Click Done.
    3. In the Choose how to store your data section, do the following:
      1. Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
      2. To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
    4. In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
    5. In the Choose how to protect object data section, do the following:
      • Select any of the options under Data protection that you want to set for your bucket.
        • To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
        • To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
        • To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
          • To enable Object Retention Lock, click the Enable object retention checkbox.
          • To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
      • To choose how your object data will be encrypted, expand the Data encryption section (), and select a Data encryption method.
  17. Click Create.
  18. GPU アクセラレータを使用してサーバーレス バッチ ワークロードを作成する

    NVIDIA L4 GPU を使用する Apache Spark 用サーバーレス バッチ ワークロードを送信して、並列化された PySpark タスクを実行します。gcloud CLI を使用して次の操作を行います。

    1. [展開する] をクリックし、テキストまたはコードエディタを使用して、一覧表示された PySpark コードを作成し、ローカルマシンの test-py-spark-gpu.py ファイルに保存します。

      #!/usr/bin/env python
      
      """S8s Accelerators Example."""
      
      import subprocess
      from typing import Any
      from pyspark.sql import SparkSession
      from pyspark.sql.functions import col
      from pyspark.sql.types import IntegerType
      from pyspark.sql.types import StructField
      from pyspark.sql.types import StructType
      
      spark = SparkSession.builder.appName("joindemo").getOrCreate()
      
      
      def get_num_gpus(_: Any) -> int:
        """Returns the number of GPUs."""
        p_nvidia_smi = subprocess.Popen(
            ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE
        )
        p_wc = subprocess.Popen(
            ["wc", "-l"],
            stdin=p_nvidia_smi.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        [out, _] = p_wc.communicate()
        return int(out)
      
      
      num_workers = 5
      result = (
          spark.sparkContext.range(0, num_workers, 1, num_workers)
          .map(get_num_gpus)
          .collect()
      )
      num_gpus = sum(result)
      print(f"Total accelerators: {num_gpus}")
      
      # Run the join example
      schema = StructType([StructField("value", IntegerType(), True)])
      df = (
          spark.sparkContext.parallelize(range(1, 10000001), 6)
          .map(lambda x: (x,))
          .toDF(schema)
      )
      df2 = (
          spark.sparkContext.parallelize(range(1, 10000001), 6)
          .map(lambda x: (x,))
          .toDF(schema)
      )
      joined_df = (
          df.select(col("value").alias("a"))
          .join(df2.select(col("value").alias("b")), col("a") == col("b"))
          .explain()
      )
    2. ローカルマシンで gcloud CLI を使用して、各ワーカーを L4 GPU で高速化した 5 ワーカーで、Apache Spark 用 Serverless サーバーレス バッチジョブを送信します。

      gcloud dataproc batches submit pyspark test-py-spark-gpu.py \
          --project=PROJECT_ID \
          --region=REGION \
          --deps-bucket=BUCKET_NAME \
          --version=1.1 \
          --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
      

    注:

    • PROJECT_ID: 実際の Google Cloud プロジェクト ID。
    • REGION: ワークロードを実行できる利用可能な Compute Engine リージョン
    • BUCKET_NAME: Cloud Storage バケットの名前。Spark は、バッチ ワークロードを実行する前に、ワークロードの依存関係をこのバケットの /dependencies フォルダにアップロードします。
    • --version: すべてのサポートされている Google Cloud Apache Spark 用サーバーレス ランタイムで、GPU 高速化ワークロードの各ノードに RAPIDS ライブラリが追加されます。ランタイム バージョン 1.1 のみで、GPU 高速化ワークロードの各ノードに XGBoost ライブラリが追加されます。
    • --propertiesSpark リソース割り当てプロパティを参照):

      • spark.dataproc.driverEnv.LANG=C.UTF-8spark.executorEnv.LANG=C.UTF-8 (2.2 より前のランタイム バージョンで必須): これらのプロパティは、デフォルトの文字セットを C.UTF-8 に設定します。
      • spark.dataproc.executor.compute.tier=premium(必須): GPU アクセラレータ ワークロードは、プレミアム データ コンピューティング ユニット(DCU)を使用して課金されます。Apache Spark 用サーバーレスのアクセラレータの料金をご覧ください。

      • spark.dataproc.executor.disk.tier=premium(必須): A100-40、A100-80、または L4 アクセラレータを使用するノードは、プレミアム ディスク ティアを使用する必要があります。

      • spark.dataproc.executor.resource.accelerator.type=l4(必須): GPU タイプは 1 つだけ指定する必要があります。このジョブの例では、L4 GPU を選択しています。次のアクセラレータ タイプでは、次の引数名を指定できます。

        GPU のタイプ 引数名
        A100 40GB a100-40
        A100 80GB a100-80

      • spark.executor.instances=5(必須): 2 以上にする必要があります。この例では 5 に設定します。

      • spark.executor.cores(省略可): このプロパティを設定して、コア vCPU の数を指定できます。L4 GPU の有効な値は、デフォルトの 4、または 81216244896 です。A100 GPU の有効なデフォルト値は 12 のみです。L4 GPU と 244896 コアを含む構成では、各エグゼキュータに 248 GPU がアタッチされています。他のすべての構成には 1 GPU が接続されています。

      • spark.dataproc.executor.disk.size(必須): L4 GPU のディスクサイズは 375 GB に固定されています。ただし、244896 コアを含む構成では、それぞれ 7501,5003,000 GB になります。L4 高速化ワークロードの送信時に、このプロパティを別の値に設定すると、エラーが発生します。A100 40 または A100 80 GPU を選択した場合、有効なサイズは 375g、750g、1500g、3000g、6000g、9000g です。

      • spark.executor.memory(省略可)と spark.executor.memoryOverhead(省略可): これらのプロパティはどちらか一方のみ設定できます。設定されたプロパティで使用されていない使用可能なメモリの量は、設定されていないプロパティに適用されます。デフォルトでは、spark.executor.memoryOverhead は PySpark バッチ ワークロードの使用可能なメモリの 40% に設定され、他のワークロードの場合は 10% に設定されます(Spark のリソース割り当てプロパティをご覧ください)。

        次の表に、A100 と L4 のさまざまな GPU 構成で設定できる最大メモリ容量を示します。どちらのプロパティの最小値も 1024 MB です。

        A100 (40 GB) A100 (80 GB) L4(4 コア) L4(8 コア) L4(12 コア) L4(16 コア) L4(24 コア) L4(48 コア) L4(96 コア)
        最大合計メモリ(MB) 78040 165080 13384 26768 40152 53536 113072 160608 321216
      • Spark RAPIDS プロパティ(省略可): デフォルトでは、Apache Spark 用サーバーレスは次の Spark RAPIDS プロパティ値を設定します。

        • spark.plugins=com.nvidia.spark.SQLPlugin
        • spark.executor.resource.gpu.amount=1
        • spark.task.resource.gpu.amount=1/$spark_executor_cores
        • spark.shuffle.manager=''。デフォルトでは、このプロパティは設定されていません。パフォーマンス向上のために GPU を使用する場合には、RAPIDS シャッフル マネージャーを有効にすることが NVIDIA により推奨されています。これを行うには、ワークロードを送信するときに spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager を設定します。
        • spark.rapids.sql.concurrentGpuTasks= 最小値(gpuMemoryinMB / 8、4)
        • spark.rapids.shuffle.multiThreaded.writer.threads= min (VM の CPU コア数 / VM あたりの GPU 数, 32)
        • spark.rapids.shuffle.multiThreaded.reader.threads= min (VM の CPU コア数 / VM あたりの GPU 数, 32)

        Spark RAPIDS プロパティを設定するには、Apache Spark 構成用の RAPIDS アクセラレータをご覧ください。また、Spark 詳細プロパティを設定するには、Apache Spark 詳細構成用の RAPIDS アクセラレータをご覧ください。