Apache Spark ジョブの Dataproc への移行

このドキュメントでは、Apache Spark ジョブを Dataproc に移行する方法について説明します。このドキュメントは、ビッグデータ エンジニアやビッグデータ アーキテクトを対象としています。移行、準備、ジョブの移行、管理に関する考慮事項などについて説明しています。

概要

オンプレミス環境から Google Cloud に Apache Spark ワークロードを移動する場合は、Dataproc を使用して Apache Spark / Apache Hadoop クラスタを実行することをおすすめします。Dataproc は、Google Cloud が提供する、完全にサポートされているフルマネージド サービスです。ストレージとコンピューティングを分離できるため、費用を管理し、ワークロードのスケーリングを柔軟に行うことができます。

マネージド Hadoop 環境がニーズに合わない場合は、Google Kubernetes Engine(GKE)で Spark を実行するか、Compute Engine で仮想マシンをレンタルし、Hadoop または Spark クラスタをお客様ご自身で設定するなど、別の設定を使用することもできます。ただし、Dataproc を使用する以外のオプションをご選択の場合、管理はご自身で行う必要があり、サポートもコミュニティによるもののみとなることをご理解ください。

移行の計画

Spark ジョブをオンプレミスで実行する場合と、Compute Engine 上の Dataproc または Hadoop クラスタで実行する場合の間には、多くの違いがあります。ワークロードを注意深く観察し移行の準備をすることが重要です。このセクションでは、考慮すべき事項と、Spark のジョブを移行する前の準備について概説します。

ジョブの種類を特定しクラスタを計画する

このセクションで説明するように、Spark ワークロードには 3 つのタイプがあります。

定期的にスケジュールされたバッチジョブ

定期スケジュール バッチジョブには、毎日または毎時の ETL などのユースケース、または Spark ML を使用した機械学習モデルのトレーニング用パイプラインが含まれます。このような場合、バッチ ワークロードごとにクラスタを作成し、ジョブの終了後にクラスタを削除することをおすすめします。この場合、各ワークロードの構成を個別に調整できるため、クラスタを柔軟に構成できます。Dataproc クラスタは、最初の 1 分が経過した後は 1 秒単位で課金されます。したがって、クラスタにラベル付けができるため費用効率に優れています。詳しくは、Dataproc の料金のページをご覧ください。

バッチジョブの実装は、ワークフロー テンプレートを使用するか、次の手順を実行して行うことができます。

  1. クラスタの作成を行い、そのクラスタが作成されるまで待ちます(API 呼び出しまたは gcloud コマンドを使用して、クラスタが作成されたかどうかをモニタリングできます)。専用の Dataproc クラスタでジョブを実行する場合、動的割り当て外部シャッフル サービスをオフにすることが望ましい場合があります。次のように gcloud コマンドを実行すると、Dataproc クラスタを作成したときに提供される Spark 構成プロパティが表示されます。

    dataproc clusters create ... \
        --properties 'spark:spark.dynamicAllocation.enabled=false,spark:spark.shuffle.service.enabled=false,spark.executor.instances=10000'
  2. クラスタにジョブを送信します(API 呼び出しまたは gcloud コマンドを使用して、ジョブのステータスをモニタリングできます)。次に例を示します。

    jobId=$(gcloud --quiet dataproc jobs submit pyspark \
        --async \
        --format='value(reference.jobId)' \
        --cluster $clusterName \
        --region global \
        gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py)
    
    gcloud dataproc jobs describe $jobId \
        --region=global \
        --format='value(status.state)'
  3. API 呼び出しまたは gcloud コマンドを使用して、ジョブの実行後にクラスタを削除します。

ストリーミング ジョブ

ストリーミング ジョブの場合、長時間走行する Dataproc クラスタを作成し、高可用性モードで走行するようにクラスタを構成する必要があります。この場合、プリエンプティブル VM を使用することはおすすめしません。

ユーザーから送信されたアドホック型またはインタラクティブ型のワークロード

