本教程介绍如何使用 Apache Beam RunInference API 在流处理 Dataflow 流水线中运行大语言模型 (LLM)。
如需详细了解 RunInference API,请参阅 Apache Beam 文档中的 Beam ML 简介。
您可以在 GitHub 上找到示例代码。
目标
- 为模型的输入和回答创建 Pub/Sub 主题和订阅。
- 使用 Vertex AI 自定义作业将模型加载到 Cloud Storage 中。
- 运行流水线。
- 向模型提问并获取回答。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
在具有至少 5 GB 可用磁盘空间的机器上运行本教程以安装依赖项。
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/aiplatform.user
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
替换以下内容:
PROJECT_ID
:您的项目 ID。PROJECT_NUMBER
:您的项目编号。 如需查找您的项目编号,请使用gcloud projects describe
命令。SERVICE_ACCOUNT_ROLE
:每个角色。
- 复制 Google Cloud 项目 ID。您在本教程的后面部分需要用到此值。
创建 Google Cloud 资源
本部分介绍如何创建以下资源:
- 用作临时存储位置的 Cloud Storage 存储桶
- 用于模型提示的 Pub/Sub 主题
- 用于模型响应的 Pub/Sub 主题和订阅
创建 Cloud Storage 存储桶
使用 gcloud CLI 创建 Cloud Storage 存储桶。此存储桶由 Dataflow 流水线用作临时存储位置。
如需创建存储桶,请使用 gcloud storage buckets create
命令:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
替换以下内容:
复制存储桶名称。您在本教程的后面部分需要用到此值。
创建 Pub/Sub 主题和订阅
创建两个 Pub/Sub 主题和一个订阅。一个主题用于发送到模型的输入提示。另一个主题及其附加的订阅用于模型的回答。
如需创建主题,请运行
gcloud pubsub topics create
命令两次,针对每个主题各一次:gcloud pubsub topics create PROMPTS_TOPIC_ID gcloud pubsub topics create RESPONSES_TOPIC_ID
替换以下内容:
- PROMPTS_TOPIC_ID:要发送给模型的输入提示的主题 ID,例如
prompts
- RESPONSES_TOPIC_ID:模型回答的主题 ID,例如
responses
- PROMPTS_TOPIC_ID:要发送给模型的输入提示的主题 ID,例如
如需创建订阅并将其附加到回答主题,请使用
gcloud pubsub subscriptions create
命令:gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
将 RESPONSES_SUBSCRIPTION_ID 替换为模型回答的订阅 ID,例如
responses-subscription
。
复制主题 ID 和订阅 ID。您在本教程的后面部分需要用到这些值。
准备环境
下载代码示例,然后设置您的环境以运行教程。
python-docs-samples GitHub 代码库中的代码示例提供了运行此流水线所需的代码。当您准备好构建自己的流水线时,可以使用此示例代码作为模板。
您可以使用 venv 创建隔离的 Python 虚拟环境来运行流水线项目。借助虚拟环境,您可以将一个项目的依赖项与其他项目的依赖项隔离开来。如需详细了解如何安装 Python 并创建虚拟环境,请参阅设置 Python 开发环境。
使用
git clone
命令克隆 GitHub 代码库:git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
导航到
run-inference
目录:cd python-docs-samples/dataflow/run-inference
如果您使用的是命令提示符,请检查系统中是否已运行 Python 3 和
pip
:python --version python -m pip --version
根据需要安装 Python 3。
如果您使用的是 Cloud Shell,则可以跳过此步骤,因为 Cloud Shell 已安装 Python。
创建 Python 虚拟环境:
python -m venv /tmp/env source /tmp/env/bin/activate
安装依赖项:
pip install -r requirements.txt --no-cache-dir
模型加载代码示例
本教程中的模型加载代码会启动 Vertex AI 自定义作业,以将模型的 state_dict
对象加载到 Cloud Storage 中。
起始文件如下所示:
流水线代码示例
本教程中的流水线代码部署了一个执行以下操作的 Dataflow 流水线:
- 从 Pub/Sub 读取提示,并将文本编码为令牌张量。
- 运行
RunInference
转换。 - 将输出词元张量解码为文本并将响应写入 Pub/Sub。
起始文件如下所示:
加载模型
LLM 可能是非常大的模型。使用更多参数训练的较大的模型通常可提供更好的结果。不过,较大的模型需要更大的机器和更多内存才能运行。较大的模型在 CPU 上的运行速度可能较慢。
在 Dataflow 上运行 PyTorch 模型之前,您需要加载模型的 state_dict
对象。模型的 state_dict
对象会存储模型的权重。
在使用 Apache Beam RunInference
转换的 Dataflow 流水线中,模型的 state_dict
对象必须加载到 Cloud Storage 中。您用于将 state_dict
对象加载到 Cloud Storage 的机器需要有足够的内存来加载模型。该机器还需要具有快速互联网连接,以下载权重并将其上传到 Cloud Storage。
下表展示了每个模型的参数数量以及加载每个模型所需的最小内存。
模型 | 参数 | 所需内存 |
---|---|---|
google/flan-t5-small |
8000 万 | > 320 MB |
google/flan-t5-base |
2.5 亿 | > 1 GB |
google/flan-t5-large |
7.8 亿 | > 3.2 GB |
google/flan-t5-xl |
30 亿 | > 12 GB |
google/flan-t5-xxl |
110 亿 | > 44 GB |
google/flan-ul2 |
200 亿 | > 80 GB |
虽然您可以在本地加载较小的模型,但本教程介绍了如何启动 Vertex AI 自定义作业,以加载具有适当大小的虚拟机的模型。
由于 LLM 可能非常大,因此本教程中的示例将 state_dict
对象保存为 float16
格式,而不是默认 float32
格式。使用这种配置时,每个参数使用 16 位而不是 32 位,从而使 state_dict
对象的大小减半。模型越小,加载模型所需的时间就越短。不过,转换格式意味着虚拟机必须将模型和 state_dict
对象都放入内存中。
下表显示了在将 state_dict
对象保存为 float16
格式后,加载模型的最低要求。该表还显示了使用 Vertex AI 加载模型的建议机器类型。Vertex AI 的最小(默认)磁盘大小为 100 GB,但某些模型可能需要更大的磁盘。
模型名称 | 所需内存 | 机器类型 | 虚拟机内存 | 虚拟机磁盘 |
---|---|---|---|---|
google/flan-t5-small |
> 480 MB | e2-standard-4 |
16 GB | 100 GB |
google/flan-t5-base |
> 1.5 GB | e2-standard-4 |
16 GB | 100 GB |
google/flan-t5-large |
> 4.8 GB | e2-standard-4 |
16 GB | 100 GB |
google/flan-t5-xl |
> 18 GB | e2-highmem-4 |
32 GB | 100 GB |
google/flan-t5-xxl |
> 66 GB | e2-highmem-16 |
128 GB | 100 GB |
google/flan-ul2 |
> 120 GB | e2-highmem-16 |
128 GB | 150 GB |
使用 Vertex AI 自定义作业将模型的 state_dict
对象加载到 Cloud Storage 中:
python download_model.py vertex \
--model-name="MODEL_NAME" \
--state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
--job-name="Load MODEL_NAME" \
--project="PROJECT_ID" \
--bucket="BUCKET_NAME" \
--location="LOCATION" \
--machine-type="VERTEX_AI_MACHINE_TYPE" \
--disk-size-gb="DISK_SIZE_GB"
替换以下内容:
- MODEL_NAME:模型的名称,例如
google/flan-t5-xl
。 - VERTEX_AI_MACHINE_TYPE:要在其中运行 Vertex AI 自定义作业的机器类型,例如
e2-highmem-4
。 - DISK_SIZE_GB:虚拟机的磁盘大小(以 GB 为单位)。最小大小为 100 GB。
加载模型可能需要几分钟,具体取决于模型的大小。如需查看状态,请进入 Vertex AI 自定义作业页面。
运行流水线
在加载模型后,您可以运行 Dataflow 流水线。为运行该流水线,每个工作器使用的模型和内存都必须放入内存中。
下表显示了运行推理流水线的推荐机器类型。
模型名称 | 机器类型 | 虚拟机内存 |
---|---|---|
google/flan-t5-small |
n2-highmem-2 |
16 GB |
google/flan-t5-base |
n2-highmem-2 |
16 GB |
google/flan-t5-large |
n2-highmem-4 |
32 GB |
google/flan-t5-xl |
n2-highmem-4 |
32 GB |
google/flan-t5-xxl |
n2-highmem-8 |
64 GB |
google/flan-ul2 |
n2-highmem-16 |
128 GB |
运行流水线:
python main.py \
--messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
--responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
--model-name="MODEL_NAME" \
--state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
--runner="DataflowRunner" \
--project="PROJECT_ID" \
--temp_location="gs://BUCKET_NAME/temp" \
--region="REGION" \
--machine_type="DATAFLOW_MACHINE_TYPE" \
--requirements_file="requirements.txt" \
--requirements_cache="skip" \
--experiments="use_sibling_sdk_workers" \
--experiments="no_use_multiple_sdk_containers"
替换以下内容:
- PROJECT_ID:项目 ID
- PROMPTS_TOPIC_ID:要发送给模型的输入提示的主题 ID
- RESPONSES_TOPIC_ID:模型回答的主题 ID
- MODEL_NAME:模型的名称,例如
google/flan-t5-xl
- BUCKET_NAME:存储桶的名称
- REGION:要在其中部署作业的区域,例如
us-central1
- DATAFLOW_MACHINE_TYPE:要运行流水线的虚拟机,例如
n2-highmem-4
如需确保模型仅为每个工作器加载一次,并且不会耗尽内存,请通过设置流水线选项 --experiments=no_use_multiple_sdk_containers
将工作器配置为使用单个进程。您不必限制线程数,因为 RunInference
将转换与多个线程共享同一模型。
此示例中的流水线使用 CPU 运行。对于较大的模型,处理每个请求所需的时间会更长。如果您需要更快的回答速度,可以启用 GPU。
如需查看流水线的状态,请进入 Dataflow 作业页面。
向模型提问
流水线开始运行后,您可以向模型提供提示,并接收回答。
通过向 Pub/Sub 发布消息来发送提示。使用
gcloud pubsub topics publish
命令:gcloud pubsub topics publish PROMPTS_TOPIC_ID \ --message="PROMPT_TEXT"
将
PROMPT_TEXT
替换为包含您要提供的提示的字符串。请用英文引号将提示括起来。使用您自己的提示,或尝试以下示例之一:
Translate to Spanish: My name is Luka
Complete this sentence: Once upon a time, there was a
Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
如需获得响应,请使用
gcloud pubsub subscriptions pull
命令。模型可能需要几分钟才能生成响应,具体取决于模型的大小。较大的模型需要更长的时间来部署和生成响应。
gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
将
RESPONSES_SUBSCRIPTION_ID
替换为模型回答的订阅 ID。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
删除各个资源
-
退出 Python 虚拟环境:
deactivate
-
停止流水线:
-
列出正在运行的 Dataflow 作业的 ID,然后记下本教程作业的 ID:
gcloud dataflow jobs list --region=REGION --status=active
-
取消作业:
gcloud dataflow jobs cancel JOB_ID --region=REGION
-
-
删除存储桶及其中的所有对象:
gcloud storage rm gs://BUCKET_NAME --recursive
-
删除主题和订阅:
gcloud pubsub topics delete PROMPTS_TOPIC_ID gcloud pubsub topics delete RESPONSES_TOPIC_ID gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
-
撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/aiplatform.user
gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
可选:撤消您的 Google 账号的角色。
gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
后续步骤
- 探索 Dataflow ML。
- 详细了解 RunInference API。
- 参阅 Apache Beam AI/机器学习流水线文档,深入了解如何将机器学习与 Apache Beam 搭配使用。
- 完成将 RunInference 用于生成式 AI 笔记本。
- 探索有关 Google Cloud 的参考架构、图表和最佳做法。查看我们的 Cloud 架构中心。