在串流管道中執行 LLM

本教學課程說明如何使用 Apache Beam RunInference API,在串流 Dataflow 管道中執行大型語言模型 (LLM)。

如要進一步瞭解 RunInference API,請參閱 Apache Beam 說明文件中的「About Beam ML」。

您可以在 GitHub 取得範例程式碼

目標

  • 為模型的輸入內容和回覆建立 Pub/Sub 主題和訂閱項目。
  • 使用 Vertex AI 自訂作業將模型載入 Cloud Storage。
  • 執行管道。
  • 向模型提問並取得回覆。

費用

本文使用下列 Google Cloud Platform 計費元件:

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用資格。

完成本文所述工作後,您可以刪除已建立的資源,避免繼續計費。詳情請參閱清除所用資源一節。

事前準備

請在可用磁碟空間至少有 5 GB 的電腦上執行本教學課程,以便安裝依附元件。

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. 如果您使用外部識別資訊提供者 (IdP),請先 使用聯合身分登入 gcloud CLI

  4. 如要初始化 gcloud CLI,請執行下列指令:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. 如果您使用外部識別資訊提供者 (IdP),請先 使用聯合身分登入 gcloud CLI

  12. 如要初始化 gcloud CLI,請執行下列指令:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. 將角色授予 Compute Engine 預設服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • PROJECT_NUMBER:您的專案編號。 如要找出專案編號,請使用 gcloud projects describe 指令
    • SERVICE_ACCOUNT_ROLE:每個角色。
  19. 複製 Google Cloud 專案 ID。本教學課程稍後會用到這個值。
  20. 建立 Google Cloud Platform 資源

    本節說明如何建立下列資源:

    • 做為暫時儲存位置的 Cloud Storage bucket
    • 模型提示的 Pub/Sub 主題
    • 模型回覆的 Pub/Sub 主題和訂閱項目

    建立 Cloud Storage 值區

    使用 gcloud CLI 建立 Cloud Storage bucket。 這個 bucket 是 Dataflow 管道的暫時儲存位置。

    如要建立值區,請使用 gcloud storage buckets create 指令

    gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
    

    更改下列內容:

    • BUCKET_NAME:Cloud Storage bucket 的名稱,必須符合bucket 命名規定。Cloud Storage bucket 名稱不得重複。
    • LOCATION:值區的位置

    複製 bucket 名稱。本教學課程稍後會用到這個值。

    建立 Pub/Sub 主題和訂閱項目

    建立兩個 Pub/Sub 主題和一個訂閱項目。其中一個主題是您傳送給模型的輸入提示。另一個主題及其附加的訂閱項目則用於模型回覆。

    1. 如要建立主題,請執行 gcloud pubsub topics create 指令兩次,每個主題執行一次:

      gcloud pubsub topics create PROMPTS_TOPIC_ID
      gcloud pubsub topics create RESPONSES_TOPIC_ID
      

      更改下列內容:

      • PROMPTS_TOPIC_ID:要傳送至模型的主題 ID,例如 prompts
      • RESPONSES_TOPIC_ID:模型回覆的主題 ID,例如 responses
    2. 如要建立訂閱並附加至回覆主題,請使用 gcloud pubsub subscriptions create 指令

      gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
      

      請將 RESPONSES_SUBSCRIPTION_ID 替換為模型回覆的訂閱 ID,例如 responses-subscription

    複製主題 ID 和訂閱 ID。您會在後續步驟中用到這些值。

    準備環境

    下載程式碼範例,然後設定環境來執行本教學課程。

    python-docs-samples GitHub 存放區中的程式碼範例提供執行這項管道所需的程式碼。準備好建構自己的管道時,您可以將這段程式碼範例做為範本。

    您可以使用 venv 建立獨立的 Python 虛擬環境,以便執行管道專案。虛擬環境可讓您將一個專案的依附元件與其他專案的依附元件區隔開來。如要進一步瞭解如何安裝 Python 及建立虛擬環境,請參閱設定 Python 開發環境

    1. 使用 git clone 指令複製 GitHub 存放區:

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      
    2. 請前往 run-inference 目錄:

      cd python-docs-samples/dataflow/run-inference
      
    3. 如果您使用命令提示字元,請確認系統中已執行 Python 3 和 pip

      python --version
      python -m pip --version
      

      視需要安裝 Python 3

      如果您使用 Cloud Shell,可以略過這個步驟,因為 Cloud Shell 已安裝 Python。

    4. 建立 Python 虛擬環境

      python -m venv /tmp/env
      source /tmp/env/bin/activate
      
    5. 安裝依附元件:

      pip install -r requirements.txt --no-cache-dir
      

    模型載入程式碼範例

    本教學課程中的模型載入程式碼會啟動 Vertex AI 自訂工作,將模型的 state_dict 物件載入 Cloud Storage。

    範例檔案如下所示:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Loads the state_dict for an LLM model into Cloud Storage."""
    
    from __future__ import annotations
    
    import os
    
    import torch
    from transformers import AutoModelForSeq2SeqLM
    
    
    def run_local(model_name: str, state_dict_path: str) -> None:
        """Loads the state dict and saves it into the desired path.
    
        If the `state_dict_path` is a Cloud Storage location starting
        with "gs://", this assumes Cloud Storage is mounted with
        Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        """
        print(f"Loading model: {model_name}")
        model = AutoModelForSeq2SeqLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16
        )
        print(f"Model loaded, saving state dict to: {state_dict_path}")
    
        # Assume Cloud Storage FUSE is mounted in `/gcs`.
        state_dict_path = state_dict_path.replace("gs://", "/gcs/")
        directory = os.path.dirname(state_dict_path)
        if directory and not os.path.exists(directory):
            os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
        torch.save(model.state_dict(), state_dict_path)
        print("State dict saved successfully!")
    
    
    def run_vertex_job(
        model_name: str,
        state_dict_path: str,
        job_name: str,
        project: str,
        bucket: str,
        location: str = "us-central1",
        machine_type: str = "e2-highmem-2",
        disk_size_gb: int = 100,
    ) -> None:
        """Launches a Vertex AI custom job to load the state dict.
    
        If the model is too large to fit into memory or disk, we can launch
        a Vertex AI custom job with a large enough VM for this to work.
    
        Depending on the model's size, it might require a different VM
        configuration. The model MUST fit into the VM's memory, and there
        must be enough disk space to stage the entire model while it gets
        copied to Cloud Storage.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            job_name: Job display name in the Vertex AI console.
            project: Google Cloud Project ID.
            bucket: Cloud Storage bucket name, without the "gs://" prefix.
            location: Google Cloud regional location.
            machine_type: Machine type for the VM to run the job.
            disk_size_gb: Disk size in GB for the VM to run the job.
        """
        from google.cloud import aiplatform
    
        aiplatform.init(project=project, staging_bucket=bucket, location=location)
    
        job = aiplatform.CustomJob.from_local_script(
            display_name=job_name,
            container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
            script_path="download_model.py",
            args=[
                "local",
                f"--model-name={model_name}",
                f"--state-dict-path={state_dict_path}",
            ],
            machine_type=machine_type,
            boot_disk_size_gb=disk_size_gb,
            requirements=["transformers"],
        )
        job.run()
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers(required=True)
    
        parser_local = subparsers.add_parser("local")
        parser_local.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_local.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_local.set_defaults(run=run_local)
    
        parser_vertex = subparsers.add_parser("vertex")
        parser_vertex.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_vertex.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_vertex.add_argument(
            "--job-name", required=True, help="Job display name in the Vertex AI console"
        )
        parser_vertex.add_argument(
            "--project", required=True, help="Google Cloud Project ID"
        )
        parser_vertex.add_argument(
            "--bucket",
            required=True,
            help='Cloud Storage bucket name, without the "gs://" prefix',
        )
        parser_vertex.add_argument(
            "--location", default="us-central1", help="Google Cloud regional location"
        )
        parser_vertex.add_argument(
            "--machine-type",
            default="e2-highmem-2",
            help="Machine type for the VM to run the job",
        )
        parser_vertex.add_argument(
            "--disk-size-gb",
            type=int,
            default=100,
            help="Disk size in GB for the VM to run the job",
        )
        parser_vertex.set_defaults(run=run_vertex_job)
    
        args = parser.parse_args()
        kwargs = args.__dict__.copy()
        kwargs.pop("run")
    
        args.run(**kwargs)
    

    管道程式碼範例

    本教學課程中的管道程式碼會部署 Dataflow 管道,執行下列操作:

    • 從 Pub/Sub 讀取提示,並將文字編碼為權杖張量。
    • 執行 RunInference 轉換。
    • 將輸出權杖張量解碼為文字,並將回覆寫入 Pub/Sub。

    範例檔案如下所示:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Runs a streaming RunInference Language Model pipeline."""
    
    from __future__ import annotations
    
    import logging
    
    import apache_beam as beam
    from apache_beam.ml.inference.base import PredictionResult
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
    from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
    from apache_beam.options.pipeline_options import PipelineOptions
    import torch
    from transformers import AutoConfig
    from transformers import AutoModelForSeq2SeqLM
    from transformers import AutoTokenizer
    from transformers.tokenization_utils import PreTrainedTokenizer
    
    MAX_RESPONSE_TOKENS = 256
    
    
    def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
        """Encodes input text into token tensors.
    
        Args:
            input_text: Input text for the language model.
            tokenizer: Tokenizer for the language model.
    
        Returns: Tokenized input tokens.
        """
        return tokenizer(input_text, return_tensors="pt").input_ids[0]
    
    
    def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
        """Decodes output token tensors into text.
    
        Args:
            result: Prediction results from the RunInference transform.
            tokenizer: Tokenizer for the language model.
    
        Returns: The model's response as text.
        """
        output_tokens = result.inference
        return tokenizer.decode(output_tokens, skip_special_tokens=True)
    
    
    class AskModel(beam.PTransform):
        """Asks an language model a prompt message and gets its responses.
    
        Attributes:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            max_response_tokens: Maximum number of tokens for the model to generate.
        """
    
        def __init__(
            self,
            model_name: str,
            state_dict_path: str,
            max_response_tokens: int = MAX_RESPONSE_TOKENS,
        ) -> None:
            self.model_handler = PytorchModelHandlerTensor(
                state_dict_path=state_dict_path,
                model_class=AutoModelForSeq2SeqLM.from_config,
                model_params={"config": AutoConfig.from_pretrained(model_name)},
                inference_fn=make_tensor_model_fn("generate"),
            )
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.max_response_tokens = max_response_tokens
    
        def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
            return (
                pcollection
                | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
                | "RunInference"
                >> RunInference(
                    self.model_handler,
                    inference_args={"max_new_tokens": self.max_response_tokens},
                )
                | "Get response" >> beam.Map(decode_response, self.tokenizer)
            )
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--messages-topic",
            required=True,
            help="Pub/Sub topic for input text messages",
        )
        parser.add_argument(
            "--responses-topic",
            required=True,
            help="Pub/Sub topic for output text responses",
        )
        parser.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        args, beam_args = parser.parse_known_args()
    
        logging.getLogger().setLevel(logging.INFO)
        beam_options = PipelineOptions(
            beam_args,
            pickle_library="cloudpickle",
            streaming=True,
        )
    
        simple_name = args.model_name.split("/")[-1]
        pipeline = beam.Pipeline(options=beam_options)
        _ = (
            pipeline
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
            | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
            | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
            | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
            | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
        )
        pipeline.run()
    

    載入模型

    大型語言模型可能非常龐大,以更多參數訓練的大型模型通常能提供更優異的結果。不過,大型模型需要較大的機器和更多記憶體才能執行。在 CPU 上執行大型模型時,速度也可能較慢。

    在 Dataflow 上執行 PyTorch 模型之前,您需要載入模型的 state_dict 物件。模型的state_dict 物件會儲存模型的權重。

    在使用 Apache Beam RunInference 轉換的 Dataflow 管道中,模型的 state_dict 物件必須載入 Cloud Storage。用來將 state_dict 物件載入至 Cloud Storage 的機器,必須有足夠的記憶體來載入模型。機器也需要快速的網際網路連線,才能下載權重並上傳至 Cloud Storage。

    下表顯示每個模型的參數數量,以及載入每個模型所需的最低記憶體。

    模型 參數 所需記憶體
    google/flan-t5-small 8,000 萬 > 320 MB
    google/flan-t5-base 2.5 億 > 1 GB
    google/flan-t5-large 7.8 億 > 3.2 GB
    google/flan-t5-xl 30 億 > 12 GB
    google/flan-t5-xxl 110 億次 > 44 GB
    google/flan-ul2 200 億次 > 80 GB

    雖然您可以在本機載入較小的模型,但本教學課程會說明如何啟動 Vertex AI 自訂工作,並使用適當大小的 VM 載入模型。

    由於 LLM 可能非常龐大,本教學課程中的範例會將 state_dict 物件儲存為 float16 格式,而非預設的 float32 格式。完成這項設定後,每個參數會使用 16 位元而非 32 位元,因此 state_dict 物件的大小會減半。模型越小,載入所需的時間就越短。不過,轉換格式表示 VM 必須將模型和 state_dict 物件都放入記憶體。

    下表列出將 state_dict 物件儲存為 float16 格式後,載入模型的最低需求。表格中也會顯示建議的機器類型,方便您使用 Vertex AI 載入模型。Vertex AI 的磁碟大小下限 (也是預設值) 為 100 GB,但部分模型可能需要更大的磁碟。

    模型名稱 所需記憶體 機型 VM 記憶體 VM 磁碟
    google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
    google/flan-t5-base > 1.5 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-large > 4.8 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
    google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
    google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

    使用 Vertex AI 自訂工作,將模型的 state_dict 物件載入 Cloud Storage:

    python download_model.py vertex \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --job-name="Load MODEL_NAME" \
        --project="PROJECT_ID" \
        --bucket="BUCKET_NAME" \
        --location="LOCATION" \
        --machine-type="VERTEX_AI_MACHINE_TYPE" \
        --disk-size-gb="DISK_SIZE_GB"
    

    更改下列內容:

    • MODEL_NAME:模型名稱,例如 google/flan-t5-xl
    • VERTEX_AI_MACHINE_TYPE:用於執行 Vertex AI 自訂作業的機器類型,例如 e2-highmem-4
    • DISK_SIZE_GB:VM 的磁碟大小 (單位為 GB)。大小下限為 100 GB。

    視模型大小而定,載入模型可能需要幾分鐘。如要查看狀態,請前往 Vertex AI「自訂工作」頁面。

    前往「自訂工作」

    執行管道

    載入模型後,請執行 Dataflow pipeline。如要執行管道,模型和每個工作站使用的記憶體都必須符合記憶體容量。

    下表列出建議用來執行推論管道的機器類型。

    模型名稱 機型 VM 記憶體
    google/flan-t5-small n2-highmem-2 16 GB
    google/flan-t5-base n2-highmem-2 16 GB
    google/flan-t5-large n2-highmem-4 32 GB
    google/flan-t5-xl n2-highmem-4 32 GB
    google/flan-t5-xxl n2-highmem-8 64 GB
    google/flan-ul2 n2-highmem-16 128 GB

    執行管道:

    python main.py \
        --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
        --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --runner="DataflowRunner" \
        --project="PROJECT_ID" \
        --temp_location="gs://BUCKET_NAME/temp" \
        --region="REGION" \
        --machine_type="DATAFLOW_MACHINE_TYPE" \
        --requirements_file="requirements.txt" \
        --requirements_cache="skip" \
        --experiments="use_sibling_sdk_workers" \
        --experiments="no_use_multiple_sdk_containers"
    

    更改下列內容:

    • PROJECT_ID:專案 ID
    • PROMPTS_TOPIC_ID:要傳送至模型的輸入提示主題 ID
    • RESPONSES_TOPIC_ID:模型回覆的主題 ID
    • MODEL_NAME:模型名稱,例如 google/flan-t5-xl
    • BUCKET_NAME:值區名稱
    • REGION:要部署作業的區域,例如 us-central1
    • DATAFLOW_MACHINE_TYPE:要執行管道的 VM,例如 n2-highmem-4

    為確保每個工作站只載入一次模型,且不會耗盡記憶體,請設定管道選項 --experiments=no_use_multiple_sdk_containers,讓工作站使用單一程序。您不必限制執行緒數量,因為 RunInference 轉換會與多個執行緒共用同一個模型。

    本範例中的管道會使用 CPU 執行。模型越大,處理每項要求所需的時間就越長。如需更快速的回覆,可以啟用 GPU

    如要查看管道狀態,請前往 Dataflow 的「Jobs」(工作) 頁面。

    前往「Jobs」(工作) 頁面

    向模型提問

    pipeline 開始執行後,您就可以向模型提供提示並接收回覆。

    1. 將訊息發布至 Pub/Sub,即可傳送提示。使用 gcloud pubsub topics publish 指令

      gcloud pubsub topics publish PROMPTS_TOPIC_ID \
          --message="PROMPT_TEXT"
      

      PROMPT_TEXT 替換為包含您要提供的提示的字串。在提示前後加上引號。

      使用自己的提示,或試試下列任一範例:

      • Translate to Spanish: My name is Luka
      • Complete this sentence: Once upon a time, there was a
      • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
    2. 如要取得回覆,請使用 gcloud pubsub subscriptions pull 指令

      視模型大小而定,模型可能需要幾分鐘才能生成回覆。模型越大,部署和生成回覆的時間就越長。

      gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
      

      請將 RESPONSES_SUBSCRIPTION_ID 替換為模型回覆的訂閱 ID。

    清除所用資源

    如要避免系統向您的 Google Cloud 帳戶收取本教學課程中所用資源的相關費用,請刪除含有該項資源的專案,或者保留專案但刪除個別資源。

    刪除專案

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    刪除個別資源

    1. 結束 Python 虛擬環境:

      deactivate
    2. 停止管道:

      1. 列出正在執行的 Dataflow 工作 ID,然後記下本教學課程工作的 ID:

        gcloud dataflow jobs list --region=REGION --status=active
      2. 取消工作:

        gcloud dataflow jobs cancel JOB_ID --region=REGION
    3. 刪除 bucket 和當中內容:

      gcloud storage rm gs://BUCKET_NAME --recursive
    4. 刪除主題和訂閱項目:

      gcloud pubsub topics delete PROMPTS_TOPIC_ID
      gcloud pubsub topics delete RESPONSES_TOPIC_ID
      gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
    5. 撤銷您授予 Compute Engine 預設服務帳戶的角色。針對下列每個 IAM 角色,執行一次下列指令:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.admin
      • roles/pubsub.editor
      • roles/aiplatform.user
      gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
    6. 選用:從 Google 帳戶撤銷角色。

      gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
    7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

      gcloud auth application-default revoke
    8. Optional: Revoke credentials from the gcloud CLI.

      gcloud auth revoke

    後續步驟

    • 探索 Dataflow ML
    • 進一步瞭解 RunInference API
    • 如要深入瞭解如何搭配使用機器學習與 Apache Beam,請參閱 Apache Beam AI/機器學習管道說明文件。
    • 完成筆記本「Use RunInference for Generative AI」(將 RunInference 用於生成式 AI)
    • 探索 Google Cloud 的參考架構、圖表和最佳做法。 歡迎瀏覽我們的雲端架構中心