アドホック型ワークロードの例としては、日中にクエリを作成したり分析ジョブを実行したりするユーザーなどです。

そのような場合は、高可用性モードでクラスタを実行する必要があるかどうか、プリエンプティブル VM を使用するかどうか、クラスタへのアクセスをどのように管理するかを決定する必要があります。クラスタの作成と終了はスケジューリング可能です(夜間や週末のクラスタを必要としないときなど)。また、スケジュールに従ってアップ スケーリングやダウン スケーリングを実装できます。

データソースと依存関係を特定する

各ジョブには独自の依存関係(必要なデータソースなど)があり、社内の他のチームがジョブの結果に依存している可能性があります。したがって、すべての依存関係を特定し、次の手順を含む移行計画を作成する必要があります。

  • すべてのデータソースの Google Cloud への段階的な移行。最初に、Google Cloud のデータソースをミラーリングしておくとデータを 2 か所に保存できるので役に立ちます。

  • Spark ワークロードの Google Cloud へのジョブごとの移行。これはワークロードに対応するデータソースの移行後直ちに実行します。データの状態によっては、ある時点に、従来の環境と Google Cloud の環境の両方が並行して 2 つのワークロードとして実行される場合があります。

  • Spark ワークロードからの出力に依存するその他のワークロードの移行。あるいは、出力を最初の環境に複製して戻すだけの場合もあります。

  • 従来の環境における Spark ジョブのシャットダウン。ジョブに依存しているすべてのチームがそれを必要としなくなったことを確認した後に実行します。

ストレージ オプションを選択する

Dataproc クラスタには 2 つのオプションがあります。Cloud Storage にすべてのデータを格納することと、クラスタ ワーカーとローカル ディスクや永続ディスクを一緒に使用することです。ジョブの性質によって選択すべき方法は異なります。

Cloud Storage と HDFS の比較

Dataproc クラスタの各ノードには、Cloud Storage コネクタがインストールされています。コネクタはデフォルトで /usr/lib/hadoop/lib にインストールされます。コネクタによって Hadoop FileSystem インターフェースが実装され、Cloud Storage の HDFS との互換性が実現されます。

Cloud Storage はバイナリラージ オブジェクト(BLOB)ストレージ システムであるため、コネクタによりオブジェクト名に従ってディレクトリがエミュレートされます。データにアクセスするには、hdfs:// 接頭辞の代わりに gs:// 接頭辞を使用します。

通常、Cloud Storage コネクタにはカスタマイズは必要ありません。ただし、変更が必要な場合は、指示に従ってコネクタを構成できます。構成キーもすべて利用可能です。

Cloud Storage の使用は、次の場合に適しています。

  • データが ORC、Parquet、Avro、その他あらゆる形式で表され、さまざまなクラスタやジョブで使用されている。したがって、クラスタの実行が終了してもデータを維持する必要がある。
  • 高スループットが必要であり、128 MB を超えるサイズのファイルにデータが保存されている。
  • データにクロスゾーンでの耐久性が要求される。
  • データの高可用性が必要になる場合がある。たとえば、HDFS NameNode が単一障害点となることを避ける必要がある。

次の条件においては、ローカル HDFS ストレージが良い選択肢となります。

  • 数多くのメタデータ操作が必要である場合。たとえば、何千ものパーティションやディレクトリがあり、ファイルサイズが比較的小さいなど。
  • HDFS データを頻繁に変更する場合または、ディレクトリの名前を変更する場合。Cloud Storage オブジェクトは不変オブジェクトであるため、ディレクトリ名の変更はすべてのオブジェクトをコピーしてから削除することになるので重い操作になります。
  • HDFS ファイルで、アペンド操作(追加、連結)が頻繁に実行される場合。
  • 大容量の I/O を必要とするワークロードがある場合。たとえば、次のような多くの分割された書き込みがあるなど。

    spark.read().write.partitionBy(...).parquet("gs://")
  • 特にレイテンシを重視する I/O ワークロードが存在する場合。たとえば、ストレージ操作あたり 1 桁ミリ秒単位のレイテンシが要求されるなど。

