Execute um GML numa pipeline de streaming

Este tutorial mostra como executar um modelo de linguagem grande (LLM) num pipeline do Dataflow de streaming usando a API RunInference do Apache Beam.

Para mais informações sobre a API RunInference, consulte o artigo Acerca do Beam ML na documentação do Apache Beam.

O código de exemplo está disponível no GitHub.

Objetivos

  • Crie tópicos e subscrições Pub/Sub para a entrada e as respostas do modelo.
  • Carregue o modelo para o Cloud Storage através de uma tarefa personalizada do Vertex AI.
  • Execute a conduta.
  • Fazer uma pergunta ao modelo e receber uma resposta.

Custos

Neste documento, usa os seguintes componentes faturáveis da Google Cloud Platform:

Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Quando terminar as tarefas descritas neste documento, pode evitar a faturação contínua eliminando os recursos que criou. Para mais informações, consulte o artigo Limpe.

Antes de começar

Execute este tutorial num computador com, pelo menos, 5 GB de espaço livre no disco para instalar as dependências.

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  4. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  12. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Conceda funções à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Substitua o seguinte:

    • PROJECT_ID: o ID do seu projeto.
    • PROJECT_NUMBER: o número do seu projeto. Para encontrar o número do projeto, use o comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: cada função individual.
  19. Copie o Google Cloud ID do projeto. Vai precisar deste valor mais tarde neste tutorial.
  20. Crie os recursos da Google Cloud Platform

    Esta secção explica como criar os seguintes recursos:

    • Um contentor do Cloud Storage para usar como localização de armazenamento temporário
    • Um tópico do Pub/Sub para os comandos do modelo
    • Um tópico e uma subscrição do Pub/Sub para as respostas do modelo

    Crie um contentor do Cloud Storage

    Crie um contentor do Cloud Storage através da CLI gcloud. Este contentor é usado como uma localização de armazenamento temporário pelo pipeline do Dataflow.

    Para criar o contentor, use o comando gcloud storage buckets create:

    gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
    

    Substitua o seguinte:

    Copie o nome do contentor. Vai precisar deste valor mais tarde neste tutorial.

    Crie tópicos e subscrições do Pub/Sub

    Crie dois tópicos Pub/Sub e uma subscrição. Um tópico destina-se aos comandos de entrada que envia para o modelo. O outro tópico e a respetiva subscrição anexada destinam-se às respostas do modelo.

    1. Para criar os tópicos, execute o comando gcloud pubsub topics create duas vezes, uma para cada tópico:

      gcloud pubsub topics create PROMPTS_TOPIC_ID
      gcloud pubsub topics create RESPONSES_TOPIC_ID
      

      Substitua o seguinte:

      • PROMPTS_TOPIC_ID: o ID do tópico para os comandos de entrada a enviar para o modelo, como prompts
      • RESPONSES_TOPIC_ID: o ID do tópico para as respostas do modelo, como responses
    2. Para criar a subscrição e anexá-la ao tópico de respostas, use o comando gcloud pubsub subscriptions create:

      gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
      

      Substitua RESPONSES_SUBSCRIPTION_ID pelo ID da subscrição das respostas do modelo, como responses-subscription.

    Copie os IDs dos tópicos e o ID da subscrição. Vai precisar destes valores mais tarde neste tutorial.

    Prepare o seu ambiente

    Transfira os exemplos de código e, em seguida, configure o seu ambiente para executar o tutorial.

    Os exemplos de código no repositório do GitHub python-docs-samples fornecem o código de que precisa para executar este pipeline. Quando estiver tudo pronto para criar o seu próprio pipeline, pode usar este exemplo de código como modelo.

    Cria um ambiente virtual Python isolado para executar o seu projeto de pipeline usando o venv. Um ambiente virtual permite-lhe isolar as dependências de um projeto das dependências de outros projetos. Para mais informações sobre como instalar o Python e criar um ambiente virtual, consulte o artigo Configurar um ambiente de desenvolvimento Python.

    1. Use o comando git clone para clonar o repositório do GitHub:

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      
    2. Navegue para o diretório run-inference:

      cd python-docs-samples/dataflow/run-inference
      
    3. Se estiver a usar uma linha de comandos, verifique se tem o Python 3 e o pip em execução no seu sistema:

      python --version
      python -m pip --version
      

      Se necessário, instale o Python 3.

      Se estiver a usar o Cloud Shell, pode ignorar este passo porque o Cloud Shell já tem o Python instalado.

    4. Crie um ambiente virtual Python:

      python -m venv /tmp/env
      source /tmp/env/bin/activate
      
    5. Instale as dependências:

      pip install -r requirements.txt --no-cache-dir
      

    Exemplo de código de carregamento do modelo

    O código de carregamento do modelo neste tutorial inicia uma tarefa personalizada do Vertex AI que carrega o objeto state_dict do modelo para o Cloud Storage.

    O ficheiro inicial tem o seguinte aspeto:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Loads the state_dict for an LLM model into Cloud Storage."""
    
    from __future__ import annotations
    
    import os
    
    import torch
    from transformers import AutoModelForSeq2SeqLM
    
    
    def run_local(model_name: str, state_dict_path: str) -> None:
        """Loads the state dict and saves it into the desired path.
    
        If the `state_dict_path` is a Cloud Storage location starting
        with "gs://", this assumes Cloud Storage is mounted with
        Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        """
        print(f"Loading model: {model_name}")
        model = AutoModelForSeq2SeqLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16
        )
        print(f"Model loaded, saving state dict to: {state_dict_path}")
    
        # Assume Cloud Storage FUSE is mounted in `/gcs`.
        state_dict_path = state_dict_path.replace("gs://", "/gcs/")
        directory = os.path.dirname(state_dict_path)
        if directory and not os.path.exists(directory):
            os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
        torch.save(model.state_dict(), state_dict_path)
        print("State dict saved successfully!")
    
    
    def run_vertex_job(
        model_name: str,
        state_dict_path: str,
        job_name: str,
        project: str,
        bucket: str,
        location: str = "us-central1",
        machine_type: str = "e2-highmem-2",
        disk_size_gb: int = 100,
    ) -> None:
        """Launches a Vertex AI custom job to load the state dict.
    
        If the model is too large to fit into memory or disk, we can launch
        a Vertex AI custom job with a large enough VM for this to work.
    
        Depending on the model's size, it might require a different VM
        configuration. The model MUST fit into the VM's memory, and there
        must be enough disk space to stage the entire model while it gets
        copied to Cloud Storage.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            job_name: Job display name in the Vertex AI console.
            project: Google Cloud Project ID.
            bucket: Cloud Storage bucket name, without the "gs://" prefix.
            location: Google Cloud regional location.
            machine_type: Machine type for the VM to run the job.
            disk_size_gb: Disk size in GB for the VM to run the job.
        """
        from google.cloud import aiplatform
    
        aiplatform.init(project=project, staging_bucket=bucket, location=location)
    
        job = aiplatform.CustomJob.from_local_script(
            display_name=job_name,
            container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
            script_path="download_model.py",
            args=[
                "local",
                f"--model-name={model_name}",
                f"--state-dict-path={state_dict_path}",
            ],
            machine_type=machine_type,
            boot_disk_size_gb=disk_size_gb,
            requirements=["transformers"],
        )
        job.run()
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers(required=True)
    
        parser_local = subparsers.add_parser("local")
        parser_local.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_local.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_local.set_defaults(run=run_local)
    
        parser_vertex = subparsers.add_parser("vertex")
        parser_vertex.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_vertex.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_vertex.add_argument(
            "--job-name", required=True, help="Job display name in the Vertex AI console"
        )
        parser_vertex.add_argument(
            "--project", required=True, help="Google Cloud Project ID"
        )
        parser_vertex.add_argument(
            "--bucket",
            required=True,
            help='Cloud Storage bucket name, without the "gs://" prefix',
        )
        parser_vertex.add_argument(
            "--location", default="us-central1", help="Google Cloud regional location"
        )
        parser_vertex.add_argument(
            "--machine-type",
            default="e2-highmem-2",
            help="Machine type for the VM to run the job",
        )
        parser_vertex.add_argument(
            "--disk-size-gb",
            type=int,
            default=100,
            help="Disk size in GB for the VM to run the job",
        )
        parser_vertex.set_defaults(run=run_vertex_job)
    
        args = parser.parse_args()
        kwargs = args.__dict__.copy()
        kwargs.pop("run")
    
        args.run(**kwargs)
    

    Exemplo de código de pipeline

    O código do pipeline neste tutorial implementa um pipeline do Dataflow que faz o seguinte:

    • Lê um comando do Pub/Sub e codifica o texto em tensores de tokens.
    • Executa a transformação RunInference.
    • Descodifica os tensores de tokens de saída em texto e escreve a resposta no Pub/Sub.

    O ficheiro inicial tem o seguinte aspeto:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Runs a streaming RunInference Language Model pipeline."""
    
    from __future__ import annotations
    
    import logging
    
    import apache_beam as beam
    from apache_beam.ml.inference.base import PredictionResult
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
    from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
    from apache_beam.options.pipeline_options import PipelineOptions
    import torch
    from transformers import AutoConfig
    from transformers import AutoModelForSeq2SeqLM
    from transformers import AutoTokenizer
    from transformers.tokenization_utils import PreTrainedTokenizer
    
    MAX_RESPONSE_TOKENS = 256
    
    
    def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
        """Encodes input text into token tensors.
    
        Args:
            input_text: Input text for the language model.
            tokenizer: Tokenizer for the language model.
    
        Returns: Tokenized input tokens.
        """
        return tokenizer(input_text, return_tensors="pt").input_ids[0]
    
    
    def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
        """Decodes output token tensors into text.
    
        Args:
            result: Prediction results from the RunInference transform.
            tokenizer: Tokenizer for the language model.
    
        Returns: The model's response as text.
        """
        output_tokens = result.inference
        return tokenizer.decode(output_tokens, skip_special_tokens=True)
    
    
    class AskModel(beam.PTransform):
        """Asks an language model a prompt message and gets its responses.
    
        Attributes:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            max_response_tokens: Maximum number of tokens for the model to generate.
        """
    
        def __init__(
            self,
            model_name: str,
            state_dict_path: str,
            max_response_tokens: int = MAX_RESPONSE_TOKENS,
        ) -> None:
            self.model_handler = PytorchModelHandlerTensor(
                state_dict_path=state_dict_path,
                model_class=AutoModelForSeq2SeqLM.from_config,
                model_params={"config": AutoConfig.from_pretrained(model_name)},
                inference_fn=make_tensor_model_fn("generate"),
            )
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.max_response_tokens = max_response_tokens
    
        def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
            return (
                pcollection
                | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
                | "RunInference"
                >> RunInference(
                    self.model_handler,
                    inference_args={"max_new_tokens": self.max_response_tokens},
                )
                | "Get response" >> beam.Map(decode_response, self.tokenizer)
            )
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--messages-topic",
            required=True,
            help="Pub/Sub topic for input text messages",
        )
        parser.add_argument(
            "--responses-topic",
            required=True,
            help="Pub/Sub topic for output text responses",
        )
        parser.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        args, beam_args = parser.parse_known_args()
    
        logging.getLogger().setLevel(logging.INFO)
        beam_options = PipelineOptions(
            beam_args,
            pickle_library="cloudpickle",
            streaming=True,
        )
    
        simple_name = args.model_name.split("/")[-1]
        pipeline = beam.Pipeline(options=beam_options)
        _ = (
            pipeline
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
            | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
            | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
            | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
            | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
        )
        pipeline.run()
    

    Carregue o modelo

    Os GMLs podem ser modelos muito grandes. Os modelos maiores que são preparados com mais parâmetros geralmente dão melhores resultados. No entanto, os modelos maiores requerem uma máquina maior e mais memória para serem executados. Os modelos maiores também podem ser mais lentos de executar nas CPUs.

    Antes de executar um modelo do PyTorch no Dataflow, tem de carregar o objeto state_dict do modelo. Um state_dictobjeto de um modelo armazena as ponderações do modelo.

    Num pipeline do Dataflow que usa a transformação RunInference do Apache Beam, o objeto state_dict do modelo tem de ser carregado para o Cloud Storage. A máquina que usa para carregar o objetostate_dict para o Cloud Storage tem de ter memória suficiente para carregar o modelo. A máquina também precisa de uma ligação à Internet rápida para transferir os pesos e carregá-los para o Cloud Storage.

    A tabela seguinte mostra o número de parâmetros para cada modelo e a memória mínima necessária para carregar cada modelo.

    Modelo Parâmetros Memória necessária
    google/flan-t5-small 80 milhões > 320 MB
    google/flan-t5-base 250 milhões > 1 GB
    google/flan-t5-large 780 milhões > 3,2 GB
    google/flan-t5-xl 3 mil milhões > 12 GB
    google/flan-t5-xxl 11 mil milhões > 44 GB
    google/flan-ul2 20 mil milhões > 80 GB

    Embora possa carregar um modelo mais pequeno localmente, este tutorial mostra como iniciar uma tarefa personalizada do Vertex AI que carrega o modelo com uma VM de tamanho adequado.

    Uma vez que os MDIs podem ser tão grandes, o exemplo neste tutorial guarda o objeto state_dict no formato float16 em vez do formato float32 predefinido. Com esta configuração, cada parâmetro usa 16 bits em vez de 32 bits, o que faz com que o objeto state_dict tenha metade do tamanho. Um tamanho mais pequeno minimiza o tempo necessário para carregar o modelo. No entanto, a conversão do formato significa que a MV tem de ajustar o modelo e o objeto state_dict à memória.

    A tabela seguinte mostra os requisitos mínimos para carregar um modelo depois de o objeto state_dict ser guardado no formato float16. A tabela também mostra os tipos de máquinas sugeridos para carregar um modelo através da Vertex AI. O tamanho mínimo (e predefinido) do disco para o Vertex AI é de 100 GB, mas alguns modelos podem exigir um disco maior.

    Nome do modelo Memória necessária Tipo de máquina Memória da VM Disco da VM
    google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
    google/flan-t5-base > 1,5 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-large > 4,8 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
    google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
    google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

    Carregue o objeto state_dict do modelo para o Cloud Storage através de uma tarefa personalizada do Vertex AI:

    python download_model.py vertex \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --job-name="Load MODEL_NAME" \
        --project="PROJECT_ID" \
        --bucket="BUCKET_NAME" \
        --location="LOCATION" \
        --machine-type="VERTEX_AI_MACHINE_TYPE" \
        --disk-size-gb="DISK_SIZE_GB"
    

    Substitua o seguinte:

    • MODEL_NAME: o nome do modelo, como google/flan-t5-xl.
    • VERTEX_AI_MACHINE_TYPE: o tipo de máquina para executar a tarefa personalizada do Vertex AI, como e2-highmem-4.
    • DISK_SIZE_GB: o tamanho do disco da VM, em GB. O tamanho mínimo é de 100 GB.

    Consoante o tamanho do modelo, o carregamento do mesmo pode demorar alguns minutos. Para ver o estado, aceda à página Tarefas personalizadas do Vertex AI.

    Aceda a Tarefas personalizadas

    Execute o pipeline

    Depois de carregar o modelo, executa o pipeline do Dataflow. Para executar o pipeline, tanto o modelo como a memória usada por cada trabalhador têm de caber na memória.

    A tabela seguinte mostra os tipos de máquinas recomendados para executar um pipeline de inferência.

    Nome do modelo Tipo de máquina Memória da VM
    google/flan-t5-small n2-highmem-2 16 GB
    google/flan-t5-base n2-highmem-2 16 GB
    google/flan-t5-large n2-highmem-4 32 GB
    google/flan-t5-xl n2-highmem-4 32 GB
    google/flan-t5-xxl n2-highmem-8 64 GB
    google/flan-ul2 n2-highmem-16 128 GB

    Execute a pipeline:

    python main.py \
        --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
        --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --runner="DataflowRunner" \
        --project="PROJECT_ID" \
        --temp_location="gs://BUCKET_NAME/temp" \
        --region="REGION" \
        --machine_type="DATAFLOW_MACHINE_TYPE" \
        --requirements_file="requirements.txt" \
        --requirements_cache="skip" \
        --experiments="use_sibling_sdk_workers" \
        --experiments="no_use_multiple_sdk_containers"
    

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto
    • PROMPTS_TOPIC_ID: o ID do tópico para os comandos de entrada a enviar para o modelo
    • RESPONSES_TOPIC_ID: o ID do tópico para as respostas do modelo
    • MODEL_NAME: o nome do modelo, como google/flan-t5-xl
    • BUCKET_NAME: o nome do contentor
    • REGION: a região onde implementar a tarefa, como us-central1
    • DATAFLOW_MACHINE_TYPE: a VM na qual executar o pipeline, como n2-highmem-4

    Para garantir que o modelo é carregado apenas uma vez por worker e não fica sem memória, configure os workers para usarem um único processo definindo a opção de pipeline --experiments=no_use_multiple_sdk_containers. Não tem de limitar o número de threads porque a transformação RunInference partilha o mesmo modelo com vários threads.

    O pipeline neste exemplo é executado com CPUs. Para um modelo maior, é necessário mais tempo para processar cada pedido. Pode ativar as GPUs se precisar de respostas mais rápidas.

    Para ver o estado do pipeline, aceda à página Tarefas do Dataflow.

    Aceda a Empregos

    Faça uma pergunta ao modelo

    Depois de o pipeline começar a ser executado, fornece um comando ao modelo e recebe uma resposta.

    1. Envie o seu comando publicando uma mensagem no Pub/Sub. Use o comando gcloud pubsub topics publish:

      gcloud pubsub topics publish PROMPTS_TOPIC_ID \
          --message="PROMPT_TEXT"
      

      Substitua PROMPT_TEXT por uma string que contenha o comando que quer fornecer. Coloque o comando entre aspas.

      Use o seu próprio comando ou experimente um dos seguintes exemplos:

      • Translate to Spanish: My name is Luka
      • Complete this sentence: Once upon a time, there was a
      • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
    2. Para receber a resposta, use o comando gcloud pubsub subscriptions pull.

      Consoante o tamanho do modelo, este pode demorar alguns minutos a gerar uma resposta. Os modelos maiores demoram mais tempo a implementar e a gerar uma resposta.

      gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
      

      Substitua RESPONSES_SUBSCRIPTION_ID pelo ID da subscrição para as respostas do modelo.

    Limpar

    Para evitar incorrer em custos na sua conta do Google Cloud pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.

    Elimine o projeto

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    Elimine recursos individuais

    1. Saia do ambiente virtual do Python:

      deactivate
    2. Parar o pipeline:

      1. Indique os IDs das tarefas do Dataflow que estão a ser executadas e, em seguida, tome nota do ID da tarefa do tutorial:

        gcloud dataflow jobs list --region=REGION --status=active
      2. Cancelar a tarefa:

        gcloud dataflow jobs cancel JOB_ID --region=REGION
    3. Eliminar o contentor e tudo o que estiver no mesmo:

      gcloud storage rm gs://BUCKET_NAME --recursive
    4. Elimine os tópicos e a subscrição:

      gcloud pubsub topics delete PROMPTS_TOPIC_ID
      gcloud pubsub topics delete RESPONSES_TOPIC_ID
      gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
    5. Revogue as funções que concedeu à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.admin
      • roles/pubsub.editor
      • roles/aiplatform.user
      gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
    6. Opcional: revogue funções da sua Conta Google.

      gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
    7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

      gcloud auth application-default revoke
    8. Optional: Revoke credentials from the gcloud CLI.

      gcloud auth revoke

    O que se segue?