Esegui un modello LLM in una pipeline di streaming


Questo tutorial mostra come eseguire un modello linguistico di grandi dimensioni (LLM) in una pipeline Dataflow di streaming utilizzando l'API Apache Beam RunInference.

Per saperne di più sull'API RunInference, consulta Informazioni su Beam ML nella documentazione di Apache Beam.

Il codice di esempio è disponibile su GitHub.

Obiettivi

  • Crea argomenti e sottoscrizioni Pub/Sub per le risposte e gli input del modello.
  • Carica il modello in Cloud Storage utilizzando un job personalizzato Vertex AI.
  • Esegui la pipeline.
  • Fai una domanda al modello e ricevi una risposta.

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi Google Cloud utenti potrebbero avere diritto a una prova gratuita.

Al termine delle attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la sezione Pulizia.

Prima di iniziare

Esegui questo tutorial su un computer con almeno 5 GB di spazio su disco libero per installare le dipendenze.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  7. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  14. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Concedi i ruoli all'account di servizio predefinito di Compute Engine. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • PROJECT_NUMBER: il numero del progetto. Per trovare il numero del progetto, utilizza il comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: ogni singolo ruolo.
  17. Copia l'ID progetto Google Cloud. Questo valore ti servirà più avanti in questo tutorial.

Crea le Google Cloud risorse

Questa sezione spiega come creare le seguenti risorse:

  • Un bucket Cloud Storage da utilizzare come posizione di archiviazione temporanea
  • Un argomento Pub/Sub per i prompt del modello
  • Un argomento e una sottoscrizione Pub/Sub per le risposte del modello

Crea un bucket Cloud Storage

Crea un bucket Cloud Storage utilizzando gcloud CLI. Questo bucket viene utilizzato come posizione di archiviazione temporanea dalla pipeline Dataflow.

Per creare il bucket, utilizza il comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Sostituisci quanto segue:

Copia il nome del bucket. Questo valore ti servirà più avanti in questo tutorial.

Creare argomenti e sottoscrizioni Pub/Sub

Crea due argomenti Pub/Sub e una sottoscrizione. Un argomento è per i prompt di input che invii al modello. L'altro argomento e l'abbonamento associato sono per le risposte del modello.

  1. Per creare gli argomenti, esegui il comando gcloud pubsub topics create due volte, una per ogni argomento:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    Sostituisci quanto segue:

    • PROMPTS_TOPIC_ID: l'ID argomento per i prompt di input da inviare al modello, ad esempio prompts
    • RESPONSES_TOPIC_ID: l'ID argomento per le risposte del modello, ad esempio responses
  2. Per creare la sottoscrizione e collegarla all'argomento delle risposte, utilizza il comando gcloud pubsub subscriptions create:

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    Sostituisci RESPONSES_SUBSCRIPTION_ID con l'ID abbonamento per le risposte del modello, ad esempio responses-subscription.

Copia gli ID argomento e l'ID abbonamento. Ti serviranno in un secondo momento in questo tutorial.

prepara l'ambiente

Scarica gli esempi di codice e configura l'ambiente per eseguire il tutorial.

Gli esempi di codice nel repository GitHub python-docs-samples forniscono il codice necessario per eseguire questa pipeline. Quando è tutto pronto per creare la tua pipeline, puoi utilizzare questo codice di esempio come modello.

Crea un ambiente virtuale Python isolato per eseguire il progetto della pipeline utilizzando venv. Un ambiente virtuale ti consente di isolare le dipendenze di un progetto dalle dipendenze di altri progetti. Per ulteriori informazioni su come installare Python e creare un ambiente virtuale, consulta Configurazione di un ambiente di sviluppo Python.

  1. Utilizza il comando git clone per clonare il repository GitHub:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. Vai alla directory run-inference:

    cd python-docs-samples/dataflow/run-inference
    
  3. Se utilizzi un prompt dei comandi, controlla che Python 3 e pip siano in esecuzione nel sistema:

    python --version
    python -m pip --version
    

    Se necessario, installa Python 3.

    Se utilizzi Cloud Shell, puoi saltare questo passaggio perché su Cloud Shell è già installato Python.

  4. Crea un ambiente virtuale Python:

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. Installa le dipendenze:

    pip install -r requirements.txt --no-cache-dir
    