一般に、大容量のデータ パイプラインでは、Cloud Storage をデータの最初と最後のソースとして使用することをおすすめします。たとえば、ワークフローに 5 つの Spark ジョブが連続して含まれている場合、最初のジョブは Cloud Storage から初期データを取得し、シャッフル データと中間ジョブ出力を HDFS に書き込みます。最後の Spark ジョブは、その結果を Cloud Storage に書き込みます。

ストレージのサイズを調整する

Dataproc を Cloud Storage とともに使用すると、HDFS の代わりにデータを格納することで、ディスク要件を削減し、費用を削減できます。データを Cloud Storage に保存し、ローカルの HDFS に保存しない場合、クラスタにはより小さいディスクを使用できます。クラスタの構成をオンデマンド化すれば、前述のようにストレージとコンピューティングを分離できるので、費用を大幅に削減できます。

すべてのデータを Cloud Storage に保存する場合でも、Dataproc クラスタでは、コントロール ファイルやリカバリ ファイルの保存やログの集計などの特定の操作に HDFS が必要になります。シャッフルの際には、HDFS でないローカルのディスク容量も必要になります。ローカル HDFS を頻繁に使用しない場合は、ワーカーごとのディスクサイズを小さくできます。

ローカル HDFS のサイズを調整するいくつかのオプションがあります。

  • マスターとワーカーの永続プライマリ ディスクのサイズを小さくすることにより、ローカル HDFS の合計サイズを小さくします。永続プライマリ ディスクにはブート ボリュームとシステム ライブラリも含まれているので、少なくとも 100 GB を割り当ててください。
  • ワーカーの永続プライマリ ディスクのサイズを大きくして、ローカル HDFS の合計サイズを増やします。このオプションについては慎重に検討してください。Cloud Storage または SSD を使用するローカル HDFS を使用する場合と比較して、標準永続ディスクで HDFS を使用することでパフォーマンスが向上することはまれです。
  • 最大 8 つの SSD(それぞれの容量は 375 GB)を各ワーカーに割り当てて、これらのディスクを HDFS 用に使用します。この方法は、ミリ秒単位のレイテンシが要求される I/O 集約型のワークロードに HDFS を使用する場合に適しています。ディスクの要件を実現するのに十分な CPU とメモリを持つマシンタイプがワーカーに割り当てられていることを確認してください。
  • プライマリ ディスクとして永続ディスク SSD(PD-SSD)をマスターまたはワーカーに使用します。

Dataproc にアクセスする

Compute Engine における Dataproc や Hadoop へのアクセスは、オンプレミス クラスタへのアクセスとは異なります。セキュリティ設定とネットワーク アクセス オプションを決定する必要があります。

ネットワーキング

Dataproc クラスタのすべての VM インスタンスについては相互に接続された内部ネットワークが必要であり、UDP、TCP、ICMP ポートを開いておく必要があります。デフォルトのネットワーク構成を使用するか、VPC ネットワークを使用することで、外部 IP アドレスから Dataproc クラスタへのアクセスを許可できます。どのネットワーク オプションでも、Dataproc クラスタにはすべての Google Cloud サービス(Cloud Storage バケット、API など)へのネットワーク アクセス権が付与されます。オンプレミス リソースとの双方向のネットワーク アクセスを許可するには、VPC ネットワーク構成を選択し適切なファイアウォール ルールを設定します。詳細については、Dataproc クラスタのネットワーク構成ガイドと、この後の YARN にアクセスするをご覧ください。

Identity and Access Management

