Executar um LLM em um pipeline de streaming


Neste tutorial, mostramos como executar um modelo de linguagem grande (LLM) em um pipeline de streaming do Dataflow usando a API RunInference do Apache Beam.

Para mais informações sobre a API RunInference, consulte Sobre a ML do Beam (em inglês) na documentação do Apache Beam.

O código está disponível no GitHub.

Objetivos

  • Criar tópicos e assinaturas do Pub/Sub para a entrada e as respostas do modelo.
  • Carregue o modelo no Cloud Storage usando um job personalizado da Vertex AI.
  • Executar o pipeline.
  • Faça uma pergunta ao modelo e receba uma resposta.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.

Antes de começar

Execute este tutorial em uma máquina que tenha pelo menos 5 GB de espaço livre em disco para instalar as dependências.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  6. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  7. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  13. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  14. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o comando a seguir para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Substitua:

    • PROJECT_ID: o ID do projeto.
    • PROJECT_NUMBER: o número do projeto. Para encontrar o número do projeto, use o comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: cada papel individual.
  17. Copie o ID do projeto do Google Cloud. Você vai precisar desse valor mais adiante neste tutorial.

Criar os recursos do Google Cloud

Esta seção explica como criar os seguintes recursos:

  • Um bucket do Cloud Storage para usar como local de armazenamento temporário
  • Um tópico do Pub/Sub para os prompts do modelo
  • Um tópico e uma assinatura do Pub/Sub para as respostas do modelo

Crie um bucket do Cloud Storage

Crie um bucket do Cloud Storage usando a gcloud CLI. Esse bucket é usado como um local de armazenamento temporário pelo pipeline do Dataflow.

Para criar o bucket, use o comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Substitua:

  • BUCKET_NAME: um nome para o bucket do Cloud Storage que atende aos requisitos de nomenclatura de bucket. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos.
  • LOCATION: o local do bucket.

Copie o nome do bucket. Você vai precisar desse valor mais adiante neste tutorial.

Criar tópicos e assinaturas do Pub/Sub

Criar dois tópicos Pub/Sub e uma assinatura. Um deles é para os comandos de entrada que você envia ao modelo. O outro tópico e a assinatura anexada são para as respostas do modelo.

  1. Para criar os tópicos, execute o comando gcloud pubsub topics create duas vezes, uma para cada tópico:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    Substitua:

    • PROMPTS_TOPIC_ID: o ID do tópico dos comandos de entrada para enviar ao modelo, como prompts
    • RESPONSES_TOPIC_ID: o ID do tópico para as respostas do modelo, como responses
  2. Para criar a assinatura e anexá-la ao tópico de respostas, use o comando gcloud pubsub subscriptions create:

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    Substitua RESPONSES_SUBSCRIPTION_ID pelo ID da assinatura das respostas do modelo, como responses-subscription.

Copie os IDs dos tópicos e da assinatura. Você precisará desses valores posteriormente neste tutorial.

Prepare o ambiente

Faça o download dos exemplos de código e configure o ambiente para executar o tutorial.

Os exemplos de código no repositório python-docs-samples do GitHub (em inglês) fornecem o código necessário para executar esse pipeline. Quando você estiver pronto para criar seu próprio pipeline, poderá usar esse exemplo de código como modelo.

Você cria um ambiente virtual isolado do Python para executar seu projeto de pipeline usando o venv. Um ambiente virtual permite isolar as dependências de um projeto das dependências de outros projetos. Para mais informações sobre como instalar o Python e criar um ambiente virtual, consulte Como configurar um ambiente de desenvolvimento em Python.

  1. Use o comando git clone para clonar o repositório do GitHub:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. Acesse o diretório run-inference:

    cd python-docs-samples/dataflow/run-inference
    
  3. Se você estiver usando um prompt de comando, verifique se tem o Python 3 e o pip em execução no sistema:

    python --version
    python -m pip --version
    

    Se necessário, instale o Python 3.

    Se você estiver usando o Cloud Shell, poderá pular esta etapa porque ele já tem o Python instalado.

  4. Crie um ambiente virtual em Python:

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. Instale as dependências:

    pip install -r requirements.txt --no-cache-dir
    

