このチュートリアルでは、Apache Beam RunInference API を使用して、ストリーミング Dataflow パイプラインで大規模言語モデル(LLM)を実行する方法について説明します。
RunInference API の詳細については、Apache Beam ドキュメントの Beam ML についてをご覧ください。
サンプルコードは GitHub で入手できます。
目標
- モデルの入力とレスポンスの Pub/Sub トピックとサブスクリプションを作成します。
- Vertex AI カスタムジョブを使用して、Cloud Storage にモデルを読み込みます。
- パイプラインを実行します。
- モデルに質問し、回答を得ます。
費用
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
このチュートリアルでは、依存関係をインストールするため 5 GB 以上の空き容量のあるマシンを使用します。
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/aiplatform.user
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。PROJECT_NUMBER
: プロジェクトの番号。プロジェクト番号を確認するには、gcloud projects describe
コマンドを使用します。SERVICE_ACCOUNT_ROLE
: 個々のロール。
- Google Cloud プロジェクト ID をコピーします。この値は、このチュートリアルの後半で必要になります。
Google Cloud リソースを作成する
このセクションでは、次のリソースを作成します。
- 一時的な格納場所として使用する Cloud Storage バケット
- モデルのプロンプトに使用する Pub/Sub トピック
- モデルのレスポンスに使用する Pub/Sub トピックとサブスクリプション
Cloud Storage バケットを作成する
gcloud CLI を使用して Cloud Storage バケットを作成します。このバケットは、Dataflow パイプラインによって一時ストレージの場所として使用されます。
バケットを作成するには、gcloud storage buckets create
コマンドを使用します。
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
次のように置き換えます。
- BUCKET_NAME: バケットの命名要件を満たす Cloud Storage バケットの名前。Cloud Storage のバケット名は、グローバルに一意である必要があります。
- LOCATION: バケットのロケーション。
バケット名をコピーします。この値は、このチュートリアルの後半で必要になります。
Pub/Sub トピックとサブスクリプションを作成する
2 つの Pub/Sub トピックと 1 つのサブスクリプションを作成します。1 つのトピックはモデルに送信する入力プロンプト用です。もう一つのトピックとそれに関連付けられたサブスクリプションはモデルのレスポンス用です。
トピックを作成するには、
gcloud pubsub topics create
コマンドを 2 回(トピックごとに 1 回ずつ)実行します。gcloud pubsub topics create PROMPTS_TOPIC_ID gcloud pubsub topics create RESPONSES_TOPIC_ID
次のように置き換えます。
- PROMPTS_TOPIC_ID: モデルに送信する入力プロンプトのトピック ID(例:
prompts
) - RESPONSES_TOPIC_ID: モデルのレスポンスのトピック ID(
responses
など)
- PROMPTS_TOPIC_ID: モデルに送信する入力プロンプトのトピック ID(例:
サブスクリプションを作成してレスポンス トピックに関連付けるには、
gcloud pubsub subscriptions create
コマンドを使用します。gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
RESPONSES_SUBSCRIPTION_ID は、モデルのレスポンスのサブスクリプション ID(
responses-subscription
など)に置き換えます。
トピック ID とサブスクリプション ID をコピーします。これらの値は、このチュートリアルの後の部分で必要になります。
環境を準備する
コードサンプルをダウンロードして、チュートリアルを実行するための環境を設定します。
python-docs-samples GitHub リポジトリのコードサンプルには、このパイプラインを実行するために必要なコードが含まれています。独自のパイプラインを構築する準備ができたら、このサンプルコードをテンプレートとして使用できます。
venv を使用して、パイプライン プロジェクトを実行するための隔離された Python 仮想環境を作成します。仮想環境を使用すると、1 つのプロジェクトの依存関係を他のプロジェクトの依存関係から分離できます。Python をインストールして仮想環境を作成する方法については、Python 開発環境の設定をご覧ください。
git clone
コマンドを使用して GitHub リポジトリのクローンを作成します。git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
run-inference
ディレクトリに移動します。cd python-docs-samples/dataflow/run-inference
コマンド プロンプトを使用している場合は、システムで Python 3 と
pip
が実行されていることを確認します。python --version python -m pip --version
必要に応じて、Python 3 をインストールします。
Cloud Shell を使用している場合は、Cloud Shell に Python がすでにインストールされているため、この手順をスキップできます。
Python 仮想環境を作成します。
python -m venv /tmp/env source /tmp/env/bin/activate
依存関係をインストールします。
pip install -r requirements.txt --no-cache-dir
モデル読み込みのコードサンプル
このチュートリアルのモデル読み込みコードは、モデルの state_dict
オブジェクトを Cloud Storage に読み込む Vertex AI カスタムジョブを起動します。
スターター ファイルは、次のようになります。
パイプラインのコードサンプル
このチュートリアルのパイプライン コードは、次の処理を行う Dataflow パイプラインをデプロイします。
- Pub/Sub からプロンプトを読み取り、テキストをトークン テンソルにエンコードします。
RunInference
変換を実行します。- 出力トークンのテンソルをテキストにデコードし、レスポンスを Pub/Sub に書き込みます。
スターター ファイルは、次のようになります。
モデルを読み込む
LLM は非常に大規模なモデルになる可能性があります。一般に、より多くのパラメータでトレーニングされた大規模なモデルほど、より良い結果が得られます。ただし、モデルが大きいほど、実行により大きなマシンとより多くのメモリが必要になります。大規模なモデルは CPU での実行速度も遅くなる可能性があります。
Dataflow で PyTorch モデルを実行する前に、モデルの state_dict
オブジェクトを読み込む必要があります。モデルの state_dict
オブジェクトにはモデルの重みが保存されます。
Apache Beam RunInference
変換を使用する Dataflow パイプラインでは、モデルの state_dict
オブジェクトを Cloud Storage に読み込む必要があります。state_dict
オブジェクトを Cloud Storage に読み込むために使用するマシンには、モデルの読み込みに十分なメモリが必要です。重みをダウンロードして Cloud Storage にアップロードするため、マシンには高速のインターネット接続も必要です。
次の表に、各モデルのパラメータの数と、各モデルの読み込みに必要な最小メモリを示します。
モデル | パラメータ | 必要なメモリ |
---|---|---|
google/flan-t5-small |
8,000 万 | 320 MB 超 |
google/flan-t5-base |
25,000 万 | 1 GB 超 |
google/flan-t5-large |
78,000 万 | 3.2 GB 超 |
google/flan-t5-xl |
30 億 | 12 GB 超 |
google/flan-t5-xxl |
110 億 | 44 GB 超 |
google/flan-ul2 |
200 億 | 80 GB 超 |
小さなモデルをローカルに読み込むこともできますが、このチュートリアルでは、適切なサイズの VM でモデルを読み込む Vertex AI カスタムジョブを起動する方法について説明します。
LLM は非常に大きくなる可能性があるため、このチュートリアルの例では、state_dict
オブジェクトをデフォルトの float32
形式ではなく float16
形式で保存します。この構成では、各パラメータが 32 ビットではなく 16 ビットを使用するため、state_dict
オブジェクトのサイズが半分になります。サイズが小さいほど、モデルの読み込みに必要な時間が短くなります。ただし、形式の変換は、VM がモデルと state_dict
オブジェクトの両方をメモリに適合させる必要があることを意味します。
次の表に、state_dict
オブジェクトが float16
形式で保存された後にモデルを読み込むための最小要件を示します。この表には、Vertex AI を使用してモデルを読み込む際の推奨マシンタイプも示します。Vertex AI の最小(デフォルト)のディスクサイズは 100 GB ですが、一部のモデルではより大きなディスクが必要になる場合があります。
モデル名 | 必要なメモリ | マシンタイプ | VM メモリ | VM ディスク |
---|---|---|---|---|
google/flan-t5-small |
480 MB 超 | e2-standard-4 |
16 GB | 100 GB |
google/flan-t5-base |
1.5 GB 超 | e2-standard-4 |
16 GB | 100 GB |
google/flan-t5-large |
4.8 GB 超 | e2-standard-4 |
16 GB | 100 GB |
google/flan-t5-xl |
18 GB 超 | e2-highmem-4 |
32 GB | 100 GB |
google/flan-t5-xxl |
66 GB 超 | e2-highmem-16 |
128 GB | 100 GB |
google/flan-ul2 |
120 GB 超 | e2-highmem-16 |
128 GB | 150 GB |
Vertex AI カスタムジョブを使用して、モデルの state_dict
オブジェクトを Cloud Storage に読み込みます。
python download_model.py vertex \
--model-name="MODEL_NAME" \
--state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
--job-name="Load MODEL_NAME" \
--project="PROJECT_ID" \
--bucket="BUCKET_NAME" \
--location="LOCATION" \
--machine-type="VERTEX_AI_MACHINE_TYPE" \
--disk-size-gb="DISK_SIZE_GB"
次のように置き換えます。
- MODEL_NAME: モデルの名前(
google/flan-t5-xl
など)。 - VERTEX_AI_MACHINE_TYPE: Vertex AI カスタムジョブを実行するマシンのタイプ(
e2-highmem-4
など)。 - DISK_SIZE_GB: VM のディスクサイズ(GB)。最小サイズは 100 GB です。
モデルのサイズによっては、モデルの読み込みに数分かかることがあります。ステータスを表示するには、Vertex AI の [カスタムジョブ] ページに移動します。
パイプラインを実行する
モデルを読み込んだ後、Dataflow パイプラインを実行します。パイプラインを実行するには、各ワーカーが使用するモデルとメモリの両方がメモリに収まる必要があります。
次の表に、推論パイプラインの実行に推奨されるマシンタイプを示します。
モデル名 | マシンタイプ | VM メモリ |
---|---|---|
google/flan-t5-small |
n2-highmem-2 |
16 GB |
google/flan-t5-base |
n2-highmem-2 |
16 GB |
google/flan-t5-large |
n2-highmem-4 |
32 GB |
google/flan-t5-xl |
n2-highmem-4 |
32 GB |
google/flan-t5-xxl |
n2-highmem-8 |
64 GB |
google/flan-ul2 |
n2-highmem-16 |
128 GB |
パイプラインを実行します。
python main.py \
--messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
--responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
--model-name="MODEL_NAME" \
--state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
--runner="DataflowRunner" \
--project="PROJECT_ID" \
--temp_location="gs://BUCKET_NAME/temp" \
--region="REGION" \
--machine_type="DATAFLOW_MACHINE_TYPE" \
--requirements_file="requirements.txt" \
--requirements_cache="skip" \
--experiments="use_sibling_sdk_workers" \
--experiments="no_use_multiple_sdk_containers"
次のように置き換えます。
- PROJECT_ID: プロジェクト ID
- PROMPTS_TOPIC_ID: モデルに送信する入力プロンプトのトピック ID
- RESPONSES_TOPIC_ID: モデルのレスポンスのトピック ID
- MODEL_NAME: モデルの名前(例:
google/flan-t5-xl
) - BUCKET_NAME: バケットの名前
- REGION: ジョブをデプロイするリージョン(
us-central1
など) - DATAFLOW_MACHINE_TYPE: パイプラインを実行する VM(
n2-highmem-4
など)
モデルがワーカーごとに 1 回だけ読み込まれ、メモリが不足しないようにするには、パイプライン オプション --experiments=no_use_multiple_sdk_containers
を設定して、単一プロセスを使用するようにワーカーを構成します。RunInference
変換は同じモデルを複数のスレッドと共有するため、スレッド数を制限する必要はありません。
この例のパイプラインは CPU で実行されます。モデルが大きいほど、各リクエストの処理に時間がかかります。レスポンスを高速化する必要がある場合は、GPU を有効にします。
パイプラインのステータスを表示するには、Dataflow の [ジョブ] ページに移動します。
モデルに質問する
パイプラインの実行が開始されたら、モデルにプロンプトを提供し、レスポンスを取得します。
Pub/Sub にメッセージをパブリッシュして、プロンプトを送信します。
gcloud pubsub topics publish
コマンドを実行します。gcloud pubsub topics publish PROMPTS_TOPIC_ID \ --message="PROMPT_TEXT"
PROMPT_TEXT
は、表示するプロンプトを含む文字列に置き換えます。プロンプトは引用符で囲みます。独自のプロンプトを使用するか、次のいずれかの例を試してください。
Translate to Spanish: My name is Luka
Complete this sentence: Once upon a time, there was a
Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
レスポンスを取得するには、
gcloud pubsub subscriptions pull
コマンドを使用します。モデルのサイズによっては、モデルがレスポンスを生成するまでに数分かかることがあります。モデルが大きいほど、デプロイとレスポンスの生成に時間がかかります。
gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
RESPONSES_SUBSCRIPTION_ID
は、モデルのレスポンスのサブスクリプション ID に置き換えます。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトを削除する
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
リソースを個別に削除する
-
Python 仮想環境を終了します。
deactivate
-
パイプラインを停止します。
-
実行中の Dataflow ジョブのジョブ ID を一覧表示し、チュートリアルのジョブのジョブ ID をメモします。
gcloud dataflow jobs list --region=REGION --status=active
-
ジョブをキャンセルします。
gcloud dataflow jobs cancel JOB_ID --region=REGION
-
-
バケットとその内容を削除します。
gcloud storage rm gs://BUCKET_NAME --recursive
-
トピックとサブスクリプションを削除します。
gcloud pubsub topics delete PROMPTS_TOPIC_ID gcloud pubsub topics delete RESPONSES_TOPIC_ID gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
-
Compute Engine のデフォルト サービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/aiplatform.user
gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
省略可: Google アカウントのロールを取り消します。
gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
次のステップ
- Dataflow ML について調べる。
- RunInference API の詳細を確認する。
- Apache Beam の AI / ML パイプラインのドキュメントで、Apache Beam で ML を使用する詳細情報を確認する。
- 「生成 AI に RunInference を使用する」ノートブックで学習する。
- Google Cloud に関するリファレンス アーキテクチャ、図、ベスト プラクティスを確認する。Cloud アーキテクチャ センターをご覧ください。