Esempio di codice di caricamento del modello

Il codice di caricamento del modello in questo tutorial avvia un job personalizzato Vertex AI che carica l'oggetto state_dict del modello in Cloud Storage.

Il file iniziale ha il seguente aspetto:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

Esempio di codice della pipeline

Il codice della pipeline in questo tutorial esegue il deployment di una pipeline Dataflow che esegue le seguenti operazioni:

  • Legge un prompt da Pub/Sub e codifica il testo in tensori di token.
  • Esegue la trasformazione RunInference.
  • Decodifica i tensori dei token di output in testo e scrive la risposta in Pub/Sub.

Il file iniziale ha il seguente aspetto:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

Carica il modello

Gli LLM possono essere modelli molto grandi. I modelli più grandi addestrati con più parametri generalmente forniscono risultati migliori. Tuttavia, i modelli più grandi richiedono una macchina più grande e più memoria per l'esecuzione. Inoltre, i modelli più grandi possono essere più lenti da eseguire su CPU.

Prima di eseguire un modello PyTorch su Dataflow, devi caricare l'oggetto state_dict del modello. L'oggetto state_dict di un modello memorizza i relativi pesi.

In una pipeline Dataflow che utilizza la trasformazione RunInference di Apache Beam, l'oggetto state_dict del modello deve essere caricato in Cloud Storage. La macchina che utilizzi per caricare l'oggettostate_dict in Cloud Storage deve avere memoria sufficiente per caricare il modello. La macchina richiede inoltre una connessione a internet veloce per scaricare i pesi e caricarli su Cloud Storage.

La tabella seguente mostra il numero di parametri per ogni modello e la memoria minima necessaria per caricare ogni modello.

Modello Parametri Memoria necessaria
google/flan-t5-small 80 milioni > 320 MB
google/flan-t5-base 250 milioni > 1 GB
google/flan-t5-large 780 milioni > 3,2 GB
google/flan-t5-xl 3 miliardi > 12 GB
google/flan-t5-xxl 11 miliardi > 44 GB
google/flan-ul2 20 miliardi > 80 GB

Sebbene sia possibile caricare un modello più piccolo in locale, questo tutorial mostra come avviare un job personalizzato Vertex AI che carica il modello con una VM di dimensioni adeguate.

Poiché gli LLM possono essere molto grandi, l'esempio in questo tutorial salva l'oggetto state_dict in formato float16 anziché nel formato predefinito float32. Con questa configurazione, ogni parametro utilizza 16 bit anziché 32, quindi l'oggetto state_dict ha la metà delle dimensioni. Una dimensione inferiore riduce al minimo il tempo necessario per caricare il modello. Tuttavia, la conversione del formato comporta che la VM debba adattare sia il modello sia l'oggetto state_dict in memoria.

La tabella seguente mostra i requisiti minimi per caricare un modello dopo aver salvato l'oggetto state_dict in formato float16. La tabella mostra anche i tipi di macchine suggeriti per caricare un modello utilizzando Vertex AI. Le dimensioni minime (e predefinite) del disco per Vertex AI sono 100 GB, ma alcuni modelli potrebbero richiedere un disco più grande.

Nome modello Memoria necessaria Tipo di macchina Memoria VM Disco VM
google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
google/flan-t5-base > 1,5 GB e2-standard-4 16 GB 100 GB
google/flan-t5-large > 4,8 GB e2-standard-4 16 GB 100 GB
google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

Carica l'oggetto state_dict del modello in Cloud Storage utilizzando un job personalizzato Vertex AI:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

