LLM in einer Streaming-Pipeline ausführen


In dieser Anleitung erfahren Sie, wie Sie mithilfe der Apache Beam RunInference API ein großes Sprachmodell (LLM) in einer Dataflow-Streamingpipeline ausführen.

Weitere Informationen zur RunInference API finden Sie unter Informationen zu Beam ML in der Apache Beam-Dokumentation.

Der Beispielcode ist auf GitHub verfügbar.

Lernziele

  • Pub/Sub-Themen und -Abos für die Eingabe und Antworten des Modells erstellen
  • Laden Sie das Modell mithilfe eines benutzerdefinierten Vertex AI-Jobs in Cloud Storage.
  • Pipeline ausführen.
  • Stellen Sie dem Modell eine Frage und erhalten Sie eine Antwort.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

Führen Sie diese Anleitung auf einem Computer aus, auf dem mindestens 5 GB freier Speicherplatz zur Installation der Abhängigkeiten verfügbar sind.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  6. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  7. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  13. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  14. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Weisen Sie Ihrem Compute Engine-Standarddienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • PROJECT_NUMBER: Ihre Projektnummer. Ihre Projektnummer finden Sie mit dem Befehl gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: Jede einzelne Rolle.
  17. Kopieren Sie die Google Cloud-Projekt-ID. Sie benötigen diesen Wert später in dieser Anleitung.

Google Cloud-Ressourcen erstellen

In diesem Abschnitt werden die folgenden Ressourcen erstellt:

  • Ein Cloud Storage-Bucket, der als temporärer Speicherort verwendet werden soll
  • Ein Pub/Sub-Thema für die Prompts des Modells
  • Ein Pub/Sub-Thema und ein Abo für die Antworten des Modells

Cloud Storage-Bucket erstellen

Erstellen Sie mit der gcloud CLI einen Cloud Storage-Bucket. Dieser Bucket wird von der Dataflow-Pipeline als temporärer Speicherort verwendet.

Verwenden Sie den Befehl gcloud storage buckets create, um den Bucket zu erstellen:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Ersetzen Sie Folgendes:

Kopieren Sie den Bucket-Namen. Sie benötigen diesen Wert später in dieser Anleitung.

Pub/Sub-Themen und -Abos erstellen

Erstellen Sie zwei Pub/Sub-Themen und ein Abo. Ein Thema ist für die Prompts, die Sie an das Modell senden. Das andere Thema und das zugehörige Abo wird für die Antworten des Modells genutzt.

  1. Führen Sie zum Erstellen der Themen den Befehl gcloud pubsub topics create zweimal aus, einmal für jedes Thema:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    Ersetzen Sie Folgendes:

    • PROMPTS_TOPIC_ID: Die Themen-ID für die Prompts, die an das Modell gesendet werden sollen, z. B. prompts
    • RESPONSES_TOPIC_ID: Die Themen-ID für die Antworten des Modells, z. B. responses
  2. Verwenden Sie den Befehl gcloud pubsub subscriptions create, um das Abo zu erstellen und an das Antwortthema anzuhängen:

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    Ersetzen Sie RESPONSES_SUBSCRIPTION_ID durch die Abo-ID für die Antworten des Modells, z. B. responses-subscription.

Kopieren Sie die Themen-IDs und die Abo-ID. Sie benötigen diese Werte später in dieser Anleitung.

Umgebung vorbereiten

Laden Sie die Codebeispiele herunter und richten Sie Ihre Umgebung für die Ausführung der Anleitung ein.

Die Codebeispiele im GitHub-Repository "python-docs-samples" enthalten den Code, den Sie zum Ausführen dieser Pipeline benötigen. Wenn Sie bereit sind, Ihre eigene Pipeline zu erstellen, können Sie diesen Beispielcode als Vorlage verwenden.

Erstellen Sie mit venv eine isolierte virtuelle Python-Umgebung, um Ihr Pipelineprojekt auszuführen. Mit einer virtuellen Umgebung können Sie die Abhängigkeiten eines Projekts von den Abhängigkeiten anderer Projekte isolieren. Weitere Informationen zum Installieren von Python und zum Erstellen einer virtuellen Umgebung finden Sie unter Python-Entwicklungsumgebung einrichten.

  1. Klonen Sie das GitHub-Repository mit dem Befehl git clone:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. Rufen Sie das Verzeichnis run-inference auf:

    cd python-docs-samples/dataflow/run-inference
    
  3. Wenn Sie eine Eingabeaufforderung verwenden, prüfen Sie, ob Python 3 und pip in Ihrem System ausgeführt werden:

    python --version
    python -m pip --version
    

    Installieren Sie gegebenenfalls Python 3.

    Wenn Sie Cloud Shell verwenden, können Sie diesen Schritt überspringen, da in Cloud Shell bereits Python installiert ist.

  4. Erstellen Sie eine virtuelle Python-Umgebung::

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. Installieren Sie die Abhängigkeiten:

    pip install -r requirements.txt --no-cache-dir
    

