Spark ワークロードの自動チューニング

Spark 構成オプションの数が多く、それらのオプションがワークロードに与える影響を評価するのが難しいため、パフォーマンスと回復性を重視して Spark ワークロードを最適化するのは困難です。Dataproc サーバーレスの自動チューニングは、Spark の最適化のベスト プラクティスとワークロード実行の分析に基づいて、繰り返される Spark ワークロードに Spark 構成設定を自動的に適用することで、手動ワークロード構成に代わる手段を提供します。

Dataproc サーバーレス自動チューニングに申し込む

このページで説明する Dataproc サーバーレス自動チューニングプレビュー リリースへのアクセスに登録するには、BigQuery の Gemini のプレビューサインアップフォームに記入して提出してください。フォームが承認されると、フォームにリストされているプロジェクトはプレビュー機能にアクセスできるようになります。

利点

Dataproc サーバーレスの自動チューニングには、次の利点があります。

  • パフォーマンスの向上: パフォーマンスを向上させる最適化の調整
  • 迅速な最適化: 時間のかかる手動構成テストを回避する自動構成
  • 復元性の向上: メモリ関連の障害を回避する自動メモリ割り当て

制限事項

Dataproc サーバーレスの自動チューニングには次の制限があります。

  • 自動調整は計算され、ワークロードの 2 回目以降の実行に適用されます。Dataproc サーバーレスの自動調整はワークロードの履歴を使用して最適化を行うため、繰り返しワークロードの初回実行は自動スケーリングされません。
  • メモリの縮小はサポートされていません。
  • 自動チューニングは、実行中のワークロードに遡及的には適用されず、新しく送信されたワークロード コホートにのみ適用されます。

コホートの自動チューニング

自動チューニングは、バッチ ワークロードの定期的な実行(コホート)に適用されます。ワークロードを送信するときに指定するコホート名は、繰り返しワークロードの連続実行の 1 つとして識別されます。ワークロードのタイプを説明するコホート名の使用をおすすめします。または、繰り返されるワークロードの一部としてワークロードの実行を識別しやすくすることも推奨されます。たとえば、毎日の売上集計タスクを実行するスケジュール設定されたワークロードのコホート名として daily_sales_aggregation を指定します。

自動チューニングのシナリオ

次の自動チューニング シナリオを 1 つ以上選択して、Dataproc サーバーレス自動チューニングをワークロードに適用します。

  • MEMORY: Spark のメモリ割り当てを自動で調整して、潜在的なワークロードのメモリ不足エラーを予測して回避します。メモリ不足(OOM)エラーのために以前に失敗したワークロードを修正します。
  • SCALING: Spark 自動スケーリングの構成設定を自動調整します。
  • BROADCAST_HASH_JOIN: SQL ブロードキャスト結合のパフォーマンスを最適化するために、Spark 構成設定を自動調整します。

料金

Dataproc サーバーレス自動調整は、プレビュー期間中は追加料金なしで提供されます。標準の Dataproc サーバーレスの料金が適用されます。

ご利用いただけるリージョン

Dataproc サーバーレス自動チューニングは、利用可能な Compute Engine リージョンで送信されたバッチで使用できます。

Dataproc サーバーレスの自動チューニングを使用する

Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、ワークロードで Dataproc サーバーレス自動調整を有効にできます。

Console

定期的なバッチ ワークロードを送信するたびに Dataproc サーバーレス自動チューニングを有効にするには、次の手順を行います。

  1. Google Cloud コンソールで、Dataproc バッチ ページに移動します。

    Dataproc バッチに移動

  2. バッチ ワークロードを作成するには、[作成] をクリックします。

  3. [コンテナ] セクションで、Spark ワークロードの次のフィールドに入力します。

    • コホート: コホート名。バッチを一連の反復的なワークロードの 1 つとして識別します。自動チューニングは、このコホート名で送信された 2 番目以降のワークロードに適用されます。たとえば、毎日の売上集計タスクを実行するスケジュール設定されたワークロードのコホート名として daily_sales_aggregation を指定します。

    • 自動チューニングのシナリオ: 1 つ以上(たとえば、BROADCAST_HASH_JOINMEMORYSCALING )の 自動チューニングのシナリオワークロードの最適化に使用できます。シナリオの選択は、バッチ コホートの送信ごとに変更できます。

  4. 必要に応じて [バッチ作成] ページの他のセクションに入力し、[送信] をクリックします。これらのフィールドの詳細については、バッチ ワークロードを送信するをご覧ください。

