Use GPUs with Dataproc Serverless

You can attach GPU accelerators to your Dataproc Serverless batch workloads to achieve the following results:

  • Speed up the processing of large-scale data analytics workloads.

  • Accelerate model training on large datasets using GPU machine learning libraries.

  • Perform advanced data analytics, such as video or natural language processing.

All supported Dataproc Serverless Spark runtimes add the Spark RAPIDS library to each workload node. Dataproc Serverless Spark runtime version 1.1 also adds the XGBoost library to workload nodes. These libraries provide a powerful data transformation and machine learning tools that you can use in your GPU-accelerated workloads.

GPU benefits

Here are some of the benefits when you use GPUs with your Dataproc Serverless Spark workloads:

  • Performance improvement: GPU acceleration can significantly boost Spark workloads performance, particularly for compute-intensive tasks, such as machine and deep learning, graph processing, and complex analytics.

  • Faster model training: For machine learning tasks, attaching GPUs can dramatically reduce the time required to train models, enabling data scientists and engineers to iterate and experiment quickly.

  • Scalability: Customers can add more GPUs nodes or more powerful GPUs to nodes for handling increasingly complex processing needs.

  • Cost efficiency: Although GPUs require an initial investment, you can achieve cost savings over time due to reduced processing times and more efficient resource utilization.

  • Enhanced data analytics: GPU acceleration lets you perform advanced analytics, such as image and video analysis and natural language processing, on large datasets.

  • Improved products: Faster processing enables quicker decision-making and more responsive applications.

Limitations and considerations

Pricing

See Dataproc Serverless pricing for accelerator pricing information.

Before you begin

Before creating a serverless batch workload with attached GPU accelerators, do the following:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init
  12. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets page

  13. Click Create bucket.
  14. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    • For Name your bucket, enter a name that meets the bucket naming requirements.
    • For Choose where to store your data, do the following:
      • Select a Location type option.
      • Select a Location option.
    • For Choose a default storage class for your data, select a storage class.
    • For Choose how to control access to objects, select an Access control option.
    • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
  15. Click Create.

Create a serverless batch workload with GPU accelerators

Submit a Dataproc Serverless batch workload that uses NVIDIA L4 GPUs to run a parallelized PySpark task. Follow these steps using the gcloud CLI:

  1. Click Expand me, then create and save the listed PySpark code to a test-py-spark-gpu.py file on your local machine using a text or code editor.

    #!/usr/bin/env python
    
    """S8s Accelerators Example."""
    
    import subprocess
    from typing import Any
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import IntegerType
    from pyspark.sql.types import StructField
    from pyspark.sql.types import StructType
    
    spark = SparkSession.builder.appName("joindemo").getOrCreate()
    
    
    def get_num_gpus(_: Any) -> int:
      """Returns the number of GPUs."""
      p_nvidia_smi = subprocess.Popen(
          ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE
      )
      p_wc = subprocess.Popen(
          ["wc", "-l"],
          stdin=p_nvidia_smi.stdout,
          stdout=subprocess.PIPE,
          stderr=subprocess.PIPE,
          universal_newlines=True,
      )
      [out, _] = p_wc.communicate()
      return int(out)
    
    
    num_workers = 5
    result = (
        spark.sparkContext.range(0, num_workers, 1, num_workers)
        .map(get_num_gpus)
        .collect()
    )
    num_gpus = sum(result)
    print(f"Total accelerators: {num_gpus}")
    
    # Run the join example
    schema = StructType([StructField("value", IntegerType(), True)])
    df = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    df2 = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    joined_df = (
        df.select(col("value").alias("a"))
        .join(df2.select(col("value").alias("b")), col("a") == col("b"))
        .explain()
    )
  2. Use the gcloud CLI on your local machine to submit the Dataproc Serverless serverless batch job with five workers, with each worker accelerated with L4 GPUs:

    gcloud dataproc batches submit pyspark test-py-spark-gpu.py \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_NAME \
        --version=1.1 \
        --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
    

Notes:

  • PROJECT_ID: Your Google Cloud project ID.
  • REGION: An available Compute Engine region to run the workload.
  • BUCKET_NAME: The name of your Cloud Storage bucket. Spark uploads workload dependencies to a /dependencies folder in this bucket before running the batch workload.
  • --version: All Supported Dataproc Serverless runtimes add the RAPIDS library to each node of a GPU-accelerated workload. Currently, only runtime version 1.1 adds the XGBoost library to each node of a GPU-accelerated workload.
  • --properties (see Spark resource allocation properties) :

    • spark.dataproc.driverEnv.LANG=C.UTF-8 and spark.executorEnv.LANG=C.UTF-8 (required with runtime versions prior to 2.2): These properties set the default character set to C.UTF-8.
    • spark.dataproc.executor.compute.tier=premium (required): GPU-accelerated workloads are billed using premium Data Compute Units (DCUs). See Dataproc Serverless Accelerator pricing.

    • spark.dataproc.executor.disk.tier=premium (required): Nodes with A100-40, A100-80, or L4 accelerators must use the premium disk tier.

    • spark.dataproc.executor.resource.accelerator.type=l4 (required): Only one GPU type must be specified. The example job selects the L4 GPU. The following accelerator types can be specified with the following argument names:

      GPU type Argument name
      A100 40GB a100-40
      A100 80GB a100-80

    • spark.executor.instances=5 (required): Must be at least two. Set to five for this example.

    • spark.executor.cores (optional): You can set this property to specify the number of core vCPUs. Valid values for L4 GPUs are 4, the default, or 8, 12, 16, 24, 48 or 96. The only valid, and default, value for A100 GPUs is 12. Configurations with L4 GPUs and 24, 48 or 96 cores have 2, 4 or 8 GPUs attached to each executor. All other configurations have 1 GPU attached.

    • spark.dataproc.executor.disk.size (required): L4 GPUs have a fixed disk size of 375 GB, except for configurations with 24, 48 or 96 cores, which have 750, 1,500, or 3,000 GB, respectively. If you set this property to a different value when submitting an L4-accelerated workload, an error occurs. If you select an A100 40 or A100 80 GPU, valid sizes are 375g, 750g, 1500g, 3000g, 6000g, and 9000g.

    • spark.executor.memory (optional) and spark.executor.memoryOverhead (optional): You can set one of these properties, but not both. The amount of available memory not consumed by the set property is applied to the unset property. As a default, spark.executor.memoryOverhead is set to 40% of available memory for PySpark batch workloads, and 10% for other workloads (see Spark resource allocation properties).

      The following table shows the maximum amount of memory that can be set for different A100 and L4 GPU configurations. The minimum value for either property is 1024 MB.

      A100 (40 GB) A100 (80 GB) L4 (4 cores) L4 (8 cores) L4 (12 cores) L4 (16 cores) L4 (24 cores) L4 (48 cores) L4 (96 cores)
      Maximum total memory (MB) 78040 165080 13384 26768 40152 53536 113072 160608 321216
    • Spark RAPIDS properties (optional): By default, Dataproc Serverless sets the following Spark RAPIDS property values:

      • spark.plugins=com.nvidia.spark.SQLPlugin
      • spark.executor.resource.gpu.amount=1
      • spark.task.resource.gpu.amount=1/$spark_executor_cores
      • spark.shuffle.manager=''. By default, this property is unset. However, NVIDIA recommends turning on the RAPIDS shuffle manager when using GPUs to improve performance. To do this, set spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager when you submit a workload.

      See RAPIDS Accelerator for Apache Spark Configuration to set Spark RAPIDS properties, and RAPIDS Accelerator for Apache Spark Advanced Configuration to set Spark advanced properties.