LLM in einer Streamingpipeline ausführen


In dieser Anleitung erfahren Sie, wie Sie mithilfe der Apache Beam RunInference API ein großes Sprachmodell (LLM) in einer Dataflow-Streamingpipeline ausführen.

Weitere Informationen zur RunInference API finden Sie in der Apache Beam-Dokumentation unter Beam ML.

Der Beispielcode ist auf GitHub verfügbar.

Ziele

  • Pub/Sub-Themen und -Abos für die Eingaben und Antworten des Modells erstellen.
  • Das Modell mit einem benutzerdefinierten Vertex AI-Job in Cloud Storage laden.
  • Pipeline ausführen.
  • Dem Modell eine Frage stellen und eine Antwort erhalten.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten der Google Cloud Platform:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.

Neuen Google Cloud Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

Führen Sie diese Anleitung auf einem Computer aus, auf dem mindestens 5 GB freier Speicherplatz zur Installation der Abhängigkeiten verfügbar sind.

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  4. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  12. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Weisen Sie Ihrem Compute Engine-Standarddienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Ersetzen Sie dabei Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • PROJECT_NUMBER: Ihre Projektnummer. Verwenden Sie den Befehl gcloud projects describe, um Ihre Projektnummer zu ermitteln.
    • SERVICE_ACCOUNT_ROLE: Jede einzelne Rolle.
  19. Kopieren Sie die Google Cloud Projekt-ID. Sie benötigen diesen Wert später in dieser Anleitung.
  20. Google Cloud Platform-Ressourcen erstellen

    In diesem Abschnitt werden die folgenden Ressourcen erstellt:

    • Ein Cloud Storage-Bucket, der als temporärer Speicherort verwendet werden soll
    • Ein Pub/Sub-Thema für die Prompts des Modells
    • Ein Pub/Sub-Thema und -Abo für die Antworten des Modells

    Cloud Storage-Bucket erstellen

    Cloud Storage-Bucket mit der gcloud CLI erstellen Dieser Bucket wird von der Dataflow-Pipeline als temporärer Speicherort verwendet.

    Verwenden Sie den Befehl gcloud storage buckets create, um den Bucket zu erstellen:

    gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
    

    Ersetzen Sie dabei Folgendes:

    Kopieren Sie den Bucket-Namen. Sie benötigen diesen Wert später in dieser Anleitung.

    Pub/Sub-Themen und -Abos erstellen

    Erstellen Sie zwei Pub/Sub-Themen und ein Abo. Ein Thema ist für die Prompts, die Sie an das Modell senden. Das andere Thema und das zugehörige Abo wird für die Antworten des Modells genutzt.

    1. Führen Sie zum Erstellen der Themen den gcloud pubsub topics create-Befehl zweimal aus, einmal für jedes Thema:

      gcloud pubsub topics create PROMPTS_TOPIC_ID
      gcloud pubsub topics create RESPONSES_TOPIC_ID
      

      Ersetzen Sie dabei Folgendes:

      • PROMPTS_TOPIC_ID: Die Themen-ID für die Prompts, die an das Modell gesendet werden sollen, z. B. prompts
      • RESPONSES_TOPIC_ID: Die Themen-ID für die Antworten des Modells, z. B. responses
    2. Verwenden Sie den Befehl gcloud pubsub subscriptions create, um das Abo zu erstellen und an Ihr Thema für Antworten anzuhängen:

      gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
      

      Ersetzen Sie RESPONSES_SUBSCRIPTION_ID durch die Abo-ID für die Antworten des Modells, z. B. responses-subscription.

    Kopieren Sie die Themen-IDs und die Abo-ID. Sie benötigen diese Werte später in dieser Anleitung.

    Umgebung vorbereiten

    Laden Sie die Codebeispiele herunter und richten Sie dann Ihre Umgebung ein, um die Anleitung auszuführen.

    Die Codebeispiele im GitHub-Repository python-docs-samples enthalten den Code, den Sie zum Ausführen dieser Pipeline benötigen. Wenn Sie Ihre eigene Pipeline erstellen möchten, können Sie diesen Beispielcode als Vorlage verwenden.

    Sie erstellen eine isolierte virtuelle Python-Umgebung, um Ihr Pipeline-Projekt mit venv auszuführen. In einer virtuellen Umgebung können Sie die Abhängigkeiten eines Projekts von den Abhängigkeiten anderer Projekte isolieren. Weitere Informationen zum Installieren von Python und zum Erstellen einer virtuellen Umgebung finden Sie unter Python-Entwicklungsumgebung einrichten.

    1. Klonen Sie das GitHub-Repository mit dem Befehl git clone:

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      
    2. Rufen Sie das Verzeichnis run-inference auf:

      cd python-docs-samples/dataflow/run-inference
      
    3. Wenn Sie eine Eingabeaufforderung verwenden, prüfen Sie, ob Python 3 und pip in Ihrem System ausgeführt werden:

      python --version
      python -m pip --version
      

      Installieren Sie Python 3, falls erforderlich.

      Wenn Sie Cloud Shell verwenden, können Sie diesen Schritt überspringen, da Python bereits in Cloud Shell installiert ist.

    4. Erstellen Sie eine virtuelle Python-Umgebung::

      python -m venv /tmp/env
      source /tmp/env/bin/activate
      
    5. Installieren Sie die Abhängigkeiten:

      pip install -r requirements.txt --no-cache-dir
      

    Beispielcode zum Laden von Modellen

    Der Modellladecode in dieser Anleitung startet einen benutzerdefinierten Vertex AI-Job, der das state_dict-Objekt des Modells in Cloud Storage lädt.

    Die Startdatei sieht so aus:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Loads the state_dict for an LLM model into Cloud Storage."""
    
    from __future__ import annotations
    
    import os
    
    import torch
    from transformers import AutoModelForSeq2SeqLM
    
    
    def run_local(model_name: str, state_dict_path: str) -> None:
        """Loads the state dict and saves it into the desired path.
    
        If the `state_dict_path` is a Cloud Storage location starting
        with "gs://", this assumes Cloud Storage is mounted with
        Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        """
        print(f"Loading model: {model_name}")
        model = AutoModelForSeq2SeqLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16
        )
        print(f"Model loaded, saving state dict to: {state_dict_path}")
    
        # Assume Cloud Storage FUSE is mounted in `/gcs`.
        state_dict_path = state_dict_path.replace("gs://", "/gcs/")
        directory = os.path.dirname(state_dict_path)
        if directory and not os.path.exists(directory):
            os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
        torch.save(model.state_dict(), state_dict_path)
        print("State dict saved successfully!")
    
    
    def run_vertex_job(
        model_name: str,
        state_dict_path: str,
        job_name: str,
        project: str,
        bucket: str,
        location: str = "us-central1",
        machine_type: str = "e2-highmem-2",
        disk_size_gb: int = 100,
    ) -> None:
        """Launches a Vertex AI custom job to load the state dict.
    
        If the model is too large to fit into memory or disk, we can launch
        a Vertex AI custom job with a large enough VM for this to work.
    
        Depending on the model's size, it might require a different VM
        configuration. The model MUST fit into the VM's memory, and there
        must be enough disk space to stage the entire model while it gets
        copied to Cloud Storage.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            job_name: Job display name in the Vertex AI console.
            project: Google Cloud Project ID.
            bucket: Cloud Storage bucket name, without the "gs://" prefix.
            location: Google Cloud regional location.
            machine_type: Machine type for the VM to run the job.
            disk_size_gb: Disk size in GB for the VM to run the job.
        """
        from google.cloud import aiplatform
    
        aiplatform.init(project=project, staging_bucket=bucket, location=location)
    
        job = aiplatform.CustomJob.from_local_script(
            display_name=job_name,
            container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
            script_path="download_model.py",
            args=[
                "local",
                f"--model-name={model_name}",
                f"--state-dict-path={state_dict_path}",
            ],
            machine_type=machine_type,
            boot_disk_size_gb=disk_size_gb,
            requirements=["transformers"],
        )
        job.run()
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers(required=True)
    
        parser_local = subparsers.add_parser("local")
        parser_local.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_local.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_local.set_defaults(run=run_local)
    
        parser_vertex = subparsers.add_parser("vertex")
        parser_vertex.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_vertex.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_vertex.add_argument(
            "--job-name", required=True, help="Job display name in the Vertex AI console"
        )
        parser_vertex.add_argument(
            "--project", required=True, help="Google Cloud Project ID"
        )
        parser_vertex.add_argument(
            "--bucket",
            required=True,
            help='Cloud Storage bucket name, without the "gs://" prefix',
        )
        parser_vertex.add_argument(
            "--location", default="us-central1", help="Google Cloud regional location"
        )
        parser_vertex.add_argument(
            "--machine-type",
            default="e2-highmem-2",
            help="Machine type for the VM to run the job",
        )
        parser_vertex.add_argument(
            "--disk-size-gb",
            type=int,
            default=100,
            help="Disk size in GB for the VM to run the job",
        )
        parser_vertex.set_defaults(run=run_vertex_job)
    
        args = parser.parse_args()
        kwargs = args.__dict__.copy()
        kwargs.pop("run")
    
        args.run(**kwargs)
    

    Pipeline-Codebeispiel

    Der Pipelinecode in dieser Anleitung stellt eine Dataflow-Pipeline bereit, die Folgendes ausführt:

    • Liest einen Prompt aus Pub/Sub und codiert den Text in Token-Tensoren.
    • Führt die Transformation RunInference aus.
    • Decodiert die Ausgabetoken-Tensoren in Text und schreibt die Antwort in Pub/Sub.

    Die Startdatei sieht so aus:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Runs a streaming RunInference Language Model pipeline."""
    
    from __future__ import annotations
    
    import logging
    
    import apache_beam as beam
    from apache_beam.ml.inference.base import PredictionResult
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
    from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
    from apache_beam.options.pipeline_options import PipelineOptions
    import torch
    from transformers import AutoConfig
    from transformers import AutoModelForSeq2SeqLM
    from transformers import AutoTokenizer
    from transformers.tokenization_utils import PreTrainedTokenizer
    
    MAX_RESPONSE_TOKENS = 256
    
    
    def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
        """Encodes input text into token tensors.
    
        Args:
            input_text: Input text for the language model.
            tokenizer: Tokenizer for the language model.
    
        Returns: Tokenized input tokens.
        """
        return tokenizer(input_text, return_tensors="pt").input_ids[0]
    
    
    def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
        """Decodes output token tensors into text.
    
        Args:
            result: Prediction results from the RunInference transform.
            tokenizer: Tokenizer for the language model.
    
        Returns: The model's response as text.
        """
        output_tokens = result.inference
        return tokenizer.decode(output_tokens, skip_special_tokens=True)
    
    
    class AskModel(beam.PTransform):
        """Asks an language model a prompt message and gets its responses.
    
        Attributes:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            max_response_tokens: Maximum number of tokens for the model to generate.
        """
    
        def __init__(
            self,
            model_name: str,
            state_dict_path: str,
            max_response_tokens: int = MAX_RESPONSE_TOKENS,
        ) -> None:
            self.model_handler = PytorchModelHandlerTensor(
                state_dict_path=state_dict_path,
                model_class=AutoModelForSeq2SeqLM.from_config,
                model_params={"config": AutoConfig.from_pretrained(model_name)},
                inference_fn=make_tensor_model_fn("generate"),
            )
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.max_response_tokens = max_response_tokens
    
        def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
            return (
                pcollection
                | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
                | "RunInference"
                >> RunInference(
                    self.model_handler,
                    inference_args={"max_new_tokens": self.max_response_tokens},
                )
                | "Get response" >> beam.Map(decode_response, self.tokenizer)
            )
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--messages-topic",
            required=True,
            help="Pub/Sub topic for input text messages",
        )
        parser.add_argument(
            "--responses-topic",
            required=True,
            help="Pub/Sub topic for output text responses",
        )
        parser.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        args, beam_args = parser.parse_known_args()
    
        logging.getLogger().setLevel(logging.INFO)
        beam_options = PipelineOptions(
            beam_args,
            pickle_library="cloudpickle",
            streaming=True,
        )
    
        simple_name = args.model_name.split("/")[-1]
        pipeline = beam.Pipeline(options=beam_options)
        _ = (
            pipeline
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
            | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
            | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
            | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
            | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
        )
        pipeline.run()
    

    Modell laden

    LLMs können sehr große Modelle sein. Größere Modelle, die mit mehr Parametern trainiert werden, liefern in der Regel bessere Ergebnisse. Größere Modelle benötigen jedoch einen größeren Rechner und mehr Arbeitsspeicher. Größere Modelle können auf CPUs auch langsamer ausgeführt werden.

    Bevor Sie ein PyTorch-Modell in Dataflow ausführen können, müssen Sie das state_dict-Objekt des Modells laden. Im state_dict-Objekt eines Modells werden die Gewichte für das Modell gespeichert.

    In einer Dataflow-Pipeline, in der die Apache Beam-Transformation RunInference verwendet wird, muss das state_dict-Objekt des Modells in Cloud Storage geladen werden. Der Rechner, den Sie zum Laden des state_dict-Objekts in Cloud Storage verwenden, muss genügend Arbeitsspeicher haben, um das Modell zu laden. Der Rechner benötigt außerdem eine schnelle Internetverbindung, um die Gewichtungen herunterzuladen und in Cloud Storage hochzuladen.

    In der folgenden Tabelle sehen Sie die Anzahl der Parameter für die einzelnen Modelle und den Mindestspeicher, der zum Laden der jeweiligen Modelle erforderlich ist.

    Modell Parameter Erforderlicher Arbeitsspeicher
    google/flan-t5-small 80 Millionen > 320 MB
    google/flan-t5-base 250 Millionen > 1 GB
    google/flan-t5-large 780 Millionen > 3,2 GB
    google/flan-t5-xl 3 Milliarden > 12 GB
    google/flan-t5-xxl 11 Milliarden > 44 GB
    google/flan-ul2 20 Milliarden > 80 GB

    Sie können zwar ein kleineres Modell lokal laden, in dieser Anleitung wird jedoch gezeigt, wie Sie einen benutzerdefinierten Vertex AI-Job starten, der das Modell mit einer VM mit angemessener Größe lädt.

    Da LLMs so groß sein können, wird das state_dict-Objekt im Beispiel in diesem Tutorial im float16-Format anstelle des Standardformats float32 gespeichert. Bei dieser Konfiguration verwendet jeder Parameter 16 Bits anstelle von 32 Bits, sodass das state_dict-Objekt nur halb so groß ist. Eine kleinere Größe minimiert die Zeit, die zum Laden des Modells benötigt wird. Die Konvertierung des Formats bedeutet jedoch, dass die VM sowohl das Modell als auch das state_dict-Objekt in ihren Speicher aufnehmen muss.

    In der folgenden Tabelle sind die Mindestanforderungen für das Laden eines Modells aufgeführt, nachdem das state_dict-Objekt im float16-Format gespeichert wurde. Die Tabelle enthält auch die vorgeschlagenen Rechnertypen zum Laden eines Modells mithilfe von Vertex AI. Die minimale (und standardmäßige) Laufwerkgröße für Vertex AI beträgt 100 GB. Bei einigen Modellen ist jedoch möglicherweise ein größeres Laufwerk erforderlich.

    Modellname Erforderlicher Arbeitsspeicher Maschinentyp VM-Arbeitsspeicher VM-Laufwerk
    google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
    google/flan-t5-base > 1,5 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-large > 4,8 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
    google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
    google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

    Laden Sie das state_dict-Objekt des Modells mit einem benutzerdefinierten Vertex AI-Job in Cloud Storage:

    python download_model.py vertex \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --job-name="Load MODEL_NAME" \
        --project="PROJECT_ID" \
        --bucket="BUCKET_NAME" \
        --location="LOCATION" \
        --machine-type="VERTEX_AI_MACHINE_TYPE" \
        --disk-size-gb="DISK_SIZE_GB"
    

    Ersetzen Sie dabei Folgendes:

    • MODEL_NAME: Der Name des Modells, z. B. google/flan-t5-xl.
    • VERTEX_AI_MACHINE_TYPE: Der Maschinentyp, auf dem der benutzerdefinierte Vertex AI-Job ausgeführt werden soll, z. B. e2-highmem-4.
    • DISK_SIZE_GB: Die Laufwerksgröße für die VM in GB. Die Mindestgröße beträgt 100 GB.

    Je nach Größe des Modells kann es einige Minuten dauern, bis es geladen ist. Den Status können Sie auf der Seite Benutzerdefinierte Jobs von Vertex AI aufrufen.

    Benutzerdefinierte Jobs aufrufen

    Pipeline ausführen

    Nachdem Sie das Modell geladen haben, führen Sie die Dataflow-Pipeline aus. Zum Ausführen der Pipeline müssen sowohl das Modell als auch der von den verschiedenen Workers verwendete Arbeitsspeicher in den Speicher passen.

    In der folgenden Tabelle sind die empfohlenen Maschinentypen für die Ausführung einer Inferenzpipeline aufgeführt.

    Modellname Maschinentyp VM-Arbeitsspeicher
    google/flan-t5-small n2-highmem-2 16 GB
    google/flan-t5-base n2-highmem-2 16 GB
    google/flan-t5-large n2-highmem-4 32 GB
    google/flan-t5-xl n2-highmem-4 32 GB
    google/flan-t5-xxl n2-highmem-8 64 GB
    google/flan-ul2 n2-highmem-16 128 GB

    Führen Sie die Pipeline aus:

    python main.py \
        --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
        --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --runner="DataflowRunner" \
        --project="PROJECT_ID" \
        --temp_location="gs://BUCKET_NAME/temp" \
        --region="REGION" \
        --machine_type="DATAFLOW_MACHINE_TYPE" \
        --requirements_file="requirements.txt" \
        --requirements_cache="skip" \
        --experiments="use_sibling_sdk_workers" \
        --experiments="no_use_multiple_sdk_containers"
    

    Ersetzen Sie dabei Folgendes:

    • PROJECT_ID: die Projekt-ID
    • PROMPTS_TOPIC_ID: die Themen-ID für die Prompts, die an das Modell gesendet werden sollen
    • RESPONSES_TOPIC_ID: die Themen-ID für die Antworten des Modells
    • MODEL_NAME: der Name des Modells, z. B. google/flan-t5-xl.
    • BUCKET_NAME: Der Name des Buckets
    • REGION: die Region, in der der Job bereitgestellt werden soll, z. B. us-central1
    • DATAFLOW_MACHINE_TYPE: die VM, auf der die Pipeline ausgeführt werden soll, z. B. n2-highmem-4

    Damit das Modell nur einmal pro Worker geladen wird und genügend Arbeitsspeicher zur Verfügung steht, konfigurieren Sie die Worker zur Verwendung einer einzelnen Methode. Dazu legen Sie die Pipelineoption --experiments=no_use_multiple_sdk_containers fest. Sie müssen die Anzahl der Threads nicht begrenzen, da die RunInference-Transformation dasselbe Modell mit mehreren Threads teilt.

    Die Pipeline in diesem Beispiel wird mit CPUs ausgeführt. Bei einem größeren Modell ist mehr Zeit für die Verarbeitung jeder Anfrage erforderlich. Sie können GPUs aktivieren, wenn Sie schnellere Antworten benötigen.

    Rufen Sie die Dataflow-Seite Jobs auf, um den Status der Pipeline aufzurufen.

    ZU JOBS

    Dem Modell eine Frage stellen

    Nachdem die Pipeline ausgeführt wird, übergeben Sie dem Modell einen Prompt und erhalten eine Antwort.

    1. Senden Sie eine Nachricht an Pub/Sub, um Ihren Prompt zu senden. Führen Sie den Befehl gcloud pubsub topics publish aus:

      gcloud pubsub topics publish PROMPTS_TOPIC_ID \
          --message="PROMPT_TEXT"
      

      Ersetzen Sie PROMPT_TEXT durch einen String, der den Prompt enthält, den Sie bereitstellen möchten. Setzen Sie den Prompt in Anführungszeichen.

      Verwenden Sie einen eigenen Prompt oder versuchen Sie es mit einem der folgenden Beispiele:

      • Translate to Spanish: My name is Luka
      • Complete this sentence: Once upon a time, there was a
      • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
    2. Verwenden Sie den Befehl gcloud pubsub subscriptions pull, um die Antwort abzurufen.

      Je nach Größe des Modells kann es einige Minuten dauern, bis eine Antwort generiert wird. Bei größeren Modellen dauern Bereitstellung und Generation einer Antwort länger.

      gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
      

      Ersetzen Sie RESPONSES_SUBSCRIPTION_ID durch die Abo-ID für die Antworten des Modells.

    Bereinigen

    Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

    Projekt löschen

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    Einzelne Ressourcen löschen

    1. Beenden Sie die virtuelle Python-Umgebung:

      deactivate
    2. Beenden Sie die Pipeline:

      1. Listen Sie die Job-IDs der Dataflow-Jobs auf, die derzeit ausgeführt werden, und notieren Sie sich die Job-ID des Jobs für das Tutorial:

        gcloud dataflow jobs list --region=REGION --status=active
      2. So brechen Sie den Job ab:

        gcloud dataflow jobs cancel JOB_ID --region=REGION
    3. Löschen Sie den Bucket und seinen Inhalt:

      gcloud storage rm gs://BUCKET_NAME --recursive
    4. Löschen Sie die Themen und das Abo:

      gcloud pubsub topics delete PROMPTS_TOPIC_ID
      gcloud pubsub topics delete RESPONSES_TOPIC_ID
      gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
    5. Widerrufen Sie die Rollen, die Sie dem Compute Engine-Standarddienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.admin
      • roles/pubsub.editor
      • roles/aiplatform.user
      gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
    6. Optional: Löschen Sie Rollen aus Ihrem Google-Konto.

      gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
    7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

      gcloud auth application-default revoke
    8. Optional: Revoke credentials from the gcloud CLI.

      gcloud auth revoke

    Wie geht es weiter?