gcloud

定期的なバッチ ワークロードが送信されるたびに Dataproc サーバーレスの自動チューニングを有効にするには、次の gcloud CLI の gcloud dataproc batches submit コマンドをターミナル ウィンドウまたは Cloud Shell でローカルに実行します。

gcloud dataproc batches submit COMMAND \
    --region=REGION \
    --cohort=COHORT \
    --autotuning-scenarios=SCENARIOS \
    other arguments ...

次のように置き換えます。

  • COMMAND: Spark ワークロード タイプ(SparkPySparkSpark-SqlSpark-R など)。
  • REGION: ワークロードが実行される リージョンを指定します。
  • COHORT: コホート名。バッチを一連の反復的なワークロードの 1 つとして識別します。自動チューニングは、このコホート名で送信された 2 番目以降のワークロードに適用されます。たとえば、毎日の売上集計タスクを実行するスケジュール設定されたワークロードのコホート名として daily_sales_aggregation を指定します。

  • SCENARIOS: 1 つ以上のカンマ区切りの自動チューニングのシナリオ(たとえば、--autotuning-scenarios=MEMORY,SCALING)。シナリオリストは、バッチ コホートを送信するたびに変更できます。

API

定期的なバッチ ワークロードが送信されるたびに Dataproc サーバーレスの自動チューニングを有効にするには、次のフィールドを含む batches.create リクエストを送信します。

  • RuntimeConfig.cohort: コホート名。一連の繰り返しワークロードの 1 つとしてバッチを識別します。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の売上集計タスクを実行するスケジュール設定されたワークロードのコホート名として daily_sales_aggregation を指定します。
  • AutotuningConfig.scenarios: 1 つ以上(たとえば、BROADCAST_HASH_JOINMEMORYSCALING )の 自動チューニングのシナリオワークロードの最適化に使用できます。シナリオリストは、バッチ コホートを送信するたびに変更できます。

例:

...
runtimeConfig:
  cohort: daily_sales_aggregation
  autotuningConfig:
    scenarios:
    - BROADCAST_HASH_JOIN
    - MEMORY
    - SCALING
...

Java

このサンプルを試す前に、Dataproc クイックスタート: クライアント ライブラリの使用にある Java の設定手順を行ってください。 詳細については、Dataproc Java API のリファレンス ドキュメントをご覧ください。

Dataproc サーバーレスへの認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

定期的なバッチ ワークロードを送信するたびに Dataproc サーバーレスの自動チューニングを有効にするには、次のフィールドを含む CreateBatchRequest を指定して BatchControllerClient.createBatch を呼び出します。

  • Batch.RuntimeConfig.cohort: コホート名。バッチが一連の繰り返しワークロードの 1 つとして識別されます。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の売上集計タスクを実行するスケジュール設定されたワークロードのコホート名として daily_sales_aggregation を指定できます。
  • Batch.RuntimeConfig.AutotuningConfig.scenarios: 1 つ以上 自動チューニングのシナリオワークロードの最適化に使用できますBROADCAST_HASH_JOINMEMORYSCALING 。シナリオリストは、バッチコホート送信ごとに変更できます。シナリオの一覧については、AutotuningConfig.Scenario Javadoc をご覧ください。

例:

