Ejecuta un LLM en una canalización de transmisión


En este instructivo, se muestra cómo ejecutar un modelo de lenguaje grande (LLM) en una canalización de transmisión de Dataflow con la API de RunInference de Apache Beam.

Para obtener más información sobre la API de RunInference, consulta Acerca de Beam ML en la documentación de Apache Beam.

El código está disponible en GitHub.

Objetivos

  • Crear temas y suscripciones de Pub/Sub para las entradas y respuestas del modelo
  • Cargar el modelo en Cloud Storage con un trabajo personalizado de Vertex AI
  • Ejecutar la canalización
  • Hacerle una pregunta al modelo y obtener una respuesta

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.

Antes de comenzar

Ejecuta este instructivo en una máquina que tenga al menos 5 GB de espacio libre en el disco para instalar las dependencias.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  7. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  14. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Otorga roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el ID de tu proyecto.
    • PROJECT_NUMBER: Es el número de tu proyecto. Para encontrar el número de tu proyecto, usa el comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: Es cada rol individual.
  17. Copia el ID del proyecto de Google Cloud. Necesitarás este valor más adelante en este instructivo.

Crea los recursos de Google Cloud

En esta sección, se explica cómo crear lo siguiente:

  • Un bucket de Cloud Storage para usar como ubicación de almacenamiento temporal
  • Un tema de Pub/Sub para las instrucciones del modelo
  • Un tema y una suscripción de Pub/Sub para las respuestas del modelo

Crea un bucket de Cloud Storage

Crea un bucket de Cloud Storage con gcloud CLI. La canalización de Dataflow usa este bucket como ubicación de almacenamiento temporal.

Para crear el bucket, usa el comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Reemplaza lo siguiente:

  • BUCKET_NAME: Es un nombre para tu bucket de Cloud Storage que cumple con los requisitos de nombres de buckets. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global.
  • LOCATION: la ubicación del bucket.

Copia el nombre del bucket. Necesitarás este valor más adelante en este instructivo.

Crea temas y suscripciones de Pub/Sub

Crea dos temas de Pub/Sub y una suscripción. Un tema es para las instrucciones de entrada que envías al modelo. El otro tema y su suscripción adjunta son para las respuestas del modelo.

  1. Para crear los temas, ejecuta el comando gcloud pubsub topics create dos veces, una vez por cada tema:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    Reemplaza lo siguiente:

    • PROMPTS_TOPIC_ID: Es el ID del tema de los mensajes de entrada que se enviarán al modelo, como prompts.
    • RESPONSES_TOPIC_ID: Es el ID del tema para las respuestas del modelo, como responses.
  2. Para crear la suscripción y adjuntarla a tu tema de respuestas, usa el comando gcloud pubsub subscriptions create:

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    Reemplaza RESPONSES_SUBSCRIPTION_ID por el ID de suscripción de las respuestas del modelo, como responses-subscription.

Copia los IDs del tema y el ID de suscripción. Necesitarás estos valores más adelante en este instructivo.

Prepara el entorno

Descarga las muestras de código y, luego, configura tu entorno para ejecutar el instructivo.

Las muestras de código en el repositorio de GitHub python-docs-samples proporcionan el código que necesitas para ejecutar esta canalización. Cuando tengas todo listo para compilar tu propia canalización, puedes usar este código de muestra como plantilla.

Crea un entorno virtual de Python aislado para ejecutar tu proyecto de canalización con venv. Un entorno virtual te permite aislar las dependencias de un proyecto de las dependencias de otros proyectos. Para obtener más información sobre cómo instalar Python y crear un entorno virtual, consulta Configura un entorno de desarrollo de Python.

  1. Usa el comando git clone para clonar el repositorio de GitHub:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. Navega al directorio run-inference:

    cd python-docs-samples/dataflow/run-inference
    
  3. Si usas un símbolo del sistema, verifica que Python 3 y pip se estén ejecutando en el sistema:

    python --version
    python -m pip --version
    

    Si es necesario, instala Python 3.

    Si usas Cloud Shell, puedes omitir este paso porque Cloud Shell ya tiene Python instalado.

  4. Crea un entorno virtual de Python:

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. Instala las dependencias:

    pip install -r requirements.txt --no-cache-dir
    

