Esegui un modello LLM in una pipeline di streaming


Questo tutorial mostra come eseguire un modello linguistico di grandi dimensioni (LLM) in un flusso di dati Pipeline Dataflow utilizzando l'API Apache Beam RunInference.

Per ulteriori informazioni sull'API RunInference, vedi Informazioni su Beam ML nella documentazione di Apache Beam.

Il codice di esempio è disponibile su GitHub.

Obiettivi

  • Creare argomenti e sottoscrizioni Pub/Sub per l'input e il modello diverse.
  • carica il modello in Cloud Storage utilizzando un'istanza di Vertex AI un lavoro.
  • Esegui la pipeline.
  • Poni una domanda al modello e ricevi una risposta.

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Una volta completate le attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la pagina Pulizia.

Prima di iniziare

Esegui questo tutorial su una macchina con almeno 5 GB di spazio libero su disco per installare le dipendenze.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  7. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  14. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Concedi ruoli al tuo account di servizio predefinito Compute Engine. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • PROJECT_NUMBER: il numero del progetto. Per trovare il numero del tuo progetto, utilizza il Comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: ogni ruolo individuale.
  17. Copia l'ID progetto Google Cloud. Questo valore ti servirà più avanti in questo tutorial.

crea le risorse Google Cloud

Questa sezione spiega come creare le risorse seguenti:

  • Un bucket Cloud Storage da utilizzare come posizione di archiviazione temporanea
  • Un argomento Pub/Sub per i prompt del modello
  • Un argomento e una sottoscrizione Pub/Sub per le risposte del modello

Crea un bucket Cloud Storage

Crea un bucket Cloud Storage utilizzando gcloud CLI. Questo bucket viene utilizzato come posizione di archiviazione temporanea Dataflow.

Per creare il bucket, utilizza Comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Sostituisci quanto segue:

  • BUCKET_NAME: un nome per il tuo Cloud Storage che soddisfa i requisiti requisiti di denominazione dei bucket. I nomi dei bucket Cloud Storage devono essere univoci a livello globale.
  • LOCATION: il valore location del bucket.

Copia il nome del bucket. Questo valore ti servirà più avanti in questo tutorial.

Creare argomenti e sottoscrizioni Pub/Sub

Crea due argomenti Pub/Sub e una sottoscrizione. Un argomento è per i prompt di input che invii al modello. L'altro argomento e i relativi allegati abbonamento riguarda le risposte del modello.

  1. Per creare gli argomenti, esegui Comando gcloud pubsub topics create due volte, una per ogni argomento:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    Sostituisci quanto segue:

    • PROMPTS_TOPIC_ID: ID argomento per l'input dei prompt da inviare al modello, ad esempio prompts
    • RESPONSES_TOPIC_ID: l'ID argomento per il valore del modello di risposte, come responses
  2. Per creare la sottoscrizione e collegarla all'argomento delle risposte, utilizza il comando gcloud pubsub subscriptions create:

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    Sostituisci RESPONSES_SUBSCRIPTION_ID con ID sottoscrizione per le risposte del modello, ad esempio responses-subscription.

Copia gli ID argomento e l'ID abbonamento. Questi valori ti serviranno più avanti durante il tutorial.

prepara l'ambiente

Scarica gli esempi di codice, quindi configura l'ambiente per eseguire il tutorial.

Gli esempi di codice nella Repository GitHub di python-docs-samples fornisci il codice necessario per eseguire questa pipeline. Quando è tutto pronto per creare la tua pipeline, puoi utilizzare questo codice di esempio come modello.

Creerai un ambiente virtuale Python isolato per eseguire il progetto della pipeline utilizzando venv. Un ambiente virtuale consente di isolare le dipendenze di un progetto e dipendenze di altri progetti. Per ulteriori informazioni su come installare Python e creare un ambiente virtuale, Impostazione di un ambiente di sviluppo Python.

  1. Usa il comando git clone per clonare il repository GitHub:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. Vai alla directory run-inference:

    cd python-docs-samples/dataflow/run-inference
    
  3. Se utilizzi un prompt dei comandi, verifica di avere Python 3 e pip in esecuzione nel sistema:

    python --version
    python -m pip --version
    

    Se necessario, installa Python 3.

    Se utilizzi Cloud Shell, puoi saltare questo passaggio perché In Cloud Shell è già installato Python.

  4. Crea un Ambiente virtuale Python:

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. Installa le dipendenze:

    pip install -r requirements.txt --no-cache-dir
    