Exemplo de código de carregamento do modelo

O código de carregamento de modelo neste tutorial inicia um job personalizado da Vertex AI que carrega o objeto state_dict do modelo no Cloud Storage.

O arquivo inicial tem a seguinte aparência:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

Exemplo de código de pipeline

O código do pipeline neste tutorial implanta um pipeline do Dataflow que faz o seguinte:

  • Lê um prompt do Pub/Sub e codifica o texto em tensores de token.
  • Executa a transformação RunInference.
  • Decodifica os tensores de token de saída em texto e grava a resposta no Pub/Sub.

O arquivo inicial tem a seguinte aparência:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

Carregar o modelo

Os LLMs podem ser modelos muito grandes. Modelos maiores treinados com mais parâmetros geralmente oferecem melhores resultados. No entanto, modelos maiores exigem uma máquina maior e mais memória para serem executados. Modelos maiores também podem ser mais lentos para serem executados em CPUs.

Antes de executar um modelo PyTorch no Dataflow, você precisa carregar o objeto state_dict do modelo. O objeto state_dict de um modelo armazena os pesos para o modelo.

Em um pipeline do Dataflow que usa a transformação RunInference do Apache Beam, o objeto state_dict do modelo precisa ser carregado no Cloud Storage. A máquina usada para carregar o objeto state_dict no Cloud Storage precisa ter memória suficiente para carregar o modelo. A máquina também precisa de uma conexão de Internet rápida para fazer o download dos pesos e fazer o upload deles no Cloud Storage.

A tabela a seguir mostra o número de parâmetros para cada modelo e a memória mínima necessária para carregar cada modelo.

Modelo Parâmetros Memória necessária
google/flan-t5-small 80 milhões Mais de 320 MB
google/flan-t5-base 250 milhões >1 GB
google/flan-t5-large 780 milhões >3,2 GB
google/flan-t5-xl 3 bilhões >12 GB
google/flan-t5-xxl 11 bilhões >44 GB
google/flan-ul2 20 bilhões >80 GB

Embora seja possível carregar um modelo menor localmente, este tutorial mostra como iniciar um job personalizado da Vertex AI que carrega o modelo com uma VM de tamanho adequado.

Como os LLMs podem ser muito grandes, o exemplo deste tutorial salva o objeto state_dict no formato float16 em vez do formato float32 padrão. Com essa configuração, cada parâmetro usa 16 bits em vez de 32 bits, tornando o objeto state_dict metade do tamanho. Um tamanho menor minimiza o tempo necessário para carregar o modelo. No entanto, converter o formato significa que a VM precisa ajustar o modelo e o objeto state_dict na memória.

A tabela a seguir mostra os requisitos mínimos para carregar um modelo depois que o objeto state_dict for salvo no formato float16. A tabela também mostra os tipos de máquina sugeridos para carregar um modelo usando a Vertex AI. O tamanho mínimo (e padrão) do disco para a Vertex AI é de 100 GB, mas alguns modelos podem exigir um disco maior.

Nome do modelo Memória necessária Tipo de máquina Memória da VM Disco da VM
google/flan-t5-small Mais de 480 MB e2-standard-4 16 GB 100 GB
google/flan-t5-base >1,5 GB e2-standard-4 16 GB 100 GB
google/flan-t5-large >4,8 GB e2-standard-4 16 GB 100 GB
google/flan-t5-xl >18 GB e2-highmem-4 32 GB 100 GB
google/flan-t5-xxl >66 GB e2-highmem-16 128 GB 100 GB
google/flan-ul2 >120 GB e2-highmem-16 128 GB 150 GB