Dataproc クラスタを利用するには、ネットワーク アクセス権に加えて、リソースへのアクセス権が必要です。たとえば、Cloud Storage バケットにデータを書き込むには、Dataproc クラスタがバケットへの書き込みアクセス権を持つ必要があります。ロールを使用してアクセス権を確立します。Spark のコードをスキャンして、コードが必要とする Dataproc 以外のすべてのリソースを検出し、クラスタのサービス アカウントに対する正しいロールを付与します。また、クラスタ、ジョブ、操作、およびワークフロー テンプレートを作成するユーザーに、適切な権限が与えられていることを確認してください。

詳細とベスト プラクティスについては、IAM のドキュメントをご覧ください。

Spark と他のライブラリの依存関係を確認する

Spark のバージョンと他のライブラリのバージョンを公式の Dataproc バージョン リストと比較し、利用が開始されていないライブラリを確認します。Dataproc で公式にサポートしている Spark のバージョンの使用をおすすめします。

ライブラリを追加する必要がある場合は、次の操作を行います。

  • Dataproc クラスタのカスタム イメージを作成します。
  • クラスタ用のクラウド ストレージに初期化スクリプトを作成します。初期化スクリプトを使用して、追加の依存関係をインストールしたり、バイナリをコピーしたりできます。
  • Java または Scala コードを再コンパイルし、Gradle、Maven、Sbt、または別のツールを使用して、ベース ディストリビューションの一部ではない追加の依存関係をすべて Fat Jar としてパッケージ化します。

Dataproc のクラスタサイズを調整する

オンプレミス、クラウドいずれのクラスタ構成でも、クラスタのサイズは Spark のジョブのパフォーマンスにとって重要です。十分なリソースを持たない Spark ジョブは遅くなるか、失敗します。特に実行メモリが不足している場合は失敗します。Hadoop クラスタのサイズを変更する際に考慮する必要がある事項については、Hadoop 移行ガイドのクラスタのサイズ変更に関するセクションをご覧ください。

以下のセクションでは、クラスタのサイズを設定する方法のいくつかについて説明します。

現在の Spark ジョブの構成を取得する

現在の Spark ジョブ構成を見て、Dataproc クラスタのサイズが十分に大きいことを確認してください。共有クラスタから複数の Dataproc クラスタ(各バッチ ワークロードごとに 1 つ)に移行する場合は、各アプリケーションの YARN 構成を調べて、必要なエクゼキュータ数、エクゼキュータあたりの CPU 数、エクゼキュータ メモリの総計を割り出します。オンプレミス クラスタに YARN キューが設定されている場合は、各キューのリソースを共有するジョブを確認し、ボトルネックを特定します。この移行は、オンプレミス クラスタで発生している可能性があるリソースの制限を取り除く良い機会となります。

マシンタイプとディスク オプションの選択

ワークロードのニーズに合わせて VM の種類と数を選択します。ローカル HDFS をストレージとして使用する場合は、VM のディスクタイプとサイズが正しいことを確認してください。その際、ドライバ プログラムのためのリソースも計算に含めてください。

各 VM には vCPU あたり 2 Gbps のネットワークの下り(外向き)上限があります。永続ディスクや永続 SSD への書き込みはこの上限にカウントされるため、vCPU の台数が非常に少ない VM では、ディスク書き込みの際にスループットが制限される可能性があります。この現象は、シャッフル フェーズで発生する可能性があります。Spark によってシャッフル データがディスクに書き込まれたり、ネットワークを介してエグゼキュータ間でシャッフル データが移動したりする際に発生します。最大書き込み速度を達成するには、永続ディスクの場合 2 台、永続 SSD の場合 4 台の vCPU が少なくとも必要です。ただし、これらの値には、VM 間の通信などのトラフィックが含まれていないことに注意してください。また、ディスクのサイズもピーク パフォーマンスに影響します。

構成の選択は、Dataproc クラスタの費用にも影響します。Dataproc の料金は、各 VM およびその他の Google Cloud リソースの Compute Engine インスタンスあたりの料金に加算されます。Google Cloud の料金計算ツールを使用して費用を見積もる方法については、Dataproc の料金ページをご覧ください。

パフォーマンスのベンチマークと最適化

