Run an LLM in a streaming pipeline


This tutorial shows you how to run a large language model (LLM) in a streaming Dataflow pipeline by using the Apache Beam RunInference API.

For more information about the RunInference API, see About Beam ML in the Apache Beam documentation.

The example code is available on GitHub.

Objectives

  • Create Pub/Sub topics and subscriptions for the model's input and responses.
  • Load the model into Cloud Storage by using a Vertex AI custom job.
  • Run the pipeline.
  • Ask the model a question and get a response.

Costs

In this document, you use the following billable components of Google Cloud: Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

Run this tutorial on a machine that has at least 5 GB of free disk space to install the dependencies.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  7. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • PROJECT_NUMBER: your project number. To find your project number, use the gcloud projects describe command.
    • SERVICE_ACCOUNT_ROLE: each individual role.
  10. Copy your Google Cloud project ID. You need this value later in this tutorial.

Create the Google Cloud resources

This section describes how to create the following resources:

  • A Cloud Storage bucket to use as a temporary storage location
  • A Pub/Sub topic for the model's prompts
  • A Pub/Sub topic and subscription for the model's responses

Create a Cloud Storage bucket

Create a Cloud Storage bucket by using the gcloud CLI. This bucket is used as a temporary storage location by the Dataflow pipeline.

To create the bucket, use the gcloud storage buckets create command:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Replace the following:

  • BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
  • LOCATION: the location for the bucket.

Copy the bucket name. You need this value later in this tutorial.
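
If you prefer to create the bucket from Python instead of the gcloud CLI, the following is a minimal sketch that uses the google-cloud-storage client library. The project ID, bucket name, and location are placeholders for the values that you chose above:

from google.cloud import storage

# Placeholder values: replace with your project ID, bucket name, and location.
client = storage.Client(project="PROJECT_ID")
bucket = client.create_bucket("BUCKET_NAME", location="LOCATION")
print(f"Created bucket: {bucket.name}")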

Create the Pub/Sub topics and subscription

Create two Pub/Sub topics and one subscription. One topic is for the input prompts that you send to the model. The other topic and its attached subscription are for the model's responses.

  1. To create the topics, run the gcloud pubsub topics create command twice, once for each topic:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    Replace the following:

    • PROMPTS_TOPIC_ID: a topic ID for the input prompts to send to the model, such as prompts
    • RESPONSES_TOPIC_ID: a topic ID for the model's responses, such as responses
  2. To create the subscription and attach it to the responses topic, use the gcloud pubsub subscriptions create command:

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    Replace RESPONSES_SUBSCRIPTION_ID with a subscription ID for the model's responses, such as responses-subscription.

Copy the topic IDs and the subscription ID. You need these values later in this tutorial.
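
If you'd rather create these resources programmatically, a rough equivalent using the google-cloud-pubsub client library looks like the following sketch. The project, topic, and subscription IDs are placeholders for the values that you chose above:

from google.cloud import pubsub_v1

project_id = "PROJECT_ID"  # placeholder value

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Create the prompts topic and the responses topic.
prompts_topic = publisher.create_topic(
    name=publisher.topic_path(project_id, "PROMPTS_TOPIC_ID")
)
responses_topic = publisher.create_topic(
    name=publisher.topic_path(project_id, "RESPONSES_TOPIC_ID")
)

# Attach a subscription to the responses topic.
subscriber.create_subscription(
    name=subscriber.subscription_path(project_id, "RESPONSES_SUBSCRIPTION_ID"),
    topic=responses_topic.name,
)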

Prepare your environment

Download the code sample and then set up your environment to run the tutorial.

The code samples in the python-docs-samples GitHub repository provide the code that you need to run this pipeline. When you're ready to build your own pipeline, you can use this sample code as a template.

You create an isolated Python virtual environment to run your pipeline project by using venv. A virtual environment lets you isolate the dependencies of one project from the dependencies of other projects. For more information about how to install Python and create a virtual environment, see Setting up a Python development environment.

  1. Use the git clone command to clone the GitHub repository:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. Navigate to the run-inference directory:

    cd python-docs-samples/dataflow/run-inference
    
  3. If you're using a command prompt, check that you have Python 3 and pip running in your system:

    python --version
    python -m pip --version
    

    If required, install Python 3.

    If you're using Cloud Shell, you can skip this step because Cloud Shell already has Python installed.

  4. Create a Python virtual environment:

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. Install the dependencies:

    pip install -r requirements.txt --no-cache-dir
    

Model loading code sample

The model loading code in this tutorial launches a Vertex AI custom job that loads the model's state_dict object into Cloud Storage.

The starter file looks like the following:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

Pipeline code sample

The pipeline code in this tutorial deploys a Dataflow pipeline that does the following:

  • Reads a prompt from Pub/Sub and encodes the text into token tensors.
  • Runs the RunInference transform.
  • Decodes the output token tensors into text and writes the response to Pub/Sub.

The starter file looks like the following:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

Load the model

LLMs can be very large models. Larger models that are trained with more parameters generally give better results. However, larger models require a bigger machine and more memory to run, and they can also be slower to run on CPUs.

Before you run a PyTorch model on Dataflow, you need to load the model's state_dict object. A model's state_dict object stores the weights of the model.

In a Dataflow pipeline that uses the Apache Beam RunInference transform, the model's state_dict object must be loaded to Cloud Storage. The machine that you use to load the state_dict object to Cloud Storage needs to have enough memory to load the model. The machine also needs a fast internet connection to download the weights and to upload them to Cloud Storage.
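
For context, a state_dict is simply a mapping from parameter names to weight tensors. The following minimal sketch shows how one is saved and later restored, which mirrors what download_model.py and the pipeline's model handler do; the local file path is a hypothetical example:

import torch
from transformers import AutoConfig, AutoModelForSeq2SeqLM

# Save: serialize only the weights, not the full Python model object.
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-small")
torch.save(model.state_dict(), "flan-t5-small.pt")  # hypothetical local path

# Load: rebuild the architecture from its config, then restore the weights.
config = AutoConfig.from_pretrained("google/flan-t5-small")
restored_model = AutoModelForSeq2SeqLM.from_config(config)
restored_model.load_state_dict(torch.load("flan-t5-small.pt"))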

The following table shows the number of parameters for each model and the minimum memory that's needed to load each model.

Model                 Parameters    Memory needed
google/flan-t5-small  80 million    > 320 MB
google/flan-t5-base   250 million   > 1 GB
google/flan-t5-large  780 million   > 3.2 GB
google/flan-t5-xl     3 billion     > 12 GB
google/flan-t5-xxl    11 billion    > 44 GB
google/flan-ul2       20 billion    > 80 GB

Although you can load a smaller model locally, this tutorial shows how to launch a Vertex AI custom job that loads the model with an appropriately sized VM.

Because LLMs can be so large, the example in this tutorial saves the state_dict object in float16 format instead of the default float32 format. With this configuration, each parameter uses 16 bits instead of 32 bits, making the state_dict object half the size. A smaller size minimizes the time that's needed to load the model. However, converting the format means that the VM has to fit both the model and the state_dict object into memory.
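
As a back-of-the-envelope check of that halving, the following sketch estimates the serialized size of a state_dict from its parameter count, ignoring metadata overhead. For example, google/flan-t5-xl has roughly 3 billion parameters:

def state_dict_size_gb(num_params: float, bytes_per_param: int) -> float:
    """Approximate serialized size of a state_dict, ignoring metadata overhead."""
    return num_params * bytes_per_param / 1e9

print(state_dict_size_gb(3e9, 4))  # float32: about 12 GB
print(state_dict_size_gb(3e9, 2))  # float16: about 6 GB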

The following table shows the minimum requirements to load a model after the state_dict object is saved in float16 format. The table also shows the suggested machine types to load a model by using Vertex AI. The minimum (and default) disk size for Vertex AI is 100 GB, but some models might require a larger disk.

Model name            Memory needed  Machine type   VM memory  VM disk
google/flan-t5-small  > 480 MB       e2-standard-4  16 GB      100 GB
google/flan-t5-base   > 1.5 GB       e2-standard-4  16 GB      100 GB
google/flan-t5-large  > 4.8 GB       e2-standard-4  16 GB      100 GB
google/flan-t5-xl     > 18 GB        e2-highmem-4   32 GB      100 GB
google/flan-t5-xxl    > 66 GB        e2-highmem-16  128 GB     100 GB
google/flan-ul2       > 120 GB       e2-highmem-16  128 GB     150 GB

Load the model's state_dict object into Cloud Storage by using a Vertex AI custom job:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

Replace the following:

  • MODEL_NAME: the name of the model, such as google/flan-t5-xl.
  • VERTEX_AI_MACHINE_TYPE: the type of machine to run the Vertex AI custom job on, such as e2-highmem-4.
  • DISK_SIZE_GB: the disk size for the VM, in GB. The minimum size is 100 GB.

Depending on the size of the model, loading it might take a few minutes. To view the status, go to the Vertex AI Custom jobs page.

Go to Custom jobs

Run the pipeline

After you load the model, you run the Dataflow pipeline. To run the pipeline, both the model and the memory used per worker must fit into memory.

The following table shows the recommended machine types to run an inference pipeline.

Model name            Machine type   VM memory
google/flan-t5-small  n2-highmem-2   16 GB
google/flan-t5-base   n2-highmem-2   16 GB
google/flan-t5-large  n2-highmem-4   32 GB
google/flan-t5-xl     n2-highmem-4   32 GB
google/flan-t5-xxl    n2-highmem-8   64 GB
google/flan-ul2       n2-highmem-16  128 GB

Run the pipeline:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

Replace the following:

  • PROJECT_ID: the project ID
  • PROMPTS_TOPIC_ID: the topic ID for the input prompts to send to the model
  • RESPONSES_TOPIC_ID: the topic ID for the model's responses
  • MODEL_NAME: the name of the model, such as google/flan-t5-xl
  • BUCKET_NAME: the name of the bucket
  • REGION: the region to deploy the job in, such as us-central1
  • DATAFLOW_MACHINE_TYPE: the VM to run the pipeline on, such as n2-highmem-4

To make sure that the model is loaded only once per worker and doesn't run out of memory, you configure the workers to use a single process by setting the pipeline option --experiments=no_use_multiple_sdk_containers. You don't have to limit the number of threads, because the RunInference transform shares the same model with multiple threads.
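
If you'd rather set these options in code than on the command line, the following is a rough sketch of a programmatic equivalent; the option names mirror the flags shown above, and the values are placeholders:

from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical hard-coded equivalent of the command-line flags shown above.
beam_options = PipelineOptions(
    runner="DataflowRunner",
    project="PROJECT_ID",
    region="REGION",
    temp_location="gs://BUCKET_NAME/temp",
    machine_type="DATAFLOW_MACHINE_TYPE",
    requirements_file="requirements.txt",
    experiments=["use_sibling_sdk_workers", "no_use_multiple_sdk_containers"],
    pickle_library="cloudpickle",
    streaming=True,
)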

The pipeline in this example runs with CPUs. For a larger model, more time is required to process each request. You can enable GPUs if you need faster responses.

To view the status of the pipeline, go to the Dataflow Jobs page.

Go to Jobs

Ask the model a question

After the pipeline starts running, you provide a prompt to the model and receive a response.

  1. Send your prompt by publishing a message to Pub/Sub. Use the gcloud pubsub topics publish command:

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    Replace PROMPT_TEXT with a string that contains the prompt that you want to provide. Surround the prompt with quotation marks.

    Use your own prompt, or try one of the following examples:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. To get the response, use the gcloud pubsub subscriptions pull command:

    Depending on the size of the model, it might take a few minutes for the model to generate a response. Larger models take longer to deploy and to generate a response.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    Replace RESPONSES_SUBSCRIPTION_ID with the subscription ID for the model's responses. If you prefer to use the Pub/Sub client library instead of gcloud, see the sketch after this list.
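
As an alternative to the two gcloud commands in the previous steps, the following minimal sketch publishes a prompt and then pulls and acknowledges any available responses with the google-cloud-pubsub client library. The project, topic, and subscription IDs are placeholders:

from google.cloud import pubsub_v1

project_id = "PROJECT_ID"  # placeholder values
prompts_topic_id = "PROMPTS_TOPIC_ID"
responses_subscription_id = "RESPONSES_SUBSCRIPTION_ID"

# Publish a prompt to the prompts topic.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, prompts_topic_id)
publisher.publish(topic_path, b"Translate to Spanish: My name is Luka").result()

# Pull and acknowledge any responses that are available.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, responses_subscription_id)
response = subscriber.pull(subscription=subscription_path, max_messages=10)
for received in response.received_messages:
    print(received.message.data.decode("utf-8"))
if response.received_messages:
    subscriber.acknowledge(
        subscription=subscription_path,
        ack_ids=[received.ack_id for received in response.received_messages],
    )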

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Delete individual resources

  1. Exit the Python virtual environment:

    deactivate
  2. Stop the pipeline:

    1. List the IDs for the Dataflow jobs that are running, and then note the job ID for the tutorial's job:

      gcloud dataflow jobs list --region=REGION --status=active
    2. Cancel the job:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. Delete the bucket and all of the objects within it:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. Delete the topics and the subscription:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. Optional: Revoke the roles that you granted to your Google Account.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

What's next