Exécuter un LLM dans un pipeline de traitement par flux


Ce tutoriel explique comment exécuter un grand modèle de langage (LLM) dans un pipeline Dataflow de traitement en flux continu à l'aide de l'API Apache Beam RunInference.

Pour en savoir plus sur l'API RunInference, consultez la page À propos de Beam ML dans la documentation Apache Beam.

L'exemple de code est disponible sur GitHub.

Objectifs

  • Créer des sujets et des abonnements Pub/Sub pour l'entrée et les réponses du modèle
  • Charger le modèle dans Cloud Storage à l'aide d'un job personnalisé Vertex AI
  • Exécutez le pipeline.
  • Poser une question au modèle et obtenir une réponse

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

Pour obtenir une estimation des coûts en fonction de votre utilisation prévue, utilisez le simulateur de coût.

Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

Exécutez ce tutoriel sur une machine disposant d'au moins 5 Go d'espace disque disponible pour installer les dépendances.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à gcloud CLI avec votre identité fédérée.

  4. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à gcloud CLI avec votre identité fédérée.

  12. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Attribuez des rôles à votre compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet.
    • PROJECT_NUMBER : votre numéro de projet. Pour trouver votre numéro de projet, utilisez la commande gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE : chaque rôle individuel.
  19. Copiez l'ID du projet Google Cloud . Vous aurez besoin de cette valeur plus loin dans ce tutoriel.
  20. Créer les ressources Google Cloud

    Cette section explique comment créer les ressources suivantes :

    • Un bucket Cloud Storage à utiliser comme emplacement de stockage temporaire
    • Un sujet Pub/Sub pour les requêtes du modèle
    • Un sujet et un abonnement Pub/Sub pour les réponses du modèle

    Créer un bucket Cloud Storage

    Créez un bucket Cloud Storage à l'aide de la gcloud CLI. Ce bucket sert d'emplacement de stockage temporaire pour le pipeline Dataflow.

    Pour créer le bucket, utilisez la commande gcloud storage buckets create :

    gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
    

    Remplacez les éléments suivants :

    Copiez le nom du bucket. Vous aurez besoin de cette valeur plus loin dans ce tutoriel.

    Créer les sujets et les abonnements Pub/Sub

    Créez deux sujets Pub/Sub et un abonnement. Un sujet concerne les requêtes d'entrée que vous envoyez au modèle. L'autre sujet et son abonnement associé concernent les réponses du modèle.

    1. Pour créer les sujets, exécutez la commande gcloud pubsub topics create deux fois, une fois pour chaque sujet :

      gcloud pubsub topics create PROMPTS_TOPIC_ID
      gcloud pubsub topics create RESPONSES_TOPIC_ID
      

      Remplacez les éléments suivants :

      • PROMPTS_TOPIC_ID : ID de sujet des requêtes d'entrée à envoyer au modèle, tel que prompts
      • RESPONSES_TOPIC_ID : ID de sujet pour les réponses du modèle, tel que responses
    2. Pour créer l'abonnement et l'associer à votre sujet de réponses, utilisez la commande gcloud pubsub subscriptions create :

      gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
      

      Remplacez RESPONSES_SUBSCRIPTION_ID par l'ID d'abonnement pour les réponses du modèle, tel que responses-subscription.

    Copiez les ID du sujet et l'ID d'abonnement. Vous aurez besoin de ces valeurs plus loin dans ce tutoriel.

    Préparer votre environnement

    Téléchargez les exemples de code, puis configurez votre environnement pour exécuter le tutoriel.

    Les exemples de code du dépôt GitHub python-docs-samples fournissent le code dont vous avez besoin pour exécuter ce pipeline. Lorsque vous êtes prêt à créer votre propre pipeline, vous pouvez utiliser cet exemple de code comme modèle.

    Vous créez un environnement virtuel Python isolé pour exécuter votre projet de pipeline à l'aide de venv. Un environnement virtuel vous permet d'isoler les dépendances d'un projet des dépendances d'autres projets. Pour plus d'informations sur l'installation de Python et la création d'un environnement virtuel, consultez la page Configurer un environnement de développement Python.

    1. Exécutez la commande git clone pour cloner le dépôt GitHub :

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      
    2. Accédez au répertoire run-inference :

      cd python-docs-samples/dataflow/run-inference
      
    3. Si vous utilisez une invite de commande, vérifiez que Python 3 et pip sont en cours d'exécution sur votre système:

      python --version
      python -m pip --version
      

      Si nécessaire, installez Python 3.

      Si vous utilisez Cloud Shell, vous pouvez ignorer cette étape, car Cloud Shell est déjà installé en Python.

    4. Créez un environnement virtuel Python :

      python -m venv /tmp/env
      source /tmp/env/bin/activate
      
    5. Installez les dépendances :

      pip install -r requirements.txt --no-cache-dir
      

    Exemple de code de chargement de modèle

    Le code de chargement du modèle de ce tutoriel lance un job personnalisé Vertex AI qui charge l'objet state_dict du modèle dans Cloud Storage.

    Le fichier de démarrage se présente comme suit :

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Loads the state_dict for an LLM model into Cloud Storage."""
    
    from __future__ import annotations
    
    import os
    
    import torch
    from transformers import AutoModelForSeq2SeqLM
    
    
    def run_local(model_name: str, state_dict_path: str) -> None:
        """Loads the state dict and saves it into the desired path.
    
        If the `state_dict_path` is a Cloud Storage location starting
        with "gs://", this assumes Cloud Storage is mounted with
        Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        """
        print(f"Loading model: {model_name}")
        model = AutoModelForSeq2SeqLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16
        )
        print(f"Model loaded, saving state dict to: {state_dict_path}")
    
        # Assume Cloud Storage FUSE is mounted in `/gcs`.
        state_dict_path = state_dict_path.replace("gs://", "/gcs/")
        directory = os.path.dirname(state_dict_path)
        if directory and not os.path.exists(directory):
            os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
        torch.save(model.state_dict(), state_dict_path)
        print("State dict saved successfully!")
    
    
    def run_vertex_job(
        model_name: str,
        state_dict_path: str,
        job_name: str,
        project: str,
        bucket: str,
        location: str = "us-central1",
        machine_type: str = "e2-highmem-2",
        disk_size_gb: int = 100,
    ) -> None:
        """Launches a Vertex AI custom job to load the state dict.
    
        If the model is too large to fit into memory or disk, we can launch
        a Vertex AI custom job with a large enough VM for this to work.
    
        Depending on the model's size, it might require a different VM
        configuration. The model MUST fit into the VM's memory, and there
        must be enough disk space to stage the entire model while it gets
        copied to Cloud Storage.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            job_name: Job display name in the Vertex AI console.
            project: Google Cloud Project ID.
            bucket: Cloud Storage bucket name, without the "gs://" prefix.
            location: Google Cloud regional location.
            machine_type: Machine type for the VM to run the job.
            disk_size_gb: Disk size in GB for the VM to run the job.
        """
        from google.cloud import aiplatform
    
        aiplatform.init(project=project, staging_bucket=bucket, location=location)
    
        job = aiplatform.CustomJob.from_local_script(
            display_name=job_name,
            container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
            script_path="download_model.py",
            args=[
                "local",
                f"--model-name={model_name}",
                f"--state-dict-path={state_dict_path}",
            ],
            machine_type=machine_type,
            boot_disk_size_gb=disk_size_gb,
            requirements=["transformers"],
        )
        job.run()
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers(required=True)
    
        parser_local = subparsers.add_parser("local")
        parser_local.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_local.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_local.set_defaults(run=run_local)
    
        parser_vertex = subparsers.add_parser("vertex")
        parser_vertex.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_vertex.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_vertex.add_argument(
            "--job-name", required=True, help="Job display name in the Vertex AI console"
        )
        parser_vertex.add_argument(
            "--project", required=True, help="Google Cloud Project ID"
        )
        parser_vertex.add_argument(
            "--bucket",
            required=True,
            help='Cloud Storage bucket name, without the "gs://" prefix',
        )
        parser_vertex.add_argument(
            "--location", default="us-central1", help="Google Cloud regional location"
        )
        parser_vertex.add_argument(
            "--machine-type",
            default="e2-highmem-2",
            help="Machine type for the VM to run the job",
        )
        parser_vertex.add_argument(
            "--disk-size-gb",
            type=int,
            default=100,
            help="Disk size in GB for the VM to run the job",
        )
        parser_vertex.set_defaults(run=run_vertex_job)
    
        args = parser.parse_args()
        kwargs = args.__dict__.copy()
        kwargs.pop("run")
    
        args.run(**kwargs)
    

    Exemple de code de pipeline

    Le code de ce tutoriel déploie un pipeline Dataflow qui effectue les opérations suivantes :

    • Lit une requête Pub/Sub et encode le texte dans des Tensors de jetons.
    • Exécute la transformation RunInference.
    • Décode les Tensors des jetons de sortie en texte et écrit la réponse dans Pub/Sub.

    Le fichier de démarrage se présente comme suit :

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Runs a streaming RunInference Language Model pipeline."""
    
    from __future__ import annotations
    
    import logging
    
    import apache_beam as beam
    from apache_beam.ml.inference.base import PredictionResult
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
    from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
    from apache_beam.options.pipeline_options import PipelineOptions
    import torch
    from transformers import AutoConfig
    from transformers import AutoModelForSeq2SeqLM
    from transformers import AutoTokenizer
    from transformers.tokenization_utils import PreTrainedTokenizer
    
    MAX_RESPONSE_TOKENS = 256
    
    
    def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
        """Encodes input text into token tensors.
    
        Args:
            input_text: Input text for the language model.
            tokenizer: Tokenizer for the language model.
    
        Returns: Tokenized input tokens.
        """
        return tokenizer(input_text, return_tensors="pt").input_ids[0]
    
    
    def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
        """Decodes output token tensors into text.
    
        Args:
            result: Prediction results from the RunInference transform.
            tokenizer: Tokenizer for the language model.
    
        Returns: The model's response as text.
        """
        output_tokens = result.inference
        return tokenizer.decode(output_tokens, skip_special_tokens=True)
    
    
    class AskModel(beam.PTransform):
        """Asks an language model a prompt message and gets its responses.
    
        Attributes:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            max_response_tokens: Maximum number of tokens for the model to generate.
        """
    
        def __init__(
            self,
            model_name: str,
            state_dict_path: str,
            max_response_tokens: int = MAX_RESPONSE_TOKENS,
        ) -> None:
            self.model_handler = PytorchModelHandlerTensor(
                state_dict_path=state_dict_path,
                model_class=AutoModelForSeq2SeqLM.from_config,
                model_params={"config": AutoConfig.from_pretrained(model_name)},
                inference_fn=make_tensor_model_fn("generate"),
            )
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.max_response_tokens = max_response_tokens
    
        def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
            return (
                pcollection
                | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
                | "RunInference"
                >> RunInference(
                    self.model_handler,
                    inference_args={"max_new_tokens": self.max_response_tokens},
                )
                | "Get response" >> beam.Map(decode_response, self.tokenizer)
            )
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--messages-topic",
            required=True,
            help="Pub/Sub topic for input text messages",
        )
        parser.add_argument(
            "--responses-topic",
            required=True,
            help="Pub/Sub topic for output text responses",
        )
        parser.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        args, beam_args = parser.parse_known_args()
    
        logging.getLogger().setLevel(logging.INFO)
        beam_options = PipelineOptions(
            beam_args,
            pickle_library="cloudpickle",
            streaming=True,
        )
    
        simple_name = args.model_name.split("/")[-1]
        pipeline = beam.Pipeline(options=beam_options)
        _ = (
            pipeline
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
            | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
            | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
            | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
            | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
        )
        pipeline.run()
    

    Charger le modèle

    Les LLM peuvent être des modèles très volumineux. Les modèles très volumineux entraînés avec davantage de paramètres donnent généralement de meilleurs résultats. Cependant, les modèles très volumineux nécessitent une machine de plus grande taille et plus de mémoire pour s'exécuter. Les modèles très volumineux peuvent également être plus lents à exécuter sur les processeurs.

    Avant d'exécuter un modèle PyTorch sur Dataflow, vous devez charger l'objet state_dict du modèle. L'objet state_dict d'un modèle stocke les poids du modèle.

    Dans un pipeline Dataflow qui utilise la transformation Apache Beam RunInference, l'objet state_dict du modèle doit être chargé dans Cloud Storage. La machine que vous utilisez pour charger l'objet state_dict dans Cloud Storage doit disposer de suffisamment de mémoire pour charger le modèle. La machine doit également disposer d'une connexion Internet rapide pour télécharger les pondérations et les importer dans Cloud Storage.

    Le tableau suivant indique le nombre de paramètres pour chaque modèle et la mémoire minimale nécessaire pour charger chaque modèle.

    Modèle Paramètres Mémoire requise
    google/flan-t5-small 80 millions > 320 Mo
    google/flan-t5-base 250 millions > 1 Go
    google/flan-t5-large 780 millions > 3,2 Go
    google/flan-t5-xl 3 milliards > 12 Go
    google/flan-t5-xxl 11 milliards > 44 Go
    google/flan-ul2 20 milliards > 80 Go

    Bien que vous puissiez charger un modèle plus petit localement, ce tutoriel explique comment lancer un job personnalisé Vertex AI qui charge le modèle avec une VM de taille appropriée.

    Comme les LLM peuvent être très volumineux, l'exemple de ce tutoriel enregistre l'objet state_dict au format float16 au lieu du format float32 par défaut. Avec cette configuration, chaque paramètre utilise 16 bits au lieu de 32 bits, ce qui réduit la taille de l'objet state_dict. Une plus petite taille réduit le temps nécessaire au chargement du modèle. Toutefois, la conversion du format signifie que la VM doit tenir en mémoire le modèle et l'objet state_dict.

    Le tableau suivant présente les exigences minimales de chargement d'un modèle une fois l'objet state_dict enregistré au format float16. Le tableau indique également les types de machines suggérés pour charger un modèle à l'aide de Vertex AI. La taille de disque minimale (et par défaut) pour Vertex AI est de 100 Go, mais certains modèles peuvent nécessiter un disque plus volumineux.

    Nom du modèle Mémoire requise Type de machine Mémoire de la VM Disque de la VM
    google/flan-t5-small > 480 Mo e2-standard-4 16 Go 100 Go
    google/flan-t5-base > 1,5 Go e2-standard-4 16 Go 100 Go
    google/flan-t5-large > 4,8 Go e2-standard-4 16 Go 100 Go
    google/flan-t5-xl > 18 Go e2-highmem-4 32 Go 100 Go
    google/flan-t5-xxl > 66 Go e2-highmem-16 128 Go 100 Go
    google/flan-ul2 > 120 Go e2-highmem-16 128 Go 150 Go

    Chargez l'objet state_dict du modèle dans Cloud Storage à l'aide d'un job personnalisé Vertex AI :

    python download_model.py vertex \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --job-name="Load MODEL_NAME" \
        --project="PROJECT_ID" \
        --bucket="BUCKET_NAME" \
        --location="LOCATION" \
        --machine-type="VERTEX_AI_MACHINE_TYPE" \
        --disk-size-gb="DISK_SIZE_GB"
    

    Remplacez les éléments suivants :

    • MODEL_NAME : nom du modèle, tel que google/flan-t5-xl.
    • VERTEX_AI_MACHINE_TYPE : type de machine sur lequel exécuter le job personnalisé Vertex AI, par exemple e2-highmem-4.
    • DISK_SIZE_GB : taille du disque de la VM, en Go La taille minimale est de 100 Go.

    Selon la taille du modèle, le chargement du modèle peut prendre quelques minutes. Pour afficher l'état, accédez à la page Jobs personnalisés de Vertex AI.

    Accéder à la page Tâches personnalisées

    Exécuter le pipeline

    Après avoir chargé le modèle, vous exécutez le pipeline Dataflow. Pour exécuter le pipeline, le modèle et la mémoire utilisés par chaque nœud de calcul doivent tenir dans la mémoire.

    Le tableau suivant indique les types de machines recommandés pour exécuter un pipeline d'inférence.

    Nom du modèle Type de machine Mémoire de la VM
    google/flan-t5-small n2-highmem-2 16 Go
    google/flan-t5-base n2-highmem-2 16 Go
    google/flan-t5-large n2-highmem-4 32 Go
    google/flan-t5-xl n2-highmem-4 32 Go
    google/flan-t5-xxl n2-highmem-8 64 Go
    google/flan-ul2 n2-highmem-16 128 Go

    Exécutez le pipeline :

    python main.py \
        --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
        --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --runner="DataflowRunner" \
        --project="PROJECT_ID" \
        --temp_location="gs://BUCKET_NAME/temp" \
        --region="REGION" \
        --machine_type="DATAFLOW_MACHINE_TYPE" \
        --requirements_file="requirements.txt" \
        --requirements_cache="skip" \
        --experiments="use_sibling_sdk_workers" \
        --experiments="no_use_multiple_sdk_containers"
    

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet
    • PROMPTS_TOPIC_ID : ID du sujet que les requêtes d'entrée doivent envoyer au modèle
    • RESPONSES_TOPIC_ID : ID du sujet pour les réponses du modèle
    • MODEL_NAME : nom du modèle, tel que google/flan-t5-xl
    • BUCKET_NAME : nom du bucket
    • REGION : région dans laquelle déployer la tâche, par exemple us-central1
    • DATAFLOW_MACHINE_TYPE : VM sur laquelle exécuter le pipeline, par exemple n2-highmem-4

    Pour vous assurer que le modèle n'est chargé qu'une fois par nœud de calcul et ne manque pas de mémoire, configurez les nœuds de calcul de manière à utiliser un seul processus en définissant l'option de pipeline --experiments=no_use_multiple_sdk_containers. Vous n'avez pas besoin de limiter le nombre de threads, car la transformation RunInference partage le même modèle avec plusieurs threads.

    Dans cet exemple, le pipeline est exécuté avec des processeurs. Pour un modèle plus grand, le traitement de chaque requête prend plus de temps. Vous pouvez activer les GPU si vous avez besoin de réponses plus rapides.

    Pour afficher l'état du pipeline, accédez à la page Jobs de Dataflow.

    Accédez aux tâches

    Poser une question au modèle

    Une fois le pipeline démarré, vous fournissez une requête au modèle et recevez une réponse.

    1. Envoyez votre requête en publiant un message dans Pub/Sub. Exécutez la commande gcloud pubsub topics publish :

      gcloud pubsub topics publish PROMPTS_TOPIC_ID \
          --message="PROMPT_TEXT"
      

      Remplacez PROMPT_TEXT par une chaîne contenant la requête que vous souhaitez fournir. Placez la requête entre guillemets.

      Utilisez votre propre requête ou essayez l'un des exemples suivants :

      • Translate to Spanish: My name is Luka
      • Complete this sentence: Once upon a time, there was a
      • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
    2. Pour obtenir la réponse, utilisez la commande gcloud pubsub subscriptions pull.

      Selon la taille du modèle, la génération d'une réponse peut prendre quelques minutes. Les modèles plus volumineux mettent plus de temps à être déployés et à générer une réponse.

      gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
      

      Remplacez RESPONSES_SUBSCRIPTION_ID par l'ID d'abonnement pour les réponses du modèle.

    Effectuer un nettoyage

    Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

    Supprimer le projet

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    Supprimer des ressources individuelles

    1. Quittez l'environnement virtuel Python :

      deactivate
    2. Arrêtez le pipeline :

      1. Répertoriez les ID des jobs Dataflow en cours d'exécution, puis notez l'ID de job pour le job du tutoriel :

        gcloud dataflow jobs list --region=REGION --status=active
      2. Annulez le job :

        gcloud dataflow jobs cancel JOB_ID --region=REGION
    3. Supprimez le bucket et tout ce qu'il contient :

      gcloud storage rm gs://BUCKET_NAME --recursive
    4. Supprimez les sujets et l'abonnement :

      gcloud pubsub topics delete PROMPTS_TOPIC_ID
      gcloud pubsub topics delete RESPONSES_TOPIC_ID
      gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
    5. Révoquez les rôles que vous avez accordés au compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.admin
      • roles/pubsub.editor
      • roles/aiplatform.user
      gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
    6. Facultatif : Révoquez les rôles de votre compte Google.

      gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
    7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

      gcloud auth application-default revoke
    8. Optional: Revoke credentials from the gcloud CLI.

      gcloud auth revoke

    Étapes suivantes