Codebeispiel zum Laden von Modellen

Mit dem Modellladecode in dieser Anleitung wird ein benutzerdefinierter Vertex AI-Job gestartet, der das state_dict-Objekt des Modells in Cloud Storage lädt.

Die Startdatei sieht so aus:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM

def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")

def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

Pipeline-Codebeispiel

Der Pipelinecode in dieser Anleitung stellt eine Dataflow-Pipeline bereit, die Folgendes ausführt:

  • Liest eine Eingabeaufforderung aus Pub/Sub und codiert den Text in Tokentensoren.
  • Führt die RunInference-Transformation aus.
  • Decodiert die Ausgabetoken-Tensoren in Text und schreibt die Antwort in Pub/Sub.

Die Startdatei sieht so aus:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256

def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]

def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)

class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

Modell laden

LLMs können sehr große Modelle sein. Größere Modelle, die mit mehr Parametern trainiert werden, liefern in der Regel bessere Ergebnisse. Größere Modelle benötigen jedoch einen größeren Rechner und mehr Arbeitsspeicher. Größere Modelle können auf CPUs auch langsamer ausgeführt werden.

Bevor Sie ein PyTorch-Modell in Dataflow ausführen, müssen Sie das state_dict-Objekt des Modells laden. Das Objekt state_dict eines Modells speichert die Gewichtungen für das Modell.

In einer Dataflow-Pipeline, die die Apache Beam-RunInference-Transformation verwendet, muss das state_dict-Objekt des Modells in Cloud Storage geladen werden. Der Rechner, den Sie zum Laden des state_dict-Objekts in Cloud Storage verwenden, muss genügend Arbeitsspeicher haben, um das Modell zu laden. Der Rechner benötigt außerdem eine schnelle Internetverbindung, um die Gewichtungen herunterzuladen und in Cloud Storage hochzuladen.

In der folgenden Tabelle sehen Sie die Anzahl der Parameter für die einzelnen Modelle und den Mindestspeicher, der zum Laden der jeweiligen Modelle erforderlich ist.

Modell Parameter Benötigter Arbeitsspeicher
google/flan-t5-small 80 Millionen > 320 MB
google/flan-t5-base 250 Millionen > 1 GB
google/flan-t5-large 780 Millionen > 3,2 GB
google/flan-t5-xl 3 Milliarden > 12 GB
google/flan-t5-xxl 11 Milliarden > 44 GB
google/flan-ul2 20 Milliarden > 80 GB

Sie können zwar ein kleineres Modell lokal laden, aber in dieser Anleitung wird gezeigt, wie Sie einen benutzerdefinierten Vertex AI-Job starten, der das Modell mit einer VM mit angemessener Größe lädt.

Da LLMs so groß sein können, wird im Beispiel in dieser Anleitung das state_dict-Objekt als float16-Format anstelle des Standardformats float32 gespeichert. Bei dieser Konfiguration verwendet jeder Parameter 16 Bits anstelle von 32 Bits, sodass das state_dict-Objekt nur halb so groß ist. Mit einer kleineren Größe wird die Zeit zum Laden des Modells minimiert. Die Konvertierung des Formats bedeutet jedoch, dass die VM sowohl das Modell als auch das state_dict-Objekt in ihren Speicher aufnehmen muss.

In der folgenden Tabelle sind die Mindestanforderungen für das Laden eines Modells aufgeführt, nachdem das state_dict-Objekt im float16-Format gespeichert wurde. Die Tabelle enthält auch die vorgeschlagenen Rechnertypen zum Laden eines Modells mithilfe von Vertex AI. Die minimale (und standardmäßige) Laufwerkgröße für Vertex AI beträgt 100 GB. Bei einigen Modellen ist jedoch möglicherweise ein größeres Laufwerk erforderlich.

Modellname Benötigter Arbeitsspeicher Maschinentyp VM-Arbeitsspeicher VM-Laufwerk
google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
google/flan-t5-base > 1,5 GB e2-standard-4 16 GB 100 GB
google/flan-t5-large > 4,8 GB e2-standard-4 16 GB 100 GB
google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

Laden Sie das Objekt state_dict des Modells mithilfe eines benutzerdefinierten Vertex AI-Jobs in Cloud Storage:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