Muestra de código de carga del modelo

El código de carga del modelo en este instructivo inicia un trabajo personalizado de Vertex AI que carga el objeto state_dict del modelo en Cloud Storage.

El archivo inicial se ve de la siguiente manera:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

Muestra de código de canalización

El código de canalización de este instructivo implementa una canalización de Dataflow que hace lo siguiente:

  • Lee una instrucción de Pub/Sub y codifica el texto en tensores de tokens.
  • Ejecuta la transformación RunInference.
  • Decodifica los tensores de tokens de salida en texto y escribe la respuesta en Pub/Sub.

El archivo inicial se ve de la siguiente manera:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

Carga el modelo

Los LLM pueden ser modelos muy grandes. Por lo general, los modelos más grandes que se entrenan con más parámetros proporcionan mejores resultados. Sin embargo, los modelos más grandes requieren una máquina más grande y más memoria para ejecutarse. Los modelos más grandes también pueden ser más lentos para ejecutarse en CPUs.

Antes de ejecutar un modelo de PyTorch en Dataflow, debes cargar el objeto state_dict del modelo. El objeto state_dict de un modelo almacena los pesos del modelo.

En una canalización de Dataflow que usa la transformación RunInference de Apache Beam, el objeto state_dict del modelo debe cargarse en Cloud Storage. La máquina que usas para cargar el objeto state_dict a Cloud Storage debe tener suficiente memoria para cargar el modelo. La máquina también necesita una conexión rápida a Internet para descargar los pesos y subirlos a Cloud Storage.

En la siguiente tabla, se muestra la cantidad de parámetros de cada modelo y la memoria mínima que se necesita para cargar cada modelo.

Modelo Parámetros Memoria necesaria
google/flan-t5-small 80 millones > 320 MB
google/flan-t5-base 250 millones > 1 GB
google/flan-t5-large 780 millones > 3.2 GB
google/flan-t5-xl 3,000 millones > 12 GB
google/flan-t5-xxl 11,000 millones > 44 GB
google/flan-ul2 20,000 millones > 80 GB

Aunque puedes cargar un modelo más pequeño de forma local, en este instructivo, se muestra cómo iniciar un trabajo personalizado de Vertex AI que carga el modelo con una VM del tamaño adecuado.

Debido a que los LLM pueden ser tan grandes, en el ejemplo de este instructivo, se guarda el objeto state_dict como formato float16 en lugar del formato float32 predeterminado. Con esta configuración, cada parámetro usa 16 bits en lugar de 32 bits, lo que hace que el objeto state_dict tenga la mitad del tamaño. Un tamaño más pequeño minimiza el tiempo necesario para cargar el modelo. Sin embargo, convertir el formato significa que la VM debe ajustar el modelo y el objeto state_dict en la memoria.

En la siguiente tabla, se muestran los requisitos mínimos para cargar un modelo después de que el objeto state_dict se guarda como formato float16. En la tabla, también se muestran los tipos de máquinas sugeridos para cargar un modelo con Vertex AI. El tamaño mínimo (y predeterminado) de disco para Vertex AI es de 100 GB, pero algunos modelos pueden requerir un disco más grande.

Nombre del modelo Memoria necesaria Tipo de máquina Memoria de la VM Disco de VM
google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
google/flan-t5-base > 1.5 GB e2-standard-4 16 GB 100 GB
google/flan-t5-large > 4.8 GB e2-standard-4 16 GB 100 GB
google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

Carga el objeto state_dict del modelo en Cloud Storage a través de un trabajo personalizado de Vertex AI:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