ジョブ移行フェーズが完了したら、オンプレミス クラスタ上で Spark ワークロードの実行を停止する前に、Spark ジョブのベンチマークを行って最適化を検討してください。構成が最適でない場合には、クラスタのサイズを変更して改善できます。

Dataproc Serverless for Spark の自動スケーリング

Dataproc Serverless を使用すると、独自のクラスタのプロビジョニングと管理を行うことなく、Spark ワークロードを実行できます。ワークロード パラメータを指定してから、ワークロードを Dataproc Serverless サービスに送信します。このサービスは、マネージド コンピューティング インフラストラクチャでワークロードを実行し、必要に応じてリソースを自動スケーリングします。Dataproc Serverless の料金は、ワークロードが実行されている時間に対してのみ適用されます。

移行を実行する

このセクションでは、データの移行、ジョブコードの変更、およびジョブの実行方法の変更について説明します。

データの移行

Dataproc クラスタで Spark ジョブを実行する前に、データを Google Cloud に移行する必要があります。詳細については、データ移行ガイドを参照してください。

Spark コードの移行

Dataproc への移行を計画し、必要なデータソースを移動したら、ジョブコードを移行できます。ローカル HDFS の代わりにクラウド ストレージにデータを保存する場合、2 つのクラスタ間で Spark のバージョンに違いがなければ、実施するのはすべての HDFS ファイルパスのプレフィックスを hdfs:// から gs:// に変更することだけです。

Spark のバージョンが異なる場合は、Spark のリリースノートで 2 つのバージョンを比較参照して、Spark コードを適合させてください。

Spark アプリケーションの jar ファイルは、Dataproc クラスタや HDFS フォルダに関連付けられている Cloud Storage バケットにコピーできます。次のセクションでは、Spark ジョブを実行する際に使用できるオプションについて説明します。

ワークフロー テンプレートを使用する場合は、追加する予定の各 Spark ジョブを個別にテストすることをおすすめします。次に、テンプレートの最終テストを実行して、テンプレートのワークフローが正しいことを確認します(上流のジョブがすべて揃っており正しい場所に出力が格納されているなど)。

ジョブの実行

Spark ジョブは次の方法で実行できます。

  • 次の gcloud コマンドを使用します。

    gcloud dataproc jobs submit [COMMAND]

    ここで

    [COMMAND]sparkpyspark、または spark-sql です。

    --properties オプションにより Spark のプロパティを設定できます。詳しくは、このコマンドのドキュメントをご覧ください。

  • ジョブの Dataproc への移行以前に使用したのと同じプロセスを使用。Dataproc クラスタはオンプレミスでアクセス可能でなければならず、同じ構成を使用する必要があります。

  • Cloud Composer を使用。環境(管理された Apache Airflow サーバー)を作成し、複数の Spark ジョブを DAG ワークフローとして定義した後、ワークフロー全体を実行できます。

詳しくは、ジョブを送信するを参照してください。

移行後のジョブを管理する

Spark ジョブの Google Cloud への移行後は、Google Cloud が提供するツールとメカニズムを使用してジョブの管理を確立することが重要です。このセクションでは、ロギング、モニタリング、クラスタへのアクセス、クラスタのスケーリング、ジョブの最適化について説明します。

ロギングとパフォーマンスのモニタリングを使用する

Google Cloud では、Cloud Logging でログの表示やカスタマイズを行い、Cloud Monitoring でジョブとリソースをモニタリングできます。

Spark ジョブのエラーの原因を見つける最良の方法は、ドライバの出力と Spark エグゼキュータによって生成されたログを調べることです。

ドライバ プログラムの出力は、Google Cloud Console を使用するか、gcloud コマンドを使用して取得してくささい。この出力は Dataproc クラスタの Cloud Storage バケットにも格納されます。詳細については、Dataproc ドキュメントのジョブドライバ出力に関するセクションをご覧ください。