Esempio di codice di caricamento del modello

Il codice di caricamento del modello in questo tutorial avvia Vertex AI un job personalizzato che carica l'oggetto state_dict del modello in di archiviazione ideale in Cloud Storage.

Il file di base è simile al seguente:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

Esempio di codice della pipeline

Il codice della pipeline in questo tutorial esegue il deployment di una pipeline Dataflow che esegue le seguenti operazioni:

  • Legge un prompt da Pub/Sub e codifica il testo in tensori di token.
  • Esegue la trasformazione RunInference.
  • Decodifica i tensori del token di output in testo e scrive la risposta al in Pub/Sub.

Il file di base è simile al seguente:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

Carica il modello

Gli LLM possono essere modelli molto grandi. I modelli più grandi addestrati con più parametri generalmente forniscono risultati migliori. Tuttavia, i modelli più grandi richiedono e più memoria da eseguire. Anche i modelli più grandi possono essere più lenti per l'esecuzione sulle CPU.

Prima di eseguire un modello PyTorch su Dataflow, devi caricare l'oggetto state_dict del modello. Il modello Oggetto state_dict memorizza i pesi per il modello.

In una pipeline Dataflow che utilizza Apache Beam RunInference, l'oggetto state_dict del modello deve essere caricato di archiviazione ideale in Cloud Storage. La macchina che utilizzi per caricare l'oggetto state_dict a Cloud Storage deve avere memoria sufficiente per caricare il modello. La macchina necessita di una connessione Internet veloce per scaricare i pesi e e caricarli su Cloud Storage.

La tabella seguente mostra il numero di parametri per ogni modello e necessaria per caricare ciascun modello.

Modello Parametri Memoria necessaria
google/flan-t5-small 80 milioni > 320 MB
google/flan-t5-base 250 milioni > 1 GB
google/flan-t5-large 780 milioni > 3,2 GB
google/flan-t5-xl 3 miliardi > 12 GB
google/flan-t5-xxl 11 miliardi > 44 GB
google/flan-ul2 20 miliardi > 80 GB

Sebbene sia possibile caricare in locale un modello più piccolo, questo tutorial mostra come avviare un job personalizzato Vertex AI che carica il modello con di una VM di dimensioni standard.

Poiché gli LLM possono essere così grandi, l'esempio di questo tutorial salva Oggetto state_dict nel formato float16 anziché nel formato float32 predefinito. Con questa configurazione, ciascun parametro utilizza 16 bit invece di 32 bit, per l'oggetto state_dict ha le dimensioni di metà. Una dimensione più piccola riduce al minimo il tempo necessario per caricare il modello. Tuttavia, la conversione del formato significa che la VM per inserire in memoria sia il modello che l'oggetto state_dict.

La tabella seguente mostra i requisiti minimi per caricare un modello dopo il L'oggetto state_dict è salvato nel formato float16. La tabella mostra anche consigliati per caricare un modello usando Vertex AI. La la dimensione minima (e predefinita) del disco per Vertex AI è 100 GB, ma alcune potrebbero richiedere un disco più grande.

Nome modello Memoria necessaria Tipo di macchina Memoria VM Disco VM
google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
google/flan-t5-base > 1,5 GB e2-standard-4 16 GB 100 GB
google/flan-t5-large > 4,8 GB e2-standard-4 16 GB 100 GB
google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

Carica l'oggetto state_dict del modello in Cloud Storage utilizzando un job personalizzato Vertex AI:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

