Dataproc サーバーレス用 Spark でカスタム コンテナを使用する

Dataproc Serverless for Spark は、Docker コンテナ内でワークロードを実行します。コンテナは、ワークロードのドライバとエグゼキュータのプロセス用のランタイム環境を提供します。Dataproc Serverless for Spark のデフォルトでは、ランタイム リリース バージョンに関連付けられたデフォルトの Spark、Java、Python、R パッケージを含むコンテナ イメージを使用します。Dataproc Serverless for Spark バッチ API では、デフォルトのイメージの代わりにカスタム コンテナ イメージを使用できます。通常、カスタム コンテナ イメージは、デフォルトのコンテナ イメージで提供されていない Spark ワークロードの Java または Python の依存関係を追加します。重要: カスタム コンテナ イメージには Spark を使用しないでください。Dataproc Serverless for Spark は、実行時に Spark をコンテナにマウントします。

カスタム コンテナ イメージを使用して Spark バッチ ワークロードを送信する

gcloud

Spark バッチ ワークロードを送信するときに、--container-image フラグを指定して gcloud dataproc batchs submit spark コマンドを使用し、カスタム コンテナ イメージを指定します。

gcloud dataproc batches submit spark \
    --container-image=custom-image, for example, "gcr.io/my-project-id/my-image:1.0.1" \
    --region=region \
    --jars=path to user workload jar located in Cloud Storage or included in the custom container \
    --class=The fully qualified name of a class in the jar file, such as org.apache.spark.examples.SparkPi \
    -- add any workload arguments here

