分散データの前処理に GKE と Ray を活用: エンタープライズ向けにスケーリング
Kavitha Rajendran
AI Specialist, Solutions Architect
Shobhit Gupta
Solutions Architect
※この投稿は米国時間 2025 年 1 月 9 日に、Google Cloud blog に投稿されたものの抄訳です。
ML モデルの急速な進化に伴い、データセットも増加の一途をたどっています。データの前処理として従来から利用されている方法はスケーリングが困難なため、このようなデータの急増は ML オペレーション(MLOps)のライフサイクルに重大なボトルネックを引き起こします。データの前処理は、元データをモデルのトレーニングに適した形式に変換するうえで重要なフェーズですが、生産性を低下させる要因になる可能性があります。
この記事では、この課題に対処する方法として、マネージド Kubernetes サービスである Google Kubernetes Engine(GKE)と、Python アプリケーションをスケーリングするための分散コンピューティング フレームワークである Ray を活用した分散データの前処理パイプラインをご紹介します。この組み合わせにより、大規模なデータセットの前処理を効率的に行い、複雑な変換を処理して、ML ワークフロー全体を高速化できます。
データの前処理は必須
MLOps におけるデータの前処理フェーズは基礎となるものであり、ML モデルの品質とパフォーマンスに直接影響します。前処理には、データ クリーニング、特徴量エンジニアリング、スケーリング、エンコードなどのタスクが含まれ、これらはすべて、モデルがデータから効果的に学習できるようにするために不可欠です。
データの前処理に多くのオペレーションが必要な場合、ボトルネックが発生してデータの処理速度が全体的に低下する可能性があります。次の例では、Google Cloud Storage バケットへの複数の画像のアップロードを含む、前処理データセットのユースケースについて説明します。最大 140,000 ものオペレーションを伴い、順次実行するとボトルネックが発生して、完了までに 8 時間以上かかるものです。
データセットこの例では、20,000 個のプロダクトからなる事前にクロールしたデータセットを使用します。
データの前処理手順このデータセットには 15 の異なる列があります。注目する列は、「uniq_id」、「product_name」、「description」、「brand」、「product_category_tree」、「image」、「product_specifications」です。
null 値と重複を削除するほか、関連する列に対して以下の処理を行います。
-
description: ストップワードと句読点を削除して、プロダクトの説明をクリーンアップします。
-
product_category_tree: 異なる列に分割します。
-
product_specifications: プロダクトの仕様を Key-Value ペアに変換します。
-
image: 画像 URL のリストを解析します。URL を検証して画像をダウンロードします。
ここで、前処理タスクとして大規模なデータセットの各行から複数の画像 URL を抽出し、それらの画像を Cloud Storage バケットにアップロードするシナリオを考えてみましょう。簡単な作業に思えるかもしれませんが、各データセットは 20,000 行以上あり、各行に最大 7 つの URL が含まれる可能性があるため、この処理を Python で順次実行すると非常に時間がかかります。これまでの経験上、このようなタスクが完了するまで 8 時間以上かかることもあります。
ソリューション: 並列処理を実装してスケーラビリティを確保する
このスケーラビリティの問題に対処するために、並列処理に目を向けます。データセットをより小さなチャンクに分割し、複数のスレッドに処理を分散すれば、全体の実行時間を大幅に短縮できます。これを実現する分散コンピューティング プラットフォームとして、ここでは Ray を使用します。
Ray: 分散コンピューティングを簡素化
Ray は、Python のアプリケーションやライブラリをスケーリングするために設計された強力なフレームワークです。複数のワーカーに計算を分散できるシンプルな API を提供しており、並列データ前処理パイプラインの実装に最適です。
今回の特定のユースケースでは、Ray を利用して、URL から Cloud Storage バケットに画像をダウンロードする役割を担う Python 関数を複数の Ray ワーカーに分散します。Ray の抽象化レイヤーが、ワーカーの管理とコミュニケーションの複雑さを処理するため、コアの前処理ロジックに集中できるようになります。
Ray のコア機能には以下が含まれます。
-
タスク並列処理: Ray を使用すると、任意の関数を個別の Python ワーカー上のタスクとして非同期で実行できるようになり、画像のダウンロード プロセスを簡単に並列化できます。
-
アクターモデル: Ray の「アクター」には、ステートフルなコンピューティングをカプセル化する方法があるため、共有状態が必要となる複雑な前処理シナリオに適しています。
-
簡素化されたスケーリング: Ray は、単一のマシンから本格的なクラスタに至るまでシームレスにスケーリングできるため、さまざまなデータサイズやコンピューティング ニーズに対応できる柔軟なソリューションとなります。
実装の詳細
GKE 上でデータの前処理を実行するために、accelerated-platforms リポジトリを使用しました。このリポジトリでは、GKE クラスタを構築して前提条件(クラスタ上で Ray を実行するなど)を構成するコードが提供されるため、コンテナとしてクラスタ上でデータの前処理を実行できるようになります。このジョブは 3 つのフェーズに分かれています。
1. データセットのパーティショニング: 大量のデータセットをより小さなチャンクに分割します。
20,000 行の入力データを、それぞれ 199 行からなる 101 個の小さなチャンクに分割しました。各チャンクは Ray タスクに割り当てられ、Ray ワーカー上で実行されます。
2. Ray タスクの分配: Ray のリモートタスクを作成します。Ray はワーカーを作成して管理し、ワーカーにタスクを分配します。
3. 並列データ処理: Ray タスクがデータを準備し、同時に画像を Cloud Storage にダウンロードします。
結果
Ray と GKE を活用することで、処理時間を大幅に短縮できました。20,000 行に対して 8 時間以上かかっていた前処理時間は、わずか 17 分に短縮され、約 23 倍の高速化を実現しました。データサイズが増加した場合は、バッチサイズを調整し、Ray の自動スケーリング機能を使用することで、同様のパフォーマンスを実現できます。
データの前処理の課題を克服
分散データの前処理における GKE と Ray の活用は、今日の ML チームが直面するデータの前処理の課題に対処できる堅牢でスケーラブルなソリューションです。並列処理とクラウド インフラストラクチャを活用することで、データ準備の加速やボトルネックの軽減が可能になり、データ サイエンティストや ML エンジニアは、モデル開発とイノベーションに集中できるようになります。さらに詳しく知りたい方は、GKE クラスタ上で Ray を使用する、このデータの前処理のユースケースを示す Deployment を実行してください。