Sostituisci quanto segue:

  • MODEL_NAME: il nome del modello, ad esempio google/flan-t5-xl.
  • VERTEX_AI_MACHINE_TYPE: il tipo di macchina su cui eseguire il job personalizzato Vertex AI, ad esempio e2-highmem-4.
  • DISK_SIZE_GB: le dimensioni del disco della VM in GB. La dimensione minima è di 100 GB.

A seconda delle dimensioni del modello, il caricamento potrebbe richiedere alcuni minuti. Per visualizzare lo stato, vai alla pagina Job personalizzati di Vertex AI.

Vai a Job personalizzati

esegui la pipeline.

Dopo aver caricato il modello, esegui la pipeline Dataflow. Per eseguire la pipeline, sia il modello sia la memoria utilizzata da ogni worker devono rientrare nella memoria.

La tabella seguente mostra i tipi di macchine consigliati per eseguire una pipeline di inferenza.

Nome modello Tipo di macchina Memoria VM
google/flan-t5-small n2-highmem-2 16 GB
google/flan-t5-base n2-highmem-2 16 GB
google/flan-t5-large n2-highmem-4 32 GB
google/flan-t5-xl n2-highmem-4 32 GB
google/flan-t5-xxl n2-highmem-8 64 GB
google/flan-ul2 n2-highmem-16 128 GB

Esegui la pipeline:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto
  • PROMPTS_TOPIC_ID: l'ID argomento per le richieste di input da inviare al modello
  • RESPONSES_TOPIC_ID: l'ID argomento per le risposte del modello
  • MODEL_NAME: il nome del modello, ad esempio google/flan-t5-xl
  • BUCKET_NAME: il nome del bucket
  • REGION: la regione in cui eseguire il deployment del job, ad esempio us-central1
  • DATAFLOW_MACHINE_TYPE: la VM su cui eseguire la pipeline, ad esempio n2-highmem-4

Per assicurarti che il modello venga caricato una sola volta per worker e che non esaurisca la memoria, configura i worker in modo che utilizzino un singolo processo impostando l'opzione della pipeline --experiments=no_use_multiple_sdk_containers. Non devi limitare il numero di thread perché la trasformazione RunInference condivide lo stesso modello con più thread.

La pipeline in questo esempio viene eseguita con CPU. Per un modello più grande, è necessario più tempo per elaborare ogni richiesta. Puoi attivare le GPU se hai bisogno di risposte più rapide.

Per visualizzare lo stato della pipeline, vai alla pagina Job di Dataflow.

Vai a Job

Fai una domanda al modello

Dopo l'avvio della pipeline, fornisci un prompt al modello e ricevi una risposta.

  1. Invia il prompt pubblicando un messaggio in Pub/Sub. Utilizza il comando gcloud pubsub topics publish:

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    Sostituisci PROMPT_TEXT con una stringa contenente il prompt che vuoi fornire. Racchiudi il prompt tra virgolette.

    Utilizza il tuo prompt o prova uno dei seguenti esempi:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. Per ottenere la risposta, utilizza il comando gcloud pubsub subscriptions pull.

    A seconda delle dimensioni del modello, la generazione di una risposta potrebbe richiedere alcuni minuti. I modelli più grandi richiedono più tempo per essere implementati e per generare una risposta.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    Sostituisci RESPONSES_SUBSCRIPTION_ID con l'ID abbonamento per le risposte del modello.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

Elimina il progetto

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Elimina singole risorse

  1. Esci dall'ambiente virtuale Python:

    deactivate
  2. Interrompi la pipeline:

    1. Elenca gli ID job dei job Dataflow in esecuzione, quindi prendi nota dell'ID job del job del tutorial:

      gcloud dataflow jobs list --region=REGION --status=active
    2. Annullare il job:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. Elimina il bucket e tutto ciò che contiene:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. Elimina gli argomenti e la sottoscrizione:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Revoca i ruoli che hai concesso all'account di servizio predefinito di Compute Engine. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. (Facoltativo) Revoca i ruoli dal tuo Account Google.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Passaggi successivi