Use custom containers with Dataproc Serverless for Spark

Dataproc Serverless for Spark runs workloads within Docker containers. The container provides the runtime environment for the workload's driver and executor processes. By default, Dataproc Serverless for Spark uses a container image that includes the default Spark, Java, Python, and R packages associated with a runtime release version.

The Dataproc Serverless for Spark batches API allows you to use a custom container image instead of the default image. Typically, a custom container image adds Spark workload Java or Python dependencies not provided by the default container image.

Important: Do not include Spark in your custom container image; Dataproc Serverless for Spark will mount Spark into the container at runtime.

Submit a Spark batch workload using a custom container image

gcloud

Use the gcloud dataproc batches submit spark command with the --container-image flag to specify your custom container image when you submit a Spark batch workload.

gcloud dataproc batches submit spark \
    --container-image=custom-image \
    --region=region \
    --jars=jar-path \
    --class=main-class \
    -- workload-arguments

Notes:

  • custom-image: Specify the custom container image using the following Container Registry image naming format: {hostname}/{project-id}/{image}:{tag}, for example, "gcr.io/my-project-id/my-image:1.0.1". Note: You must host your custom container image on Container Registry or Artifact Registry (Dataproc Serverless cannot fetch containers from other registries).
  • --jars (jar-path): Specify a path to a user workload jar included in your custom container image or located in Cloud Storage, for example, /opt/spark/jars/spark-examples.jar or gs://my-bucket/spark/jars/spark-examples.jar.
  • --class (main-class): Specify the fully qualified name of a class in the jar file, such as org.apache.spark.examples.SparkPi.
  • Other batches command options: You can add other optional batches command flags, for example, to use a Persistent History Server (PHS). Note: The PHS must be located in the region where you run batch workloads.
  • Workload arguments: You can pass arguments to the workload by adding a "--" to the end of the command, followed by the workload arguments.
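
As an illustration, the command above can be filled in with concrete values. The following is a minimal sketch that only prints the resulting command so it can be reviewed before it is run. The region (us-central1) and the workload argument (1000) are assumed values that do not come from this page, and print_submit_command is a hypothetical helper name; the image, jar path, and class are the example values used in the notes.

```shell
#!/bin/sh
# Example values; replace them with your own.
IMAGE="gcr.io/my-project-id/my-image:1.0.1"   # example image name from the notes
REGION="us-central1"                          # assumed region
JAR="/opt/spark/jars/spark-examples.jar"      # jar included in the custom image
MAIN_CLASS="org.apache.spark.examples.SparkPi"
WORKLOAD_ARGS="1000"                          # assumed workload argument

# Print the submit command instead of running it.
print_submit_command() {
  printf 'gcloud dataproc batches submit spark'
  printf ' --container-image=%s' "$IMAGE"
  printf ' --region=%s' "$REGION"
  printf ' --jars=%s' "$JAR"
  printf ' --class=%s' "$MAIN_CLASS"
  printf ' -- %s\n' "$WORKLOAD_ARGS"
}

print_submit_command
```

Printing the command first keeps the sketch free of side effects; once the values look right, the printed line can be run as is.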

REST

The custom container image is provided through the RuntimeConfig.containerImage field as part of a batches.create API request.

The following example shows how to use a custom container to submit a batch workload using the Dataproc Serverless for Spark batches.create API.

Before using any of the request data, make the following replacements:

  • project-id: Google Cloud project ID
  • region: The region where the batch workload runs
  • custom-container-image: Specify the custom container image using the following Container Registry image naming format: {hostname}/{project-id}/{image}:{tag}, for example, "gcr.io/my-project-id/my-image:1.0.1". Note: You must host your custom container on Container Registry or Artifact Registry (Dataproc Serverless cannot fetch containers from other registries).
  • jar-uri: Specify a path to a workload jar included in your custom container image or located in Cloud Storage, for example, "/opt/spark/jars/spark-examples.jar" or "gs:///spark/jars/spark-examples.jar".
  • class: The fully qualified name of a class in the jar file, such as "org.apache.spark.examples.SparkPi".
  • Other options: You can use other batch workload resource fields, for example, use the sparkBatch.args field to pass arguments to your workload (see the Batch resource documentation for more information). To use a Persistent History Server (PHS), see Setting up a Persistent History Server. Note: The PHS must be located in the region where you run batch workloads.

HTTP method and URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/locations/region/batches

Request JSON body:

{
  "runtimeConfig":{
    "containerImage":"custom-container-image"
  },
  "sparkBatch":{
    "jarFileUris":[
      "jar-uri"
    ],
    "mainClass":"class"
  }
}
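
The replacements above can also be scripted. The following is a minimal sketch, not an official client: it fills in the URL and request body from shell variables and prints both. The region (us-central1) and the project ID are assumed example values, and request_url and request_body are hypothetical helper names.

```shell
#!/bin/sh
# Example values; replace them with your own.
PROJECT_ID="my-project-id"                    # assumed project ID
REGION="us-central1"                          # assumed region
IMAGE="gcr.io/my-project-id/my-image:1.0.1"
JAR_URI="/opt/spark/jars/spark-examples.jar"
MAIN_CLASS="org.apache.spark.examples.SparkPi"

# Build the batches.create URL from the project and region.
request_url() {
  printf 'https://dataproc.googleapis.com/v1/projects/%s/locations/%s/batches\n' \
    "$PROJECT_ID" "$REGION"
}

# Write the request body to stdout with the placeholders filled in.
request_body() {
  cat <<EOF
{
  "runtimeConfig": {
    "containerImage": "${IMAGE}"
  },
  "sparkBatch": {
    "jarFileUris": ["${JAR_URI}"],
    "mainClass": "${MAIN_CLASS}"
  }
}
EOF
}

request_url
request_body
```

The output of request_body can be saved to a file and sent as the POST body with an HTTP client of your choice, using an access token such as the one returned by gcloud auth print-access-token.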

You should receive a JSON response similar to the following:

{
  "name":"projects/project-id/locations/region/batches/batch-id",
  "uuid":"uuid",
  "createTime":"2021-07-22T17:03:46.393957Z",
  "runtimeConfig":{
    "containerImage":"gcr.io/my-project/my-image:1.0.1",
    "properties":{
      "spark:spark.executor.instances":"2",
      "spark:spark.driver.cores":"2",
      "spark:spark.executor.cores":"2",
      "spark:spark.app.name":"projects/project-id/locations/region/batches/batch-id"
    }
  },
  "sparkBatch":{
    "mainClass":"org.apache.spark.examples.SparkPi",
    "jarFileUris":[
      "/opt/spark/jars/spark-examples.jar"
    ]
  },
  "runtimeInfo":{
    "outputUri":"gs://dataproc-.../driveroutput"
  },
  "state":"SUCCEEDED",
  "stateTime":"2021-07-22T17:06:30.301789Z",
  "creator":"account-email-address",
  "environmentConfig":{
    "peripheralsConfig":{
      "sparkHistoryServerConfig":{
      }
    }
  },
  "operation":"projects/project-id/regions/region/operation-id"
}

Build a custom container image

Dataproc Serverless for Spark custom container images are Docker images. You can use the tools for building Docker images to build custom container images, but there are conditions the images must meet to be compatible with Dataproc Serverless for Spark. The following sections explain these conditions.

Operating system

You can choose any operating system image as your custom container image's base image. Recommendation: The default Debian 11 images are preferred (for example, debian:11-slim), since they have been tested for compatibility.

Utilities

You must include the following utility packages, which are required to run Spark, in your custom container image:

  • procps
  • tini

To run XGBoost from Spark (Java or Scala), you must also include libgomp1.

Container user

Dataproc Serverless for Spark runs containers as the spark Linux user with a 1099 UID and a 1099 GID. USER directives set in custom container image Dockerfiles are ignored at runtime. Use the UID and GID for filesystem permissions. For example, if you add a jar file at /opt/spark/jars/my-lib.jar in the image as a workload dependency, you must give the spark user read permission to the file.
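
One way to check the read permission described above is sketched below. It assumes that relying on the "other" read bit is acceptable (rather than making UID 1099 the file owner), and is_world_readable is a hypothetical helper name. The directories leading to the file must also be searchable by the spark user, which this sketch does not check.

```shell
#!/bin/sh
# Succeeds if the file has the "other" read bit set, which lets
# UID 1099 read it regardless of who owns the file.
is_world_readable() {
  [ -n "$(find "$1" -prune -perm -004 2>/dev/null)" ]
}

# Demonstration on a temporary file standing in for a jar.
demo_file="$(mktemp)"
chmod 600 "$demo_file"
is_world_readable "$demo_file" || echo "not readable by other users"
chmod 644 "$demo_file"
is_world_readable "$demo_file" && echo "readable by other users"
rm -f "$demo_file"
```

In a Dockerfile, a RUN step that applies chmod a+r to the jar after it is copied would set this bit.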

Image streaming

Dataproc Serverless for Spark normally begins a workload requiring a custom container image by downloading the entire image to disk. This can mean a delay in initialization time, especially for customers with large images.

You can instead use image streaming, which pulls image data on an as-needed basis. This lets the workload start up without waiting for the entire image to download, potentially improving initialization time. To enable image streaming, you must enable the Container Filesystem API. You must also store your container images in Artifact Registry, and the Artifact Registry repository must be in the same region as your Dataproc workload or in a multi-region that corresponds with the region where your workload is running. If Dataproc does not support the image or the image streaming service is not available, our streaming implementation downloads the entire image. Note that we do not support the following for image streaming:

In these cases, Dataproc pulls the entire image before starting the workload.
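
The repository location requirement for image streaming can be checked roughly from the image name. The sketch below assumes the Artifact Registry host naming convention LOCATION-docker.pkg.dev and treats us, europe, and asia as multi-region locations; streaming_location_check is a hypothetical helper name. It does not decide whether a multi-region corresponds with the workload region, so a "multi-region" result still needs a manual check.

```shell
#!/bin/sh
# Classify an image's repository location against a workload region.
# $1: image name, $2: workload region
streaming_location_check() {
  host="${1%%/*}"                       # text before the first "/"
  case "$host" in
    *-docker.pkg.dev) location="${host%-docker.pkg.dev}" ;;
    *) echo "not-artifact-registry"; return 0 ;;
  esac
  case "$location" in
    "$2") echo "same-region" ;;
    us|europe|asia) echo "multi-region" ;;
    *) echo "different-region" ;;
  esac
}

streaming_location_check "us-central1-docker.pkg.dev/my-project-id/my-repo/my-image:1.0.1" "us-central1"
streaming_location_check "us-docker.pkg.dev/my-project-id/my-repo/my-image:1.0.1" "us-central1"
streaming_location_check "gcr.io/my-project-id/my-image:1.0.1" "us-central1"
```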

Spark

Do not include Spark in your custom container image. At runtime, Dataproc Serverless for Spark mounts Spark binaries and configs from the host into the container: binaries are mounted to the /usr/lib/spark directory and configs are mounted to the /etc/spark/conf directory. Existing files in these directories are overridden by Dataproc Serverless for Spark at runtime.

Java Runtime Environment

Don't include your own Java Runtime Environment (JRE) in your custom container image. At run time, Dataproc Serverless for Spark mounts OpenJDK from the host into the container. If you include a JRE in your custom container image, it will be ignored.

Java packages

You can include jar files as Spark workload dependencies in your custom container image, and you can set the SPARK_EXTRA_CLASSPATH environment variable to include the jars. Dataproc Serverless for Spark adds the value of this environment variable to the classpath of Spark JVM processes. Recommendation: Put jars under the /opt/spark/jars directory and set SPARK_EXTRA_CLASSPATH to /opt/spark/jars/*.

You can include the workload jar in your custom container image, then reference it with a local path when submitting the workload, for example file:///opt/spark/jars/my-spark-job.jar (see Submit a Spark batch workload using a custom container image for an example).
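
To preview which files a classpath entry such as /opt/spark/jars/* would pick up, the following sketch may help. It assumes standard Java classpath wildcard behavior, where the wildcard matches files ending in .jar or .JAR directly inside the directory and does not descend into subdirectories. list_classpath_jars is a hypothetical helper name, and the temporary directory stands in for /opt/spark/jars.

```shell
#!/bin/sh
# List the jar files located directly inside a directory.
list_classpath_jars() {
  for f in "$1"/*.jar "$1"/*.JAR; do
    [ -f "$f" ] && basename "$f"
  done
  return 0
}

# Demonstration on a temporary directory standing in for /opt/spark/jars.
jars_dir="$(mktemp -d)"
touch "$jars_dir/my-lib.jar" "$jars_dir/notes.txt"
mkdir "$jars_dir/nested"
touch "$jars_dir/nested/other.jar"
list_classpath_jars "$jars_dir"   # lists my-lib.jar only
rm -rf "$jars_dir"
```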

Python packages

By default, Dataproc Serverless for Spark mounts Conda from the host to the /opt/dataproc/conda directory in the container at runtime. PYSPARK_PYTHON is set to /opt/dataproc/conda/bin/python. Its base directory, /opt/dataproc/conda/bin, is included in PATH.

You can include your Python environment with packages in a different directory in your custom container image, for example in /opt/conda, and set the PYSPARK_PYTHON environment variable to /opt/conda/bin/python.

Your custom container image can include other Python modules that are not part of the Python environment, for example, Python scripts with utility functions. Set the PYTHONPATH environment variable to include the directories where the modules are located.
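
Before submitting a workload, it can be useful to confirm that the path you plan to use for PYSPARK_PYTHON points to an executable interpreter inside the image. The following is a minimal sketch of such a check; check_pyspark_python is a hypothetical helper name, and the demonstration uses a stand-in file in a temporary directory rather than a real Python installation.

```shell
#!/bin/sh
# Report whether a PYSPARK_PYTHON value points to an executable file.
check_pyspark_python() {
  if [ -f "$1" ] && [ -x "$1" ]; then
    echo "ok: $1"
  else
    echo "not executable: $1"
    return 1
  fi
}

# Demonstration with a stand-in interpreter in a temporary directory.
env_dir="$(mktemp -d)"
mkdir -p "$env_dir/bin"
printf '#!/bin/sh\n' > "$env_dir/bin/python"
check_pyspark_python "$env_dir/bin/python" || true   # no execute bit yet
chmod +x "$env_dir/bin/python"
check_pyspark_python "$env_dir/bin/python"
rm -rf "$env_dir"
```

Inside a built image, the same function could be called with the actual value, for example /opt/conda/bin/python.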

R environment

You can customize the R environment in your custom container image using one of the following options:

  • Use Conda to manage and install R packages from the conda-forge channel.
  • Add an R repository for your container image Linux OS, and install R packages via the Linux OS package manager (see the R Software package index).

When you use either option, you must set the R_HOME environment variable to point to your custom R environment. Exception: If you are using Conda to both manage your R environment and customize your Python environment, you do not need to set the R_HOME environment variable; it is automatically set based on the PYSPARK_PYTHON environment variable.

Example custom container image build

Dockerfile

# Debian 11 is recommended.
FROM debian:11-slim

# Suppress interactive prompts
ENV DEBIAN_FRONTEND=noninteractive

# (Required) Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini libjemalloc2

# (Optional) Install utilities required by XGBoost for Spark.
RUN apt install -y procps libgomp1

# Enable jemalloc2 as default memory allocator
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2

# (Optional) Add extra jars.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
COPY spark-bigquery-with-dependencies_2.12-0.22.2.jar "${SPARK_EXTRA_JARS_DIR}"

# (Optional) Install and configure Miniconda3.
ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PATH=${CONDA_HOME}/bin:${PATH}
COPY Miniconda3-py39_4.10.3-Linux-x86_64.sh .
RUN bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \
  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
  && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
# The ipython and ipykernel packages are required if you use a custom Conda
# environment and want to use this container for running notebooks.
RUN ${CONDA_HOME}/bin/conda install ipython ipykernel

# (Optional) Install Conda packages.
#
# The following packages are installed in the default image. It is strongly
# recommended to include all of them.
#
# Use mamba to install packages quickly.
RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
    && ${CONDA_HOME}/bin/mamba install \
      conda \
      cython \
      fastavro \
      fastparquet \
      gcsfs \
      google-cloud-bigquery-storage \
      google-cloud-bigquery[pandas] \
      google-cloud-bigtable \
      google-cloud-container \
      google-cloud-datacatalog \
      google-cloud-dataproc \
      google-cloud-datastore \
      google-cloud-language \
      google-cloud-logging \
      google-cloud-monitoring \
      google-cloud-pubsub \
      google-cloud-redis \
      google-cloud-spanner \
      google-cloud-speech \
      google-cloud-storage \
      google-cloud-texttospeech \
      google-cloud-translate \
      google-cloud-vision \
      koalas \
      matplotlib \
      nltk \
      numba \
      numpy \
      openblas \
      orc \
      pandas \
      pyarrow \
      pysal \
      pytables \
      python \
      regex \
      requests \
      rtree \
      scikit-image \
      scikit-learn \
      scipy \
      seaborn \
      sqlalchemy \
      sympy \
      virtualenv

# (Optional) Add extra Python modules.
ENV PYTHONPATH=/opt/python/packages
RUN mkdir -p "${PYTHONPATH}"
COPY test_util.py "${PYTHONPATH}"

# (Optional) Install R and R libraries.
RUN apt update \
  && apt install -y gnupg \
  && apt-key adv --no-tty \
      --keyserver "hkp://keyserver.ubuntu.com:80" \
      --recv-keys E19F5F87128899B192B1A2C2AD5F960A256A04AF \
  && echo "deb http://cloud.r-project.org/bin/linux/debian bullseye-cran40/" \
      >/etc/apt/sources.list.d/cran-r.list \
  && apt update \
  && apt install -y \
      libopenblas-base \
      libssl-dev \
      r-base \
      r-base-dev \
      r-recommended \
      r-cran-blob

ENV R_HOME=/usr/lib/R

# (Required) Create the 'spark' group/user.
# The GID and UID must be 1099. Home directory is required.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark

Build commands

Run in the Dockerfile directory.

IMAGE=gcr.io/my-project/my-image:1.0.1

# Download the BigQuery connector.
gsutil cp \
  gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar .

# Download the Miniconda3 installer.
wget https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh

# Python module example
cat >test_util.py <<EOF
def hello(name):
  print("hello {}".format(name))

def read_lines(path):
  with open(path) as f:
    return f.readlines()
EOF

# Build and push the image.
docker build -t "${IMAGE}" .
docker push "${IMAGE}"