Ersetzen Sie Folgendes:

  • MODEL_NAME: Der Name des Modells, z. B. google/flan-t5-xl.
  • VERTEX_AI_MACHINE_TYPE: Der Maschinentyp, auf dem der benutzerdefinierte Vertex AI-Job ausgeführt werden soll, z. B. e2-highmem-4.
  • DISK_SIZE_GB: Die Laufwerksgröße für die VM in GB. Die Mindestgröße beträgt 100 GB.

Je nach Größe des Modells kann es einige Minuten dauern, bis es geladen ist. Rufen Sie die Seite Benutzerdefinierte Jobs von Vertex AI auf, um den Status aufzurufen.

Benutzerdefinierte Jobs aufrufen

Pipeline ausführen

Nachdem Sie das Modell geladen haben, führen Sie die Dataflow-Pipeline aus. Zum Ausführen der Pipeline müssen sowohl das Modell als auch der von den verschiedenen Workers verwendete Arbeitsspeicher in den Speicher passen.

In der folgenden Tabelle sind die empfohlenen Maschinentypen zum Ausführen einer Inferenzpipeline aufgeführt.

Modellname Maschinentyp VM-Arbeitsspeicher
google/flan-t5-small n2-highmem-2 16 GB
google/flan-t5-base n2-highmem-2 16 GB
google/flan-t5-large n2-highmem-4 32 GB
google/flan-t5-xl n2-highmem-4 32 GB
google/flan-t5-xxl n2-highmem-8 64 GB
google/flan-ul2 n2-highmem-16 128 GB

Führen Sie die Pipeline aus:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Projekt-ID
  • PROMPTS_TOPIC_ID: die Themen-ID für die Prompts, die an das Modell gesendet werden sollen
  • RESPONSES_TOPIC_ID: die Themen-ID für die Antworten des Modells
  • MODEL_NAME: der Name des Modells, z. B. google/flan-t5-xl.
  • BUCKET_NAME: Der Name des Buckets
  • REGION: Region, in der der Job bereitgestellt werden soll, z. B. us-central1
  • DATAFLOW_MACHINE_TYPE: die VM, auf der die Pipeline ausgeführt werden soll, z. B. n2-highmem-4

Damit das Modell nur einmal pro Worker geladen wird und genügend Arbeitsspeicher zur Verfügung steht, konfigurieren Sie die Worker zur Verwendung einer einzelnen Methode. Dazu legen Sie die Pipelineoption --experiments=no_use_multiple_sdk_containers fest. Sie müssen die Anzahl der Threads nicht begrenzen, da die Transformation RunInference dasselbe Modell mit mehreren Threads nutzt.

Die Pipeline in diesem Beispiel wird mit CPUs ausgeführt. Bei einem größeren Modell ist mehr Zeit für die Verarbeitung der einzelnen Anfragen erforderlich. Sie können GPUs aktivieren, wenn Sie schnellere Antworten benötigen.

Den Status der Pipeline können Sie auf der Dataflow-Seite Jobs aufrufen.

ZU JOBS

Modell eine Frage stellen

Nachdem die Pipeline ausgeführt wird, übergeben Sie dem Modell einen Prompt und erhalten eine Antwort.

  1. Senden Sie eine Nachricht an Pub/Sub, um Ihren Prompt zu senden. Führen Sie den Befehl gcloud pubsub topics publish aus:

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    Ersetzen Sie PROMPT_TEXT durch einen String, der den Prompt enthält, den Sie bereitstellen möchten. Setzen Sie den Prompt in Anführungszeichen.

    Verwenden Sie einen eigenen Prompt oder versuchen Sie es mit einem der folgenden Beispiele:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. Verwenden Sie den Befehl gcloud pubsub subscriptions pull, um die Antwort abzurufen.

    Je nach Größe des Modells kann es einige Minuten dauern, bis das Modell eine Antwort generiert. Bei größeren Modellen dauern Bereitstellung und Generation einer Antwort länger.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    Ersetzen Sie RESPONSES_SUBSCRIPTION_ID durch die Abo-ID für die Antworten des Modells.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

Projekt löschen

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Einzelne Ressourcen löschen

  1. Beenden Sie die virtuelle Python-Umgebung:

    deactivate
  2. Beenden Sie die Pipeline:

    1. Listen Sie die Job-IDs der ausgeführten Dataflow-Jobs auf und notieren Sie sich die Job-ID des Jobs der Anleitung:

      gcloud dataflow jobs list --region=REGION --status=active
    2. Brechen Sie den Job ab:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. Löschen Sie den Bucket und seinen Inhalt:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. Löschen Sie die Themen und das Abo:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Widerrufen Sie die Rollen, die Sie dem Compute Engine-Standarddienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. Optional: Löschen Sie Rollen aus Ihrem Google-Konto.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Wie geht es weiter?