Reemplaza lo siguiente:

  • MODEL_NAME: Es el nombre del modelo, como google/flan-t5-xl.
  • VERTEX_AI_MACHINE_TYPE: Es el tipo de máquina en la que se ejecutará el trabajo personalizado de Vertex AI, como e2-highmem-4.
  • DISK_SIZE_GB: El tamaño del disco de la VM, en GB. El tamaño mínimo es de 100 GB.

Según el tamaño del modelo, es posible que tarde unos minutos en cargarse. Para ver el estado, ve a la página Trabajos personalizados de Vertex AI.

Ve a Trabajos personalizados

Ejecuta la canalización

Después de cargar el modelo, debes ejecutar la canalización de Dataflow. Para hacerlo, tanto el modelo como la memoria que usa cada trabajador deben caber en la memoria.

En la siguiente tabla, se muestran los tipos de máquinas recomendados para ejecutar una canalización de inferencia.

Nombre del modelo Tipo de máquina Memoria de la VM
google/flan-t5-small n2-highmem-2 16 GB
google/flan-t5-base n2-highmem-2 16 GB
google/flan-t5-large n2-highmem-4 32 GB
google/flan-t5-xl n2-highmem-4 32 GB
google/flan-t5-xxl n2-highmem-8 64 GB
google/flan-ul2 n2-highmem-16 128 GB

Ejecuta la canalización:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

Reemplaza lo siguiente:

  • PROJECT_ID: el ID del proyecto
  • PROMPTS_TOPIC_ID: Es el ID del tema de las instrucciones de entrada que se enviarán al modelo.
  • RESPONSES_TOPIC_ID: Es el ID del tema para las respuestas del modelo.
  • MODEL_NAME: Es el nombre del modelo, como google/flan-t5-xl.
  • BUCKET_NAME: Es el nombre del bucket.
  • REGION: la región en la que se implementará el trabajo, como us-central1.
  • DATAFLOW_MACHINE_TYPE: Es la VM en la que se ejecutará la canalización, como n2-highmem-4.

Para asegurarte de que el modelo se cargue solo una vez por trabajador y no se quede sin memoria, configura los trabajadores para que usen un solo proceso a través de la configuración de la opción de canalización --experiments=no_use_multiple_sdk_containers. No es necesario que limites la cantidad de subprocesos porque la transformación RunInference comparte el mismo modelo con varios subprocesos.

La canalización de este ejemplo se ejecuta con CPUs. Para un modelo más grande, se requiere más tiempo para procesar cada solicitud. Puedes habilitar las GPU si necesitas respuestas más rápidas.

Para ver el estado de la canalización, ve a la página Trabajos de Dataflow.

Ir a Trabajos

Hazle una pregunta al modelo

Después de que la canalización comience a ejecutarse, le proporcionas una instrucción al modelo y recibes una respuesta.

  1. Para enviar tu instrucción, publica un mensaje en Pub/Sub. Usa el comando gcloud pubsub topics publish:

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    Reemplaza PROMPT_TEXT por una cadena que contenga el mensaje que deseas proporcionar. Encierra el mensaje entre comillas.

    Usa tu propio mensaje o prueba uno de los siguientes ejemplos:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. Para obtener la respuesta, usa el comando gcloud pubsub subscriptions pull.

    Según el tamaño del modelo, el modelo puede tardar unos minutos en generar una respuesta. Los modelos más grandes tardan más en implementarse y generar una respuesta.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    Reemplaza RESPONSES_SUBSCRIPTION_ID por el ID de suscripción de las respuestas del modelo.

Realiza una limpieza

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el proyecto

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Borra los recursos individuales

  1. Sal del entorno virtual de Python:

    deactivate
  2. Detén la canalización:

    1. Enumera los ID de los trabajos de Dataflow que se están ejecutando y, luego, toma nota del ID del trabajo del instructivo:

      gcloud dataflow jobs list --region=REGION --status=active
    2. Cancela el trabajo:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. Borra el bucket y su contenido:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. Borra los temas y la suscripción:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Revoca los roles que otorgaste a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. Opcional: Revoca los roles de tu Cuenta de Google.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Próximos pasos