Sostituisci quanto segue:

  • MODEL_NAME: il nome del modello, ad esempio google/flan-t5-xl.
  • VERTEX_AI_MACHINE_TYPE: tipo di macchina da eseguire il job personalizzato Vertex AI, ad esempio e2-highmem-4.
  • DISK_SIZE_GB: dimensione del disco per la VM, in GB. La dimensione minima è di 100 GB.

A seconda delle dimensioni del modello, il caricamento potrebbe richiedere alcuni minuti. Per visualizzare lo stato, vai a Job personalizzati di Vertex AI. .

Vai a Job personalizzati

esegui la pipeline.

Dopo aver caricato il modello, esegui la pipeline Dataflow. Per eseguire la pipeline, sia il modello sia la memoria utilizzata da ogni worker devono rientrare nella memoria.

La tabella seguente mostra i tipi di macchina consigliati per eseguire un'inferenza una pipeline o un blocco note personalizzato.

Nome modello Tipo di macchina Memoria VM
google/flan-t5-small n2-highmem-2 16 GB
google/flan-t5-base n2-highmem-2 16 GB
google/flan-t5-large n2-highmem-4 32 GB
google/flan-t5-xl n2-highmem-4 32 GB
google/flan-t5-xxl n2-highmem-8 64 GB
google/flan-ul2 n2-highmem-16 128 GB

Esegui la pipeline:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto
  • PROMPTS_TOPIC_ID: ID argomento per l'input da inviare al modello.
  • RESPONSES_TOPIC_ID: l'ID argomento per il valore del modello risposte
  • MODEL_NAME: il nome del modello, ad esempio google/flan-t5-xl
  • BUCKET_NAME: il nome del bucket
  • REGION: la regione in cui eseguire il deployment offerta di lavoro, ad esempio us-central1
  • DATAFLOW_MACHINE_TYPE: la VM per eseguire la pipeline on, come n2-highmem-4

Per garantire che il modello venga caricato una sola volta per worker e non esaurisca le risorse della memoria, configuri i worker in modo che utilizzino un singolo processo impostando la pipeline opzione --experiments=no_use_multiple_sdk_containers. Non devi limitare il numero di thread perché la trasformazione RunInference condivide lo stesso modello con più thread.

La pipeline in questo esempio viene eseguita con le CPU. Per un modello più grande, più tempo è necessari per elaborare ogni richiesta. Puoi abilita le GPU se hai bisogno di risposte più rapide.

Per visualizzare lo stato della pipeline, vai ai Job di Dataflow. .

Vai a Job

Fai una domanda al modello

Dopo l'avvio della pipeline, fornisci un prompt al modello e ricevi una risposta.

  1. Invia il tuo prompt pubblicando un messaggio in Pub/Sub. Utilizza il comando gcloud pubsub topics publish:

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    Sostituisci PROMPT_TEXT con una stringa contenente il prompt che vuoi fornire. Racchiudi il prompt tra virgolette.

    Utilizza il tuo prompt o prova uno dei seguenti esempi:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. Per ottenere la risposta, utilizza il comando Comando gcloud pubsub subscriptions pull.

    A seconda delle dimensioni del modello, potrebbero essere necessari alcuni minuti modello per generare una risposta. I modelli più grandi richiedono più tempo per il deployment generano una risposta.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    Sostituisci RESPONSES_SUBSCRIPTION_ID con l'ID sottoscrizione per le risposte del modello.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

Elimina il progetto

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Elimina singole risorse

  1. Esci dall'ambiente virtuale Python:

    deactivate
  2. Arresta la pipeline:

    1. Elenca gli ID job per i job Dataflow in esecuzione e annotare l'ID per il job del tutorial:

      gcloud dataflow jobs list --region=REGION --status=active
    2. Annulla il job:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. Elimina il bucket e tutti gli elementi al suo interno:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. Elimina gli argomenti e la sottoscrizione:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Revoca i ruoli che hai concesso all'account di servizio predefinito di Compute Engine. Esegui il comando seguente una volta per ognuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. (Facoltativo) Revoca i ruoli dal tuo Account Google.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Passaggi successivi