その他のログは、クラスタのマシン上のさまざまな別々のファイルに保存されます。コンテナごとのログは、Spark アプリのウェブ UI(またはプログラムが終了した後の履歴サーバー)からエグゼキュータのタブに表示されます。各ログを表示するには、各 Spark コンテナを参照する必要があります。アプリケーション コード上でログに書き込んだり stdoutstderr に出力したりすると、ログが stdoutstderr のリダイレクト先に保存されます。

Dataproc クラスタでは、YARN によりこれらのログがすべて収集されるようにデフォルトで構成されて、Cloud Logging で利用できます。Cloud Logging においては、すべてのログが統合され簡潔に表示されるので、時間をかけてコンテナログをブラウズしエラーを見つける必要がありません。

次の図は、Google Cloud コンソールの [Cloud Logging] ページを示しています。セレクタ メニューでクラスタの名前を選択することで、Dataproc クラスタのすべてのログを表示できます。タイムレンジ セレクタで期間を拡大すれば見やすくなります。

Google Cloud コンソールの [Cloud Logging] ページ

また、ID でフィルタリングすれば、Spark アプリケーションからログを取得できます。このアプリケーションの ID は、ドライバ出力から取得できます。

ラベルの作成と使用

迅速にログを検索するには、クラスタまたは Dataproc ジョブごとに独自に作成したラベルを使用します。たとえば、キー env と値 exploration のラベルを作成し、それをデータ探索ジョブに使用できます。次に、Cloud Logging で label:env:exploration をフィルタリングすることによって、すべてのデータ探索ジョブの作成ログを取得できます。ただし、このフィルタで返されるのは、このジョブのすべてのログではなく、リソース作成ログだけです。

ログレベルの設定

次の gcloud コマンドを使用して、ドライバのログレベルを設定できます。

gcloud dataproc jobs submit hadoop --driver-log-levels

Spark のコンテキストから、アプリケーションのその他のログレベルを設定します。例:

spark.sparkContext.setLogLevel("DEBUG")

ジョブのモニタリング

Cloud Monitoring では、クラスタの CPU、ディスク、ネットワーク使用量、YARN リソースをモニタリングできます。カスタム ダッシュボードを作成すれば、これらの指標と他の指標の最新のグラフを取得できます。Dataproc は Compute Engine 上で実行されます。グラフ上で、CPU 使用率、ディスク I/O の数値、ネットワークの指標を可視化するには、Compute Engine VM インスタンスをリソースタイプとして選択し、クラスタ名でフィルタリングします。次の図は、出力の例を示しています。

Google Cloud コンソールの [Monitoring] ページ

Spark のクエリ、ジョブ、ステージ、タスクについての指標を表示するには、Spark アプリケーションのウェブ UI に接続します。次のセクションでは、この方法について説明します。カスタム指標の作成方法については、エージェントのカスタム指標をご覧ください。

YARN にアクセスする

SSH トンネルを設定すれば、Dataproc クラスタの外から YARN リソース マネージャーのウェブ インターフェースにアクセスできます。ウェブ インターフェースを介したブラウジングが簡単に行えるため、ローカルポート転送の代わりに軽量の SOCKS プロキシの使用をおすすめします。

YARN のアクセスには次の URL を使用します。

  • YARN Resource Manager: http://[MASTER_HOST_NAME]:8088

  • Spark History Server: http://[MASTER_HOST_NAME]:18080

Dataproc クラスタが内部 IP アドレスのみを使用している場合は、VPN 接続または踏み台インスタンス経由で接続できます。詳しくは、内部専用 VM の接続オプションを選択するをご覧ください。

Dataproc クラスタのスケーリングとサイズ変更

Dataproc クラスタをスケーリングするには、プライマリまたはセカンダリ(プリエンプティブルな)のワーカーの台数を増減します。また、Dataproc では正常なデコミッションを利用できます。