注:

  • カスタム イメージ: Container Registry イメージ命名形式 {hostname}/{project-id}/{image}:{tag} を使用して、カスタム コンテナ イメージを指定します(例: gcr.io/my-project-id/my-image:1.0.1)。注: カスタム コンテナ イメージは Container Registry または Artifact Registry でホストする必要があります (Dataproc Serverless は他のレジストリからコンテナを取得できません)。
  • --jars: カスタム コンテナ イメージに含まれているか、または Cloud Storage に配置されているユーザー ワークロードへのパス(/opt/spark/jars/spark-examples.jar または gs://my-bucket/spark/jars/spark-examples.jar など)を指定します。
  • その他の batch コマンド オプション: 永続履歴サーバー(PHS)を使用する場合などは、他のオプションの batch コマンド フラグを追加できます。注: PHS は、バッチ ワークロードを実行するリージョンに配置する必要があります。
  • ワークロード引数 - ワークロード引数を追加するには、コマンドの末尾に「--」を付加し、その後にワークロード引数を追加します。

REST

カスタム コンテナ イメージは、batches.create API リクエストの一部として RuntimeConfig.containerImage フィールドを介して提供されます。

次の例は、Dataproc Serverless for Spark batches.create API を使用してバッチ ワークロードを送信するために、カスタム コンテナを使用する方法を示しています。

リクエストのデータを使用する前に、次のように置き換えます。

  • project-id: Google Cloud プロジェクト ID
  • region: リージョン
  • custom-container-image: Container Registry イメージ命名規則 {hostname}/{project-id}/{image}:{tag} を使用して、カスタム コンテナ イメージを指定します(例: gcr.io/my-project-id/my-image:1.0.1)。注: カスタム コンテナは Container Registry または Artifact Registry でホストする必要があります。 (Dataproc Serverless は他のレジストリからコンテナを取得できません)。
  • jar-uri: カスタム コンテナ イメージに含まれている、または Cloud Storage にあるワークロード jar のパスを指定します(例: /opt/spark/jars/spark-examples.jar、gs:///spark/jars/spark-examples.jar)。
  • class: jar ファイル内のクラスの完全修飾名(例: org.apache.spark.examples.SparkPi)。
  • その他のオプション: 他のバッチ ワークロード リソース フィールドを使用できます。たとえば、sparkBatch.args フィールドを使用して、ワークロードに引数を渡すことができます(詳細については、Batch リソースのドキュメントをご覧ください)。永続履歴サーバー(PHS)を使用するには、永続的な履歴サーバーの設定をご覧ください。 注: PHS は、バッチ ワークロードを実行するリージョンに配置する必要があります。

HTTP メソッドと URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/locations/region/batches

リクエストの本文(JSON):

{
  "runtimeConfig":{
    "containerImage":"custom-container-image
  },
  "sparkBatch":{
    "jarFileUris":[
      "jar-uri"
    ],
    "mainClass":"class"
  }
}

リクエストを送信するには、次のいずれかのオプションを展開します。

次のような JSON レスポンスが返されます。

{
"name":"projects/project-id/locations/region/batches/batch-id",
  "uuid":",uuid",
  "createTime":"2021-07-22T17:03:46.393957Z",
  "runtimeConfig":{
    "containerImage":"gcr.io/my-project/my-image:1.0.1"
  },
  "sparkBatch":{
    "mainClass":"org.apache.spark.examples.SparkPi",
    "jarFileUris":[
      "/opt/spark/jars/spark-examples.jar"
    ]
  },
  "runtimeInfo":{
    "outputUri":"gs://dataproc-.../driveroutput"
  },
  "state":"SUCCEEDED",
  "stateTime":"2021-07-22T17:06:30.301789Z",
  "creator":"account-email-address",
  "runtimeConfig":{
    "properties":{
      "spark:spark.executor.instances":"2",
      "spark:spark.driver.cores":"2",
      "spark:spark.executor.cores":"2",
      "spark:spark.app.name":"projects/project-id/locations/region/batches/batch-id"
    }
  },
  "environmentConfig":{
    "peripheralsConfig":{
      "sparkHistoryServerConfig":{
      }
    }
  },
  "operation":"projects/project-id/regions/region/operation-id"
}

カスタム コンテナ イメージを構築する

Dataproc Serverless for Spark のカスタム コンテナ イメージは Docker イメージです。Docker イメージをビルドするツールを使用してカスタム コンテナ イメージを作成できますが、イメージが Dataproc Serverless for Spark との互換性を満たさなければならないという条件があります。以下のセクションでは、これらの条件について説明します。

OS

カスタム コンテナ イメージのベース イメージとして、任意のオペレーティング システムのイメージを選択できます。推奨: 互換性の問題を回避するテストが実施されているため、デフォルトの Debian 11 イメージ(debian:11-slim など)をおすすめします。

公共事業

Spark の実行に必要な次のユーティリティ パッケージを、カスタム コンテナ イメージに含める必要があります。

  • procps
  • tini

Spark(Java または Scala)から XGBoost を実行するには、libgomp1 を含める必要があります。

コンテナ ユーザー

Dataproc Serverless for Spark は、1099 UID と 1099 GID を使用して spark Linux ユーザーとしてコンテナを実行します。カスタム コンテナ イメージの Dockerfile で設定された USER ディレクティブは、実行時に無視されます。ファイル システムの権限には、UID と GID を使用します。たとえば、イメージ内の /opt/spark/jars/my-lib.jar に jar ファイルを追加する場合は、spark ユーザーにファイルの読み取り権限を付与する必要があります。

イメージ ストリーミング

Dataproc Serverless for Spark は、通常、イメージ全体をディスクにダウンロードして、カスタム コンテナ イメージを必要とするワークロードを開始します。これにより、特にイメージのサイズが大きい場合、初期化に遅延が生じることがあります。

代わりに、必要に応じて画像データを pull する方法である画像ストリーミングを使用できます。これにより、イメージ全体がダウンロードされるのを待たずにワークロードを起動できるため、初期化時間が改善される可能性があります。イメージ ストリーミングを有効にするには、Container Filesystem API を有効にする必要があります。また、コンテナ イメージを Artifact Registry に保存する必要があります。Artifact Registry リポジトリは、Dataproc ワークロードと同じリージョン、またはワークロードが実行されているリージョンに対応するマルチリージョンに存在する必要があります。Dataproc がイメージをサポートしていない場合、またはイメージ ストリーミング サービスが利用できない場合は、ストリーミング実装によってイメージ全体がダウンロードされます。なお、イメージ ストリーミングに対して以下はサポートされていません。

この場合、Dataproc はワークロードを開始する前にイメージ全体を pull します。

Spark

カスタム コンテナ イメージに Spark を含めないでください。Dataproc Serverless for Spark は、実行時にホストから Spark バイナリと構成ファイルをコンテナにマウントします。バイナリは /usr/lib/spark ディレクトリにマウントされ、構成ファイルは /etc/spark/conf ディレクトリにマウントされます。これらのディレクトリ内の既存のファイルは、実行時に Dataproc Serverless for Spark によってオーバーライドされます。

Java ランタイム環境

カスタム コンテナ イメージに独自の Java Runtime Environment(JRE)を含めないでください。Dataproc Serverless for Spark は、実行時にホストから OpenJDK をコンテナにマウントします。カスタム コンテナ イメージに JRE を含めても無視されます。

Java パッケージ

カスタム コンテナ イメージに Spark ワークロードの依存関係として jar ファイルを含めることができ、jar を含めるように SPARK_EXTRA_CLASSPATH 環境変数を設定できます。Dataproc Serverless for Spark は、Spark JVM プロセスのクラスパスに環境変数値を追加します。おすすめの方法: jar を /opt/spark/jars ディレクトリに配置し、SPARK_EXTRA_CLASSPATH/opt/spark/jars/* に設定します。

カスタム コンテナ イメージにワークロード jar を含めて、ワークロードの送信時にローカル パスでそのワークロード jar を参照できます(例: file:///opt/spark/jars/my-spark-job.jar。例については、カスタム コンテナ イメージを使用して Spark バッチ ワークロードを送信するをご覧ください)。

Python パッケージ

デフォルトでは、Dataproc Serverless for Spark は、実行時にホストからコンテナ内の /opt/dataproc/conda ディレクトリに Conda をマウントします。PYSPARK_PYTHON/opt/dataproc/conda/bin/python に設定されています。そのベース ディレクトリ /opt/dataproc/conda/binPATH に含まれています。

Python 環境をカスタム コンテナ イメージ内の別のディレクトリ(/opt/conda など)のパッケージに含めて、PYSPARK_PYTHON 環境変数を /opt/conda/bin/python に設定できます。

カスタム コンテナ イメージには、Python 環境に含まれない他の Python モジュール(ユーティリティ関数を含む Python スクリプトなど)を含めることができます。モジュールが配置されているディレクトリを含めるように PYTHONPATH 環境変数を設定します。

R 環境

次のいずれかのオプションを使用して、カスタム コンテナ イメージで R 環境をカスタマイズできます。 - Conda を使用して conda-forge チャネルから R パッケージを管理およびインストールする - コンテナ イメージの Linux OS に R リポジトリを追加し、Linux OS パッケージ マネージャーを使用して R パッケージをインストールします(R ソフトウェア パッケージ インデックスをご覧ください)。

どちらのオプションを使用する場合でも、カスタム R 環境を指すように R_HOME 環境変数を設定する必要があります。例外: R 環境の管理と Python 環境のカスタマイズの両方にConda を使用する場合、R_HOME 環境変数は PYSPARK_PYTHON 環境変数に基づいて自動的に設定されるため、設定する必要はありません。

カスタム コンテナ イメージのビルド例

Dockerfile

# Debian 11 is recommended.
FROM debian:11-slim

# Suppress interactive prompts
ENV DEBIAN_FRONTEND=noninteractive

# (Required) Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini libjemalloc2

# (Optiona) Install utilities required by XGBoost for Spark.
RUN apt install -y procps libgomp1

# Enable jemalloc2 as default memory allocator
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2

# (Optional) Add extra jars.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
COPY spark-bigquery-with-dependencies_2.12-0.22.2.jar "${SPARK_EXTRA_JARS_DIR}"

# (Optional) Install and configure Miniconda3.
ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PATH=${CONDA_HOME}/bin:${PATH}
COPY Miniconda3-py39_4.10.3-Linux-x86_64.sh .
RUN bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \
  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
  && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
# Packages ipython and ipykernel are required if using custom conda and want to
# use this container for running notebooks.
RUN ${CONDA_HOME}/bin/conda install ipython ipykernel

# (Optional) Install Conda packages.
#
# The following packages are installed in the default image, it is strongly
# recommended to include all of them.
#
# Use mamba to install packages quickly.
RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
    && ${CONDA_HOME}/bin/mamba install \
      conda \
      cython \
      fastavro \
      fastparquet \
      gcsfs \
      google-cloud-bigquery-storage \
      google-cloud-bigquery[pandas] \
      google-cloud-bigtable \
      google-cloud-container \
      google-cloud-datacatalog \
      google-cloud-dataproc \
      google-cloud-datastore \
      google-cloud-language \
      google-cloud-logging \
      google-cloud-monitoring \
      google-cloud-pubsub \
      google-cloud-redis \
      google-cloud-spanner \
      google-cloud-speech \
      google-cloud-storage \
      google-cloud-texttospeech \
      google-cloud-translate \
      google-cloud-vision \
      koalas \
      matplotlib \
      nltk \
      numba \
      numpy \
      openblas \
      orc \
      pandas \
      pyarrow \
      pysal \
      pytables \
      python \
      regex \
      requests \
      rtree \
      scikit-image \
      scikit-learn \
      scipy \
      seaborn \
      sqlalchemy \
      sympy \
      virtualenv

# (Optional) Add extra Python modules.
ENV PYTHONPATH=/opt/python/packages
RUN mkdir -p "${PYTHONPATH}"
COPY test_util.py "${PYTHONPATH}"

# (Optional) Install R and R libraries.
RUN apt update \
  && apt install -y gnupg \
  && apt-key adv --no-tty \
      --keyserver "hkp://keyserver.ubuntu.com:80" \
      --recv-keys E19F5F87128899B192B1A2C2AD5F960A256A04AF \
  && echo "deb http://cloud.r-project.org/bin/linux/debian bullseye-cran40/" \
      >/etc/apt/sources.list.d/cran-r.list \
  && apt update \
  && apt install -y \
      libopenblas-base \
      libssl-dev \
      r-base \
      r-base-dev \
      r-recommended \
      r-cran-blob

ENV R_HOME=/usr/lib/R

# (Required) Create the 'spark' group/user.
# The GID and UID must be 1099. Home directory is required.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark

ビルドコマンド

Dockerfile ディレクトリで実行します。

IMAGE=gcr.io/my-project/my-image:1.0.1

# Download the BigQuery connector.
gsutil cp \
  gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar .

# Download the Miniconda3 installer.
wget https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh

# Python module example
cat >test_util.py <<EOF
def hello(name):
  print("hello {}".format(name))

def read_lines(path):
  with open(path) as f:
    return f.readlines()
EOF

# Build and push the image.
docker build -t "${IMAGE}" .
docker push "${IMAGE}"