...
Batch batch =
  Batch.newBuilder()
    .setRuntimeConfig(
      RuntimeConfig.newBuilder()
        .setCohort("daily_sales_aggregation")
        .setAutotuningConfig(
          AutotuningConfig.newBuilder()
            .addScenarios(Scenario.SCALING))
    ...
  .build();

batchControllerClient.createBatch(
    CreateBatchRequest.newBuilder()
        .setParent(parent)
        .setBatchId(batchId)
        .setBatch(batch)
        .build());
...

API を使用するには、google-cloud-dataproc クライアント ライブラリのバージョン 4.43.0 以降を使用する必要があります。プロジェクトにライブラリを追加するには、次のいずれかの構成を使用します。

Maven

<dependencies>
 <dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-dataproc</artifactId>
   <version>4.43.0</version>
 </dependency>
</dependencies>

Gradle

implementation 'com.google.cloud:google-cloud-dataproc:4.43.0'

SBT

libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.43.0"

Python

このサンプルを試す前に、Dataproc クイックスタート: クライアント ライブラリの使用にある Python の設定手順を行ってください。 詳細については、Dataproc Python API のリファレンス ドキュメントをご覧ください。

Dataproc サーバーレスへの認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

定期的なバッチ ワークロードを送信するたびに Dataproc サーバーレスの自動チューニングを有効にするには、次のフィールドを含む BatchBatchControllerClient.create_batch を呼び出します。

  • batch.runtime_config.cohort: コホート名。バッチが一連の繰り返しワークロードの 1 つとして識別されます。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の売上集計タスクを実行するスケジュール設定されたワークロードのコホート名として daily_sales_aggregation を指定できます。
  • batch.runtime_config.autotuning_config.scenarios: 1 つ以上(たとえば、BROADCAST_HASH_JOINMEMORYSCALING )の 自動チューニングのシナリオワークロードの最適化に使用できます。{101 シナリオ リストは、コホートの送信ごとに変更できます。シナリオの一覧については、シナリオ リファレンスをご覧ください。

例:

# Create a client
client = dataproc_v1.BatchControllerClient()

# Initialize request argument(s)
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = "gs://bucket/run_tpcds.py"
batch.runtime_config.cohort = "daily_sales_aggregation"
batch.runtime_config.autotuning_config.scenarios = [
    Scenario.SCALING
]

request = dataproc_v1.CreateBatchRequest(
    parent="parent_value",
    batch=batch,
)

# Make the request
operation = client.create_batch(request=request)

API を使用するには、google-cloud-dataproc クライアント ライブラリのバージョン 5.10.1 以降を使用する必要があります。これをプロジェクトに追加するには、次の要件を使用できます。

google-cloud-dataproc>=5.10.1

Airflow

定期的なバッチ ワークロードを送信するたびに Dataproc サーバーレスの自動チューニングを有効にするには、次のフィールドを含む BatchBatchControllerClient.create_batch を呼び出します。

  • batch.runtime_config.cohort: コホート名。バッチが一連の繰り返しワークロードの 1 つとして識別されます。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の売上集計タスクを実行するスケジュール設定されたワークロードのコホート名として daily_sales_aggregation を指定できます。
  • batch.runtime_config.autotuning_config.scenarios: 1 つ以上(たとえば、BROADCAST_HASH_JOINMEMORYSCALING )の 自動チューニングのシナリオワークロードの最適化に使用できます。{101 シナリオ リストは、コホートの送信ごとに変更できます。シナリオの一覧については、シナリオ リファレンスをご覧ください。

例:

create_batch = DataprocCreateBatchOperator(
    task_id="batch_create",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "cohort": "daily_sales_aggregation",
            "autotuning_config": {
                "scenarios": [
                    Scenario.SCALING,
                ]
            }
        },
    },
    batch_id="BATCH_ID",
)

API を使用するには、google-cloud-dataproc クライアント ライブラリのバージョン 5.10.1 以降を使用する必要があります。次の Airflow 環境要件を使用できます。

google-cloud-dataproc>=5.10.1

Cloud Composer のパッケージを更新するには、Cloud Composer の Python 依存関係をインストールする をご覧ください。

自動チューニングの変更を表示する

バッチ ワークロードに対する Dataproc サーバーレス自動チューニングの変更を表示するには、gcloud dataproc batches describe コマンドを実行します。

例: gcloud dataproc batches describe 出力は次のようになります。

...
runtimeInfo:
   propertiesInfo:
    # Properties set by autotuning.
    autotuningProperties
      spark.driver.memory:
        annotation: Driver OOM was detected
        value: 11520m
      spark.driver.memoryOverhead:
        annotation: Driver OOM was detected
        value: 4608m
    # Old overwritten properties.
    userProperties
...

実行中のワークロード、完了したワークロード、または失敗したワークロードに適用された最新の自動チューニングの変更を表示できます。バッチ リクエストの詳細ページを開き、調査アクセスできます。

自動チューニング調査パネル。