Ejecutar un LLM en un flujo de procesamiento de streaming


En este tutorial se muestra cómo ejecutar un modelo de lenguaje extenso (LLM) en una canalización de Dataflow de streaming mediante la API RunInference de Apache Beam.

Para obtener más información sobre la API RunInference, consulta la sección Acerca de Beam ML de la documentación de Apache Beam.

El código de ejemplo está disponible en GitHub.

Objetivos

  • Crea temas y suscripciones de Pub/Sub para las entradas y las respuestas del modelo.
  • Carga el modelo en Cloud Storage mediante un trabajo personalizado de Vertex AI.
  • Ejecuta el flujo de procesamiento.
  • Hazle una pregunta al modelo y obtén una respuesta.

Costes

En este documento, se usan los siguientes componentes facturables de Google Cloud Platform:

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos Google Cloud pueden disfrutar de una prueba gratuita.

Cuando termines las tareas que se describen en este documento, puedes evitar que se te siga facturando eliminando los recursos que has creado. Para obtener más información, consulta la sección Limpiar.

Antes de empezar

Sigue este tutorial en un equipo que tenga al menos 5 GB de espacio libre en el disco para instalar las dependencias.

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  4. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  12. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Concede roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Haz los cambios siguientes:

    • PROJECT_ID: tu ID de proyecto.
    • PROJECT_NUMBER: tu número de proyecto. Para encontrar el número de tu proyecto, usa el comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: cada rol individual.
  19. Copia el Google Cloud ID del proyecto. Necesitarás este valor más adelante en este tutorial.
  20. Crear los recursos de Google Cloud Platform

    En esta sección se explica cómo crear los siguientes recursos:

    • Un segmento de Cloud Storage que se usará como ubicación de almacenamiento temporal
    • Un tema de Pub/Sub para las peticiones del modelo
    • Un tema y una suscripción de Pub/Sub para las respuestas del modelo

    Crea un segmento de Cloud Storage

    Crea un segmento de Cloud Storage con gcloud CLI. Este segmento se usa como ubicación de almacenamiento temporal en la canalización de Dataflow.

    Para crear el segmento, usa el comando gcloud storage buckets create:

    gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
    

    Haz los cambios siguientes:

    Copia el nombre del segmento. Necesitarás este valor más adelante en este tutorial.

    Crear temas y suscripciones de Pub/Sub

    Crea dos temas de Pub/Sub y una suscripción. Un tema es para las peticiones que envías al modelo. El otro tema y la suscripción adjunta son para las respuestas del modelo.

    1. Para crear los temas, ejecuta el comando gcloud pubsub topics create dos veces, una por cada tema:

      gcloud pubsub topics create PROMPTS_TOPIC_ID
      gcloud pubsub topics create RESPONSES_TOPIC_ID
      

      Haz los cambios siguientes:

      • PROMPTS_TOPIC_ID: el ID del tema de las peticiones de entrada que se enviarán al modelo, como prompts
      • RESPONSES_TOPIC_ID: el ID del tema de las respuestas del modelo, como responses
    2. Para crear la suscripción y adjuntarla al tema de respuestas, usa el gcloud pubsub subscriptions create comando:

      gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
      

      Sustituye RESPONSES_SUBSCRIPTION_ID por el ID de suscripción de las respuestas del modelo, como responses-subscription.

    Copia los IDs de tema y el ID de suscripción. Necesitarás estos valores más adelante en este tutorial.

    Prepara tu entorno

    Descarga los ejemplos de código y configura tu entorno para seguir el tutorial.

    Los ejemplos de código del repositorio de GitHub python-docs-samples proporcionan el código que necesitas para ejecutar esta canalización. Cuando quieras crear tu propio flujo de procesamiento, puedes usar este código de ejemplo como plantilla.

    Para ejecutar tu proyecto de flujo de procesamiento, crea un entorno virtual de Python aislado con venv. Un entorno virtual te permite aislar las dependencias de un proyecto de las de otros proyectos. Para obtener más información sobre cómo instalar Python y crear un entorno virtual, consulta Configurar un entorno de desarrollo de Python.

    1. Usa el comando git clone para clonar el repositorio de GitHub:

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      
    2. Ve al directorio run-inference:

      cd python-docs-samples/dataflow/run-inference
      
    3. Si usas un símbolo del sistema, comprueba que tengas Python 3 y pip en tu sistema:

      python --version
      python -m pip --version
      

      Si es necesario, instala Python 3.

      Si usas Cloud Shell, puedes saltarte este paso porque ya tiene Python instalado.

    4. Crea un entorno virtual de Python:

      python -m venv /tmp/env
      source /tmp/env/bin/activate
      
    5. Instala las dependencias:

      pip install -r requirements.txt --no-cache-dir
      

    Código de ejemplo de carga de modelos

    El código de carga del modelo de este tutorial inicia un trabajo personalizado de Vertex AI que carga el objeto state_dict del modelo en Cloud Storage.

    El archivo de inicio tiene el siguiente aspecto:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Loads the state_dict for an LLM model into Cloud Storage."""
    
    from __future__ import annotations
    
    import os
    
    import torch
    from transformers import AutoModelForSeq2SeqLM
    
    
    def run_local(model_name: str, state_dict_path: str) -> None:
        """Loads the state dict and saves it into the desired path.
    
        If the `state_dict_path` is a Cloud Storage location starting
        with "gs://", this assumes Cloud Storage is mounted with
        Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        """
        print(f"Loading model: {model_name}")
        model = AutoModelForSeq2SeqLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16
        )
        print(f"Model loaded, saving state dict to: {state_dict_path}")
    
        # Assume Cloud Storage FUSE is mounted in `/gcs`.
        state_dict_path = state_dict_path.replace("gs://", "/gcs/")
        directory = os.path.dirname(state_dict_path)
        if directory and not os.path.exists(directory):
            os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
        torch.save(model.state_dict(), state_dict_path)
        print("State dict saved successfully!")
    
    
    def run_vertex_job(
        model_name: str,
        state_dict_path: str,
        job_name: str,
        project: str,
        bucket: str,
        location: str = "us-central1",
        machine_type: str = "e2-highmem-2",
        disk_size_gb: int = 100,
    ) -> None:
        """Launches a Vertex AI custom job to load the state dict.
    
        If the model is too large to fit into memory or disk, we can launch
        a Vertex AI custom job with a large enough VM for this to work.
    
        Depending on the model's size, it might require a different VM
        configuration. The model MUST fit into the VM's memory, and there
        must be enough disk space to stage the entire model while it gets
        copied to Cloud Storage.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            job_name: Job display name in the Vertex AI console.
            project: Google Cloud Project ID.
            bucket: Cloud Storage bucket name, without the "gs://" prefix.
            location: Google Cloud regional location.
            machine_type: Machine type for the VM to run the job.
            disk_size_gb: Disk size in GB for the VM to run the job.
        """
        from google.cloud import aiplatform
    
        aiplatform.init(project=project, staging_bucket=bucket, location=location)
    
        job = aiplatform.CustomJob.from_local_script(
            display_name=job_name,
            container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
            script_path="download_model.py",
            args=[
                "local",
                f"--model-name={model_name}",
                f"--state-dict-path={state_dict_path}",
            ],
            machine_type=machine_type,
            boot_disk_size_gb=disk_size_gb,
            requirements=["transformers"],
        )
        job.run()
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers(required=True)
    
        parser_local = subparsers.add_parser("local")
        parser_local.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_local.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_local.set_defaults(run=run_local)
    
        parser_vertex = subparsers.add_parser("vertex")
        parser_vertex.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_vertex.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_vertex.add_argument(
            "--job-name", required=True, help="Job display name in the Vertex AI console"
        )
        parser_vertex.add_argument(
            "--project", required=True, help="Google Cloud Project ID"
        )
        parser_vertex.add_argument(
            "--bucket",
            required=True,
            help='Cloud Storage bucket name, without the "gs://" prefix',
        )
        parser_vertex.add_argument(
            "--location", default="us-central1", help="Google Cloud regional location"
        )
        parser_vertex.add_argument(
            "--machine-type",
            default="e2-highmem-2",
            help="Machine type for the VM to run the job",
        )
        parser_vertex.add_argument(
            "--disk-size-gb",
            type=int,
            default=100,
            help="Disk size in GB for the VM to run the job",
        )
        parser_vertex.set_defaults(run=run_vertex_job)
    
        args = parser.parse_args()
        kwargs = args.__dict__.copy()
        kwargs.pop("run")
    
        args.run(**kwargs)
    

    Código de ejemplo de una canalización

    El código del flujo de procesamiento de este tutorial implementa un flujo de procesamiento de Dataflow que hace lo siguiente:

    • Lee una petición de Pub/Sub y codifica el texto en tensores de tokens.
    • Ejecuta la transformación RunInference.
    • Decodifica los tensores de tokens de salida en texto y escribe la respuesta en Pub/Sub.

    El archivo de inicio tiene el siguiente aspecto:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Runs a streaming RunInference Language Model pipeline."""
    
    from __future__ import annotations
    
    import logging
    
    import apache_beam as beam
    from apache_beam.ml.inference.base import PredictionResult
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
    from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
    from apache_beam.options.pipeline_options import PipelineOptions
    import torch
    from transformers import AutoConfig
    from transformers import AutoModelForSeq2SeqLM
    from transformers import AutoTokenizer
    from transformers.tokenization_utils import PreTrainedTokenizer
    
    MAX_RESPONSE_TOKENS = 256
    
    
    def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
        """Encodes input text into token tensors.
    
        Args:
            input_text: Input text for the language model.
            tokenizer: Tokenizer for the language model.
    
        Returns: Tokenized input tokens.
        """
        return tokenizer(input_text, return_tensors="pt").input_ids[0]
    
    
    def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
        """Decodes output token tensors into text.
    
        Args:
            result: Prediction results from the RunInference transform.
            tokenizer: Tokenizer for the language model.
    
        Returns: The model's response as text.
        """
        output_tokens = result.inference
        return tokenizer.decode(output_tokens, skip_special_tokens=True)
    
    
    class AskModel(beam.PTransform):
        """Asks an language model a prompt message and gets its responses.
    
        Attributes:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            max_response_tokens: Maximum number of tokens for the model to generate.
        """
    
        def __init__(
            self,
            model_name: str,
            state_dict_path: str,
            max_response_tokens: int = MAX_RESPONSE_TOKENS,
        ) -> None:
            self.model_handler = PytorchModelHandlerTensor(
                state_dict_path=state_dict_path,
                model_class=AutoModelForSeq2SeqLM.from_config,
                model_params={"config": AutoConfig.from_pretrained(model_name)},
                inference_fn=make_tensor_model_fn("generate"),
            )
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.max_response_tokens = max_response_tokens
    
        def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
            return (
                pcollection
                | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
                | "RunInference"
                >> RunInference(
                    self.model_handler,
                    inference_args={"max_new_tokens": self.max_response_tokens},
                )
                | "Get response" >> beam.Map(decode_response, self.tokenizer)
            )
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--messages-topic",
            required=True,
            help="Pub/Sub topic for input text messages",
        )
        parser.add_argument(
            "--responses-topic",
            required=True,
            help="Pub/Sub topic for output text responses",
        )
        parser.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        args, beam_args = parser.parse_known_args()
    
        logging.getLogger().setLevel(logging.INFO)
        beam_options = PipelineOptions(
            beam_args,
            pickle_library="cloudpickle",
            streaming=True,
        )
    
        simple_name = args.model_name.split("/")[-1]
        pipeline = beam.Pipeline(options=beam_options)
        _ = (
            pipeline
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
            | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
            | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
            | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
            | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
        )
        pipeline.run()
    

    Cargar el modelo

    Los LLMs pueden ser modelos muy grandes. Los modelos más grandes que se entrenan con más parámetros suelen dar mejores resultados. Sin embargo, los modelos más grandes requieren una máquina más grande y más memoria para ejecutarse. Los modelos más grandes también pueden ser más lentos en las CPUs.

    Antes de ejecutar un modelo de PyTorch en Dataflow, debes cargar el objeto state_dict del modelo. El objeto state_dict de un modelo almacena las ponderaciones del modelo.

    En un flujo de procesamiento de Dataflow que usa la transformación RunInference de Apache Beam, el objeto state_dict del modelo debe cargarse en Cloud Storage. La máquina que uses para cargar el objeto state_dict en Cloud Storage debe tener suficiente memoria para cargar el modelo. La máquina también necesita una conexión a Internet rápida para descargar los pesos y subirlos a Cloud Storage.

    En la siguiente tabla se muestra el número de parámetros de cada modelo y la memoria mínima necesaria para cargar cada modelo.

    Modelo Parámetros Memoria necesaria
    google/flan-t5-small 80 millones > 320 MB
    google/flan-t5-base 250 millones > 1 GB
    google/flan-t5-large 780 millones > 3,2 GB
    google/flan-t5-xl 3000 millones > 12 GB
    google/flan-t5-xxl 11.000 millones > 44 GB
    google/flan-ul2 20.000 millones > 80 GB

    Aunque puedes cargar un modelo más pequeño de forma local, en este tutorial se muestra cómo iniciar un trabajo personalizado de Vertex AI que cargue el modelo con una VM del tamaño adecuado.

    Como los LLMs pueden ser muy grandes, en el ejemplo de este tutorial se guarda el objeto state_dict en formato float16 en lugar del formato float32 predeterminado. Con esta configuración, cada parámetro usa 16 bits en lugar de 32, lo que hace que el objeto state_dict tenga la mitad de tamaño. Un tamaño más pequeño minimiza el tiempo necesario para cargar el modelo. Sin embargo, al convertir el formato, la máquina virtual tiene que ajustar tanto el modelo como el objeto state_dict en la memoria.

    En la siguiente tabla se muestran los requisitos mínimos para cargar un modelo después de guardar el objeto state_dict en formato float16. En la tabla también se muestran los tipos de máquinas sugeridos para cargar un modelo con Vertex AI. El tamaño de disco mínimo (y predeterminado) de Vertex AI es de 100 GB, pero algunos modelos pueden requerir un disco más grande.

    Nombre del modelo Memoria necesaria Tipo de máquina Memoria de la máquina virtual Disco de VM
    google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
    google/flan-t5-base > 1,5 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-large > 4,8 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
    google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
    google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

    Carga el objeto state_dict del modelo en Cloud Storage mediante un trabajo personalizado de Vertex AI:

    python download_model.py vertex \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --job-name="Load MODEL_NAME" \
        --project="PROJECT_ID" \
        --bucket="BUCKET_NAME" \
        --location="LOCATION" \
        --machine-type="VERTEX_AI_MACHINE_TYPE" \
        --disk-size-gb="DISK_SIZE_GB"
    

    Haz los cambios siguientes:

    • MODEL_NAME: el nombre del modelo, como google/flan-t5-xl.
    • VERTEX_AI_MACHINE_TYPE: el tipo de máquina en el que se ejecutará el trabajo personalizado de Vertex AI, como e2-highmem-4.
    • DISK_SIZE_GB: tamaño del disco de la VM en GB. El tamaño mínimo es de 100 GB.

    En función del tamaño del modelo, puede tardar unos minutos en cargarse. Para ver el estado, ve a la página Trabajos personalizados de Vertex AI.

    Ir a Tareas personalizadas

    Ejecutar el flujo de procesamiento

    Una vez que hayas cargado el modelo, ejecuta el flujo de procesamiento de Dataflow. Para ejecutar la canalización, tanto el modelo como la memoria utilizada por cada trabajador deben caber en la memoria.

    En la siguiente tabla se muestran los tipos de máquinas recomendados para ejecutar una canalización de inferencia.

    Nombre del modelo Tipo de máquina Memoria de la máquina virtual
    google/flan-t5-small n2-highmem-2 16 GB
    google/flan-t5-base n2-highmem-2 16 GB
    google/flan-t5-large n2-highmem-4 32 GB
    google/flan-t5-xl n2-highmem-4 32 GB
    google/flan-t5-xxl n2-highmem-8 64 GB
    google/flan-ul2 n2-highmem-16 128 GB

    Ejecuta el flujo de procesamiento:

    python main.py \
        --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
        --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --runner="DataflowRunner" \
        --project="PROJECT_ID" \
        --temp_location="gs://BUCKET_NAME/temp" \
        --region="REGION" \
        --machine_type="DATAFLOW_MACHINE_TYPE" \
        --requirements_file="requirements.txt" \
        --requirements_cache="skip" \
        --experiments="use_sibling_sdk_workers" \
        --experiments="no_use_multiple_sdk_containers"
    

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto
    • PROMPTS_TOPIC_ID: el ID del tema de las peticiones de entrada que se van a enviar al modelo.
    • RESPONSES_TOPIC_ID: el ID del tema de las respuestas del modelo
    • MODEL_NAME: el nombre del modelo, como google/flan-t5-xl
    • BUCKET_NAME: el nombre del segmento
    • REGION: la región en la que se va a implementar el trabajo, como us-central1
    • DATAFLOW_MACHINE_TYPE: la máquina virtual en la que se ejecutará la canalización, como n2-highmem-4

    Para asegurarte de que el modelo se carga solo una vez por trabajador y no se queda sin memoria, configura los trabajadores para que usen un solo proceso definiendo la opción de la canalización --experiments=no_use_multiple_sdk_containers. No tienes que limitar el número de hilos porque la transformación RunInference comparte el mismo modelo con varios hilos.

    El flujo de procesamiento de este ejemplo se ejecuta con CPUs. En el caso de los modelos más grandes, se necesita más tiempo para procesar cada solicitud. Puedes habilitar las GPUs si necesitas respuestas más rápidas.

    Para ver el estado de la canalización, vaya a la página Trabajos de Dataflow.

    Ir a Tareas

    Hacer una pregunta al modelo

    Una vez que se haya iniciado el proceso, proporciona una petición al modelo y recibe una respuesta.

    1. Envía tu petición publicando un mensaje en Pub/Sub. Usa el comando gcloud pubsub topics publish:

      gcloud pubsub topics publish PROMPTS_TOPIC_ID \
          --message="PROMPT_TEXT"
      

      Sustituye PROMPT_TEXT por una cadena que contenga la petición que quieras proporcionar. Incluye la petición entre comillas.

      Usa tu propia petición o prueba con uno de los siguientes ejemplos:

      • Translate to Spanish: My name is Luka
      • Complete this sentence: Once upon a time, there was a
      • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
    2. Para obtener la respuesta, usa el comando gcloud pubsub subscriptions pull.

      En función del tamaño del modelo, puede que tarde unos minutos en generar una respuesta. Los modelos más grandes tardan más en implementarse y en generar una respuesta.

      gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
      

      Sustituye RESPONSES_SUBSCRIPTION_ID por el ID de suscripción de las respuestas del modelo.

    Limpieza

    Para evitar que los recursos utilizados en este tutorial se cobren en tu cuenta de Google Cloud, elimina el proyecto que contiene los recursos o conserva el proyecto y elimina los recursos.

    Eliminar el proyecto

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    Eliminar recursos concretos

    1. Sal del entorno virtual de Python:

      deactivate
    2. Detener el flujo de procesamiento:

      1. Consulta los IDs de las tareas de Dataflow que se están ejecutando y anota el ID de la tarea del tutorial:

        gcloud dataflow jobs list --region=REGION --status=active
      2. Cancelar la tarea:

        gcloud dataflow jobs cancel JOB_ID --region=REGION
    3. Elimina el segmento y todo lo que contenga:

      gcloud storage rm gs://BUCKET_NAME --recursive
    4. Elimina los temas y la suscripción:

      gcloud pubsub topics delete PROMPTS_TOPIC_ID
      gcloud pubsub topics delete RESPONSES_TOPIC_ID
      gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
    5. Revoca los roles que hayas concedido a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.admin
      • roles/pubsub.editor
      • roles/aiplatform.user
      gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
    6. Opcional: Revoca los roles de tu cuenta de Google.

      gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
    7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

      gcloud auth application-default revoke
    8. Optional: Revoke credentials from the gcloud CLI.

      gcloud auth revoke

    Siguientes pasos