Carregue o objeto state_dict do modelo no Cloud Storage usando um job personalizado da Vertex AI:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

Substitua:

  • MODEL_NAME: o nome do modelo, como google/flan-t5-xl.
  • VERTEX_AI_MACHINE_TYPE: o tipo de máquina em que o job personalizado da Vertex AI será executado, como e2-highmem-4.
  • DISK_SIZE_GB: o tamanho do disco da VM em GB. O tamanho mínimo é 100 GB.

Dependendo do tamanho do modelo, pode levar alguns minutos para carregá-lo. Para conferir o status, acesse a página Jobs personalizados da Vertex AI.

Acessar "Jobs personalizados"

execute o pipeline

Depois de carregar o modelo, você executa o pipeline do Dataflow. Para executar o pipeline, o modelo e a memória usada por cada worker precisam caber na memória.

A tabela a seguir mostra os tipos de máquina recomendados para executar um pipeline de inferência.

Nome do modelo Tipo de máquina Memória da VM
google/flan-t5-small n2-highmem-2 16 GB
google/flan-t5-base n2-highmem-2 16 GB
google/flan-t5-large n2-highmem-4 32 GB
google/flan-t5-xl n2-highmem-4 32 GB
google/flan-t5-xxl n2-highmem-8 64 GB
google/flan-ul2 n2-highmem-16 128 GB

Execute o canal:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

Substitua:

  • PROJECT_ID: o ID do projeto;
  • PROMPTS_TOPIC_ID: o ID do tópico dos comandos de entrada para enviar ao modelo
  • RESPONSES_TOPIC_ID: o ID do tópico para as respostas do modelo
  • MODEL_NAME: o nome do modelo, como google/flan-t5-xl
  • BUCKET_NAME: o nome do bucket
  • REGION: a região em que o job será implantado, como us-central1.
  • DATAFLOW_MACHINE_TYPE: a VM em que o pipeline será executado, como n2-highmem-4.

Para garantir que o modelo seja carregado apenas uma vez por worker e não fique sem memória, configure os workers para usar um único processo definindo a opção --experiments=no_use_multiple_sdk_containers do pipeline. Não é necessário limitar o número de linhas de execução, porque a transformação RunInference compartilha o mesmo modelo com várias linhas de execução.

Neste exemplo, o pipeline é executado com CPUs. Para um modelo maior, é necessário mais tempo para processar cada solicitação. Ative as GPUs se precisar de respostas mais rápidas.

Para visualizar o status do pipeline, acesse a página Jobs do Dataflow.

Acessar "Jobs"

Fazer uma pergunta ao modelo

Depois que a execução do pipeline começar, forneça um prompt ao modelo e receba uma resposta.

  1. Envie seu prompt publicando uma mensagem no Pub/Sub. Use o comando gcloud pubsub topics publish (em inglês).

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    Substitua PROMPT_TEXT por uma string que contenha o comando que você quer fornecer. Coloque o comando entre aspas.

    Use seu próprio comando ou teste um dos exemplos a seguir:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. Para obter a resposta, use o comando gcloud pubsub subscriptions pull.

    Dependendo do tamanho do modelo, pode levar alguns minutos para ele gerar uma resposta. Modelos maiores levam mais tempo para implantar e gerar uma resposta.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    Substitua RESPONSES_SUBSCRIPTION_ID pelo ID da assinatura das respostas do modelo.

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

Exclua o projeto

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Excluir recursos individuais

  1. Saia do ambiente virtual do Python:

    deactivate
  2. Pare o pipeline:

    1. Liste os IDs dos jobs do Dataflow em execução e anote o ID do job do tutorial:

      gcloud dataflow jobs list --region=REGION --status=active
    2. Cancele o job:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. Exclua o bucket e qualquer item dentro dele:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. Exclua os tópicos e a assinatura:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute uma vez o comando a seguir para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. Opcional: revogue papéis da sua Conta do Google.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

A seguir