Spark のダウン スケーリングはいくつかの要因の影響を受けます。次の点を考慮してください。

  • ExternalShuffleService の使用はおすすめしません。特にクラスタを定期的にダウンスケーリングする場合は、使用を避けてください。シャッフリングの際には、計算フェーズが実行された後にワーカーのローカル ディスクに書き込まれた結果が使用されるため、コンピューティング リソースが消費されなくてもノードを削除できません。

  • メモリ内のデータ(RDD とデータセットの両方)は Spark によってキャッシュされるため、いったんキャッシュに使用されたエグゼキュータが終了しなくなります。したがって、キャッシュに使用されたワーカーは、正常にデコミッションされなくなります。ワーカーを強制的に削除した場合、キャッシュされたデータが失われるため、全体的なパフォーマンスに影響します。

  • Spark Streaming において、デフォルトでは動的割り当てが無効にされています。また、この設定を行う構成キーはドキュメントになっていません(Spark についてのスレッドで、動的割り当ての動作についてのディスカッションを参照できます)。Spark Streaming や Spark Structured Streaming を使用している場合は、前述のジョブの種類を特定しクラスタを計画するで説明されているように、動的割り当てを明示的に無効にする必要があります。

一般に、バッチ型またはストリーミング型のワークロードを実行している場合は、Dataproc クラスタのダウン スケーリングは回避することをおすすめします。

パフォーマンスの最適化

このセクションでは、Spark ジョブの実行中にパフォーマンスを向上させ、費用を削減する方法について説明します。

クラウド ストレージのファイルサイズの管理

最適なパフォーマンスを得るには、Cloud Storage のデータを 128 MB から 1 GB のサイズのファイルに分割してください。たくさんの小サイズのファイルを使用すると、ボトルネックが発生する可能性があります。小サイズのファイルがたくさん存在している場合は、処理するファイルをローカルの HDFS にコピーし、処理結果を再びコピーして戻すことを検討してください。

SSD ディスクへの切り替え

多数のシャッフル処理または分割書き込みの処理を実行する場合は、パフォーマンスを向上させるために SSD に切り替えてください。

VM の同じゾーンへの配置

ネットワークの費用を削減し、パフォーマンスを向上させるには、Dataproc クラスタで使用する Cloud Storage バケットと同じリージョンのロケーションを使用します。

デフォルトでは、グローバルまたはリージョンの Dataproc のエンドポイントを使用すると、クラスタの作成時に、クラスタの VM が同じゾーン(または同じ領域内に十分な容量を持つ別のゾーン)に配置されます。また、クラスタの作成時にゾーンを指定することもできます。

プリエンプティブル VM を使用する

Dataproc クラスタでは、プリエンプティブル VM のインスタンスをワーカーとして使用できます。これにより、クリティカルでないワークロードの時間当たりコンピューティング コストが、通常のインスタンスを使用する場合よりも低くなります。ただし、プリエンプティブル VM を使用する場合は、いくつか考慮すべき要素があります。

  • プリエンプティブル VM は、HDFS ストレージには使用できません。
  • プリエンプティブル VM の作成時には、デフォルトでブートディスクのサイズが小さく設定されます。シャッフルを多用するワークロードを実行している場合は、この構成を無効にすることをおすすめします。詳しくは、Dataproc のドキュメントのプリエンプティブル VM についてのページをご覧ください。
  • 半数以上のワーカーをプリエンプティブルにすることはおすすめできません。
  • プリエンプティブル VM を使用する場合は、VM の可用性が低くなる可能性があるため、タスク障害に対する耐性が高めになるよう、クラスタの構成を調整することをおすすめします。たとえば、YARN の構成で次のような設定を行います。

    yarn.resourcemanager.am.max-attempts
    mapreduce.map.maxattempts
    mapreduce.reduce.maxattempts
    spark.task.maxFailures
    spark.stage.maxConsecutiveAttempts
  • プリエンプティブル VM は、クラスタに簡単に追加または削除できます。詳しくは、プリエンプティブ VM をご覧ください。

次のステップ