Dataproc ベスト プラクティス ガイド
Google Cloud Japan Team
※この投稿は米国時間 2021 年 6 月 15 日に、Google Cloud blog に投稿されたものの抄訳です。
Dataproc は、Apache Spark、Presto、Apache Flink、Apache Hadoop など、オープンソースの分散処理プラットフォームを Google Cloud でホストするためのフルマネージド サービスです。オンプレミス クラスタとは異なり、Dataproc を使用すると、さまざまなサイズのクラスタをオンデマンドで柔軟にプロビジョニングしたり、構成したりできます。また、費用の低減やパフォーマンスの向上、クラウドで実行しているワークロードの効率的な運用管理を実現する優れた機能も利用できます。このブログ投稿では、Hadoop や Spark ベースのワークロードを実行するために Dataproc を導入する際のストレージ、コンピューティング、運用についてベスト プラクティスを紹介します。
コンピューティング
自動スケーリング
自動スケーリングを使用すると、クラスタは YARN メモリの指標に応じてスケールアップまたはスケールダウンします。クラスタに対する適切な自動スケーリング ポリシーを判断するには、一定期間にわたって慎重にモニタリングとチューニングを行わなければならない場合があります。また、あるタイプのワークロードには適している自動スケーリング ポリシーが、他のタイプにはうまく機能しない可能性もあります。こうした状況に対応するには、特定のタイプのワークロードに合わせてチューニングしたさまざまな自動スケーリング ポリシーを使用して、複数のクラスタを生成することができます。
ワークロードに基づいて自動スケーリング ポリシーを設定する際は、各種の構成方法について詳しく説明したドキュメントを参照してください。こうした構成を活用することにより、クラスタのスケールアップやスケールダウンをどれぐらい厳しく行うかを調整することができます。たとえば、業務上重要なアプリケーションやジョブを実行しているクラスタにはより積極的なスケールアップ構成を適用するのが理にかなっていますが、優先度の低いジョブを実行しているクラスタにはそれほど厳しい構成は必要ないかもしれません。
通常は、次の点に留意してください。
Dataproc の自動スケーリングは、割り当て済みのメモリ、使用可能なメモリ、保留中のメモリといった YARN の指標によって左右されます。
スケールダウンは、スケールアップほど単純ではなく、タスクの再処理やジョブの失敗につながる場合があります。これは、マップタスクが中間シャッフル データをローカル ディスクに保存するためです。ノードがデコミッションされると、実行中のジョブが使用しているシャッフル データが失われる可能性があります。これが起こると、通常は次のような形で表出します。
MapReduce ジョブ - 「取得に失敗しました」
Spark - 「FetchFailedException」「接続できませんでした」
可能であれば、セカンダリ ワーカー(データノードがないもの)のみをスケーリングしてください。これにより、削除するノードから HDFS を動かす必要がなくなります。その結果、HDFS ブロックの破損や Namenode の競合状態でクラスタの健全性が失われるリスクが大幅に減少します。次のセクションを参照し、セカンダリ ワーカーをプリエンプティブとして構成するかどうか判断してください。デフォルトでは、セカンダリ ワーカーはプリエンプティブになっています。
正常なデコミッションは、クラスタで実行する最長のジョブよりも長い値に設定することをおすすめします。正常なデコミッションやクールダウンの期間中は、クラスタはスケールアップまたはスケールダウンしません。そのため、リソース不足が原因で小さいジョブの速度が低下する可能性があります。長時間実行される機密性の高いワークロードについては、別のエフェメラル クラスタにスケジューリングすることをご検討ください。逆に、短時間で完了するジョブは、1 つの自動スケーリング クラスタで実行するようにします。
高度な柔軟性モード(EFM)を使用して、より厳しい自動スケーリング ポリシーを定義することができます。シャッフル データはプライマリ ワーカーノードに保存されるため、セカンダリを積極的にスケールアップ / スケールダウンできます。
プリエンプティブル VM(PVM)
プリエンプティブル VM は、バッチジョブやフォールト トレラントなワークロードに適した、有効時間が短く手頃な料金のコンピューティング インスタンスです。通常の VM と比較すると大幅に割引されていますが、予告なしにいつでもクラスタから削除される可能性があります。フォールト トレラント ワークロードを実行しているクラスタ内で一定の割合で PVM を使用すると、費用を節約できます。ただし、PVM の比率が高くなったり、フォールト トレラントではないジョブに利用したりすると、ジョブの失敗や関連するその他の問題が発生するおそれがあります。一般的に、推奨される方法は次のとおりです。
セカンダリ ワーカーは HDFS を実行しないため、セカンダリ ワーカーのみに PVM を使用します。
PVM に設定するのは最大セカンダリ ワーカー数の 30% 未満にとどめます。
PVM はフォールト トレラントなジョブのみに使用し、下位レベルの環境で厳密なテストを終えてから本番環境にアップグレードします。
アプリケーション マスターやタスク / エグゼキュータの最大試行回数を増やすことにより、アプリケーション(MapReduce/Spark など)のフォールト トレランスを強化します。さらに、dataproc:am.primary_only のフラグを true に設定し、アプリケーション マスターがプリエンプティブルでないワーカーでのみ開始されるようにしてください。
ストレージ
従来のオンプレミス型の Hadoop と異なり、Dataproc はコンピューティングとストレージが切り離されています。これにより、コンピューティングとストレージのスケーリングも分離されます。ここでは、Dataproc で使用可能なストレージ オプションの概要を示します。
Google Cloud Storage は、永続ストレージのあらゆるニーズに対応できる、おすすめのストレージ オプションです。GCS は Hadoop 互換ファイル システム(HCFS)であるため、最小限の変更で Hadoop や Spark のジョブの読み取りと書き込みが可能になります。さらに、GCS に保存されているデータには、他の Dataproc クラスタやプロダクト(BigQuery など)からアクセスできます。
Dataproc の HDFS ストレージは、ワーカーノードにアタッチされている永続ディスク(PD)上に構築されます。これは、HDFS に保存されているデータは(GCS または他の永続ストレージにコピーされない限り)一時的なものであり、ストレージ費用が相対的に高くなることを意味します。そのため、HDFS ストレージの使用を最小限に抑えることをおすすめします。ただし、パフォーマンス上の理由などから、小さな HDFS フットプリントを維持することが妥当な場合もあります。その場合、限られた HDFS ストレージ容量で Dataproc クラスタをプロビジョニングし、永続ストレージのすべてのニーズを GCS にオフロードすることができます。ワーカーノードにアタッチするストレージ ディスクのサイズと性質を判断する際は、以下の点を考慮してください。
通常、標準 PD の読み取り / 書き込みスループットは、アタッチされたディスクのサイズが大きくなると増加します。
ゾーンディスクは、リージョン ディスクよりも読み取り / 書き込みスループットが高くなります。
PD には数種類あり、パフォーマンスや費用に関するさまざまな要件に応じて使い分けることができます。
ブートディスクを補うために、クラスタのマスターノード、プライマリ ワーカーノード、セカンダリ ワーカーノードにローカル SSD(ソリッド ステート ドライブ)をアタッチできます。
ローカル SSD を使用すると、永続ディスクよりも読み取りと書き込みの処理時間を短縮できます。各ローカル SSD ディスクのサイズは 375 GB に固定されていますが、複数のローカル SSD をアタッチして SSD ストレージ容量を増やすことができます。
パフォーマンスに関する注意事項
HDFS のパフォーマンスと GCS の柔軟性と耐久性のバランスをとるために、ソースと最終のデータセットを GCS に保存し、中間のデータセットを HDFS に保存するようにワークロードを設計できます。
さらに、GCS ファイルへの読み取り / 書き込みのレイテンシを短縮するには、次の対策の導入をご検討ください。
GCS バケットがクラスタと同じリージョン内にあることを確認します。
CS 上の Hive マネージド テーブルの auto.purge を無効にすることをご検討ください。無効にすることで、上書きや削除の際に作成されるゴミ箱へのコピーを排除できます。
Spark SQL を使用する場合は、Spark 3 以降(Dataproc 2.0 以降で使用可能)を使用してください。たとえば、Spark 2.x では、INSERT OVERWRITE について報告されている問題があります。
通常、GCS にファイルがたくさんあるほど、GCS 上でのデータの読み込み / 書き込み / 移動 / 削除により多くの時間がかかります。そのため、spark.sql.shuffle.partitions や spark.default.parallelism、Hadoop パラメータの mapreduce.job.reduces など、Spark SQL / RDD / DataFrame のパラメータをチューニングして、サイズの異なるファイルや小さいサイズのファイルが GCS にたくさん書き込まれる可能性を減らすようにしてください。
ワーカーノード PD はデフォルトでシャッフル データを保持しています。YARN ノードに異常が表示される一般的な原因は、ノードのディスク容量が不足しているためです。該当するクラスタの YARN Node Manager ログで異常を示すノードのディスク容量を確認することによって、状況を検証できます。ディスク サイズを増やすか、同時に実行するジョブの数を減らしてください。
高度な柔軟性モード(EFM)
スケールダウンやプリエンプション(以下のセクションを参照)のためにワーカーノードを削除することにより、ノードにローカル保存されているシャッフル(中間データ)が失われることは珍しくありません。このような場合でのジョブの遅延を最小限に抑えるには、クラスタで高度な柔軟性モードを有効にすることを強くおすすめします。EFM には 2 つのモードがあります。
プライマリ ワーカー シャッフル - Spark ジョブに推奨。マッパーがプライマリ ワーカーにデータを書き込めるようになります。Map フェーズよりも少ないノードで実行されることが多い Reduce フェーズは、プライマリ ワーカーからデータを読み込みます。
HCFS(Hadoop 互換ファイル システム)シャッフル - マッパーは HCFS 実装にデータを書き込みます(デフォルトは HDFS)。プライマリ ワーカー モードと同様に、プライマリ ワーカーのみが HDFS と HCFS の実装に参加します(HCFS シャッフルが Cloud Storage コネクタを使用する場合、データはクラスタ外に保存されます)。このモードは、データ量があまり大きくないジョブにはメリットがあります。ただし、大量のデータを処理するジョブにはおすすめできません。シャッフル データのレイテンシが高くなり、ジョブの実行時間が長くなる可能性があります。
EFM の利用は、プリエンプティブル VMを使用するクラスタの場合や、セカンダリ ワーカー グループを使って自動スケーリングの安定性を改善する場合に強くおすすめします。EFM を有効にする場合は、プライマリ ワーカーのサイズが、セカンダリ ワーカー、CPU、ディスクサイズとの比率でみて正しく設定されているか確認します。たとえば、プライマリ ワーカーに対しセカンダリ ワーカーの比率が非常に高い場合、シャッフルがプライマリ ワーカーのボトルネックになるため、クラスタの安定性にマイナスの影響が出るおそれがあります。シャッフルのパフォーマンスをさらに向上させるには、シャッフル パラメータをデフォルト値から増やしてください。
運用
Dataproc ラベル
ラベルは Key-Value ペアで、Dataproc のクラスタやジョブにタグ付けできます。クラスタの作成時やジョブの送信時にラベルが追加されます。以下にラベルの使用例を示します。
請求 - ユーザー、チーム、あるいは部門ごとの費用を把握するため、Dataproc クラスタ関連費用を追跡および統合できます。これは、クラスタ、ジョブ、その他のリソースでラベルごとに請求データをフィルタすることで実現できます(team:marketing、team:analytics など)。
検索と分類 - ラベルに基づいて、さまざまなリソース(ジョブ、クラスタ、環境、コンポーネントなど)のタグ付け、フィルタリング、検索を容易に行うことができます。
自動化 - クラスタやジョブのラベルに基づいて、ジョブ / ワークフローを Dataproc クラスタプールに動的に送信します。たとえば、優先度の高いジョブを積極的な自動スケーリングを使用してクラスタに送信したり、ML やデータ サイエンスのラベルでタグ付けされたジョブを TPU を使用してクラスタで実行したりできます。
モニタリング - ラベルはモニタリングでも非常に役立ちます。Cloud Logging で高度なフィルタを使いながら、特定のラベルのイベントを除外する場合などです。
アップグレード - ラベルを使用しているクラスタに対し、ローリング アップグレードを適用できます。これは、ワークロードやチームに応じて特定のバージョンの Dataproc を維持する場合に特に役立ちます。
クラスタプール
従来、組織には、異なるハードウェア(ノード、スイッチなど)で構成された Hadoop クラスタが 1 つ以上存在していました。このような環境では、クラスタの管理やスケーリング、ジョブのチューニングは面倒な作業です。Dataproc を使用する際は、ラベルを使用してクラスタプールを構築してください。これらのプールには、Dataproc ワークフロー テンプレートを割り当てることができます。これにより、クラスタプールにワークフロー テンプレートを使用してジョブを送信できます。1 つのクラスタプールには、1 つ以上のクラスタを割り当てることができます。ワークフロー テンプレートがクラスタプールに割り当てられると、クラスタプール内のいずれかの Dataproc クラスタに対して実行されます。
クラスタプールを編成する方法をいくつか紹介します。
ジョブタイプ - 優先度(重大、高、低など)やリソース使用率(CPU やメモリ集約、ML など)といった特性に応じて、ジョブを分類できます。たとえば、コンピューティング集約型、I/O 集約型、ML 関連のユースケースを個別に実行するために異なるクラスタプールを用意すると、パフォーマンスが向上し、費用も抑えられる可能性があります(ハードウェアと構成がワークロードのタイプに合わせてカスタマイズされるため)。
ユーザーとグループ - 特定のチームやグループからのジョブを実行するようにクラスタを構成する場合も、クラスタプールは有効な手段です。たとえば、データ サイエンティストは、標準的な CPU、PD の Dataproc クラスタで標準的な ETL ジョブを実行しながら、TPU を用いたクラスタに Spark ML ジョブを送信できます。
コンプライアンスとデータ ガバナンス - ラベルとクラスタプールを活用すると、データ ガバナンスやコンプライアンスのニーズにも容易に対応できます。たとえば、特定のセキュリティやコンプライアンスのニーズを持つジョブを他の環境よりもセキュリティが強化された環境で実行できます。これは、クラスタプールを使用して実現できます。
クラスタプールは実行時間の長いクラスタにも役立ちます。特定の種類のワークフローに合わせて最適化した自動スケーリング クラスタプールを作成すると便利な場合があります。クラスタプールを使用した Dataproc クラスタのローリング アップグレードをまったく新しい方法で実行できます。現在のワークロードにダウンタイムを発生させずに Dataproc のクラスタプールをアップグレードするには、以下の手順に沿って操作してください。
新しいクラスタプールをわかりやすいタグ(「dataproc-2.1」など)を付けてターゲット バージョンでスピンアップし、自動スケーリングを true に設定します。
新しいワークフロー / ジョブをすべて新規のクラスタプールに送信します。ラベルを使用して、クラスタプールにジョブを送信できます。
古いバージョンを使用した以前のクラスタプールで処理中のワークロードが完了するのを待ちます。すべてのジョブが完了したら、クリーンアップします。
クラスタのスケジュール設定された削除
使用されていないアイドル リソースの費用削減は、どの IT 部門にとっても非常に重要な課題です。このようなシナリオを回避するには、クラスタのスケジュール設定された削除機能を有効にしてクラスタを作成します。この構成は Infrastructure as Code(IaC)コードに埋め込むことができます(Cloud Build や Terraform スクリプトなどの IaC)。スケジュールされた削除を有効にする場合、構成済みの削除条件を注意深く確認し、組織の優先事項に適合していることを確認します。
実行時間の長いクラスタと有効期間が短い(エフェメラル)クラスタ
お客様からよく寄せられる質問に、有効期間が短い(エフェメラル)クラスタと実行時間の長いクラスタを使い方があります。名前のとおり、エフェメラル クラスタは短い期間しか使えません。Hadoop や Spark の大半のユースケースでは、エフェメラル クラスタを使用することで、ワークロードのサイズに合わせた自動スケーリング クラスタを柔軟に作成できます。エフェメラル クラスタがもたらす主な利点は次のとおりです。
ワークロード固有のクラスタ構成: エフェメラル クラスタを使用すると、個別のワークフローに合わせてクラスタ構成をカスタマイズでき、異なるハードウェア プロファイルや構成を管理する必要がなくなります。
ワークロード固有のハードウェア プロファイル: エフェメラル クラスタの作成に使用される VM も、個別のユースケースに応じてカスタマイズできます。Dataproc では、多岐にわたる VM を提供しています(汎用、メモリ最適化、コンピューティング最適化など)。ワークフローのサイズに合わせたクラスタを使用することにより、実行するうえで最適なハードウェア(コンピューティング インスタンス)を選択できます。コンピューティング集約型のユースケースなどでは、vCPU(コンピューティング最適化マシン [C2])が多いうえ、I/O 集約型のユースケースによりメモリの多い永続ディスク(メモリ最適化マシン)を割り当てることができるためメリットがあります。詳しくは、このトピックについて詳細を説明したブログをご覧ください。
費用アトリビューション: クラスタの有効期間は個別のワークフローに限定されるため、費用のアトリビューションが簡単かつわかりやすくなります。
運用オーバーヘッドの削減
クラスタの有効期間が短いため、従来のようなメンテナンスが必要ありません。CICD と統合すると、最小限の介入でエフェメラル クラスタをデプロイしたり、クリーンアップしたりできます。
開発、テスト、本番環境用に個別のインフラストラクチャを維持する必要はありません。同じクラスタ定義を使用して、必要な数だけ異なるバージョンのクラスタをスピンアップし、完了後にはそれらをクリーンアップできます。
セキュリティの簡素化: 1 つのユースケース、1 人のユーザーに 1 つのクラスタを使用するため、対応するセキュリティ要件もシンプルになります。マルチテナンシーからネットワーク ファイアウォール ルールまで、大規模なモノリス クラスタでは、さまざまなセキュリティ要件に対応する必要があります。エフェメラル クラスタを使用することにより、一度に 1 つのユースケースに集中できるため、こうしたニーズを簡素化することができます。
エフェメラル クラスタについて詳しくは、こちらの公式ドキュメントを参照してください。多くのユースケースでは、エフェメラル クラスタを使用します。ただし、常にウォーム状態でクラスタを維持しなければならない場合などでは、一部のインスタンスに実行時間の長いクラスタが必要になることがあります。たとえば、継続的な分析を要するユースケースでは、実行時間の長いクラスタをスピンアップすることが理にかなっています。24 時間無休で実行されるバッチジョブやストリーミング ジョブ(定期的に実行する、または常時実行するリアルタイムのジョブ)などがこれに該当します。未加工のイベントを集約し、1 日を通して BigQuery に取り込む処理を 1 時間ごとに実行するバッチジョブには適しているかもしれません。実行時間の長いクラスタが使用されることが多いもう 1 つのシナリオとしては、アドホックな分析クエリがあります。アナリストが Hive や Presto、Spark で比較的短期間のクエリをすばやく実行できるようにする必要がある場合です。
この 2 つ目のシナリオにはエフェメラル クラスタが適しているように思えますが、数分で実行される Hive クエリ用にエフェメラル クラスタを作成すると、オーバーヘッドが発生する可能性があります。ライブユーザーが多くなる場合は特にそうです。前述のように、これは自動スケーリング クラスタプールを実行するのが有効なユースケースといえるでしょう。現在、Dataproc ではクラスタ用に「開始」と「停止」の機能をサポートしています。実行時間の長いクラスタを使用しないときは、この機能を使用してクラスタをオフにできます。
モニタリングとロギング
デフォルトの Dataproc ビューは、Cloud Monitoring をベースにクラスタの健全性の概要が表示されるダッシュボードです。Cloud Monitoring とは、指標のモニタリング、アラート送信、フィルタリング、集計を行う Google Cloud サービスです。モニタリング、VM インスタンス、構成、ウェブ インターフェースなどのタブがあります。
Dataproc の UI
[ジョブ] タブには、最近のジョブがタイプ、開始時間、経過時間、ステータスの情報とともに表示されます。
[VM インスタンス] ビューには、クラスタを構成する GCE インスタンスのステータスが表示されます。GCE VM インスタンス名をクリックすると、インスタンス構成を確認できます。各 GCE VM ノードには、GCP 全体で指標を収集するユニバーサル ソリューションの Cloud Monitoring エージェントが装備されています。
指標
ユーザーは、MonitoringAPI や Cloud Monitoring ダッシュボードを通じて GCP 指標にアクセスすることもできます。これらの指標は、モニタリングやアラートのほか、クラスタで飽和状態にあるリソースを見つける目的で使用できます。また、カスタム指標を Stackdriver に排出し、その指標を基にダッシュボードを作成することも可能です。Stackdriver には、指標に関連付けられた固有の費用があります。一部は無料ですが、追加費用が発生するものもありますので注意してください。
Hadoop エコシステムの UI
YARN、HDFS、Spark のサーバー UI など、Hadoop エコシステムの UI を有効にできます。詳しくは、コンポーネント ゲートウェイの有効化に関するドキュメントをご覧ください。
ロギング
dataproc:job.history.to-gcs.enabled を true に設定し、ローカル ディスクの消費量を最小限に抑えてください。イメージ 1.5 以降では、デフォルトで有効になっています。この設定により、GCS バケットに MapReduce や Spark の履歴ファイルが残るため、ノードのディスク容量が不足し、クラスタの健全性が失われる可能性が減ります。
--driver-log-levels パラメータを使用し、Cloud Logging にログインするレベルをコントロールします。
診断とデバッグ
Spark 履歴サーバーや YARN 履歴サーバーの UI は、対応するアプリケーションの表示やデバッグに役立ちます。
診断ユーティリティを使用して、その時点におけるクラスタの状態のスナップショットを提供する tarball を取得します。GCS 上に tarball が生成されたら、クラスタを安全に削除できます。tarball には、クラスタの設定、Dataproc エージェントの jstack とログ、NodeManager の JMX 指標、ResourceManager や他のシステムのログが含まれます。GCP サポートに問い合わせる際は、エンジニアがクラスタの問題を診断し、トラブルシューティングを行うことができるように、該当の tarball を提供してください。
まとめ
ここまで、GCP で Dataproc サービスを使用する際のベスト プラクティスについて理解を深めていただいたと思います。このセクションでは、ストレージ、パフォーマンス、クラスタプール、ラベルについておすすめの使い方を紹介しました。また、「エフェメラル クラスタと実行時間の長いクラスタの使い方」など、よくある質問にもお答えしました。Dataproc を有効に活用いただくために、ここにある情報をぜひお役立てください。
-Google データ分析担当 SCE Ajay Kumar
-Google データ分析担当 SCE Timothy Manuel