Menjalankan LLM di pipeline streaming


Tutorial ini menunjukkan cara menjalankan model bahasa besar (LLM) dalam pipeline Dataflow streaming menggunakan Apache Beam RunInference API.

Untuk mengetahui informasi selengkapnya tentang RunInference API, lihat Tentang Beam ML dalam dokumentasi Apache Beam.

Contoh kode tersedia di GitHub.

Tujuan

  • Buat topik dan langganan Pub/Sub untuk input dan respons model.
  • Muat model ke Cloud Storage menggunakan tugas kustom Vertex AI.
  • Jalankan pipeline.
  • Ajukan pertanyaan kepada model dan dapatkan respons.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

Jalankan tutorial ini di komputer yang memiliki ruang disk kosong minimal 5 GB untuk menginstal dependensi.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  4. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  12. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Berikan peran ke akun layanan default Compute Engine Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@" --role=SERVICE_ACCOUNT_ROLE

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • PROJECT_NUMBER: nomor project Anda. Untuk menemukan nomor project Anda, gunakan perintah gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: setiap peran individual.
  19. Salin Google Cloud project ID. Anda akan memerlukan nilai ini nanti dalam tutorial ini.
  20. Buat resource Google Cloud

    Bagian ini menjelaskan cara membuat resource berikut:

    • Bucket Cloud Storage yang akan digunakan sebagai lokasi penyimpanan sementara
    • Topik Pub/Sub untuk perintah model
    • Topik dan langganan Pub/Sub untuk respons model

    Membuat bucket Cloud Storage

    Buat bucket Cloud Storage menggunakan gcloud CLI. Bucket ini digunakan sebagai lokasi penyimpanan sementara oleh pipeline Dataflow.

    Untuk membuat bucket, gunakan perintah gcloud storage buckets create:

    gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
    

    Ganti kode berikut:

    • BUCKET_NAME: nama untuk bucket Cloud Storage Anda yang memenuhi persyaratan penamaan bucket. Nama bucket Cloud Storage harus unik secara global.
    • LOCATION: lokasi bucket.

    Salin nama bucket. Anda akan memerlukan nilai ini nanti dalam tutorial ini.

    Membuat topik dan langganan Pub/Sub

    Buat dua topik Pub/Sub dan satu langganan. Satu topik adalah untuk perintah input yang Anda kirim ke model. Topik lainnya dan langganan terlampirnya adalah untuk respons model.

    1. Untuk membuat topik, jalankan perintah gcloud pubsub topics create dua kali, sekali untuk setiap topik:

      gcloud pubsub topics create PROMPTS_TOPIC_ID
      gcloud pubsub topics create RESPONSES_TOPIC_ID
      

      Ganti kode berikut:

      • PROMPTS_TOPIC_ID: ID topik untuk perintah input yang akan dikirim ke model, seperti prompts
      • RESPONSES_TOPIC_ID: ID topik untuk respons model, seperti responses
    2. Untuk membuat langganan dan melampirkannya ke topik respons, gunakan perintah gcloud pubsub subscriptions create:

      gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
      

      Ganti RESPONSES_SUBSCRIPTION_ID dengan ID langganan untuk respons model, seperti responses-subscription.

    Salin ID topik dan ID langganan. Anda akan memerlukan nilai ini nanti dalam tutorial ini.

    Menyiapkan lingkungan Anda

    Download contoh kode, lalu siapkan lingkungan Anda untuk menjalankan tutorial.

    Contoh kode di repositori GitHub python-docs-samples menyediakan kode yang Anda butuhkan untuk menjalankan pipeline ini. Saat siap membuat pipeline sendiri, Anda dapat menggunakan kode contoh ini sebagai template.

    Anda membuat lingkungan virtual Python yang terisolasi untuk menjalankan project pipeline dengan menggunakan venv. Lingkungan virtual memungkinkan Anda mengisolasi dependensi satu project dari dependensi project lainnya. Untuk mengetahui informasi selengkapnya tentang cara menginstal Python dan membuat lingkungan virtual, lihat Menyiapkan lingkungan pengembangan Python.

    1. Gunakan perintah git clone untuk meng-clone repositori GitHub:

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      
    2. Buka direktori run-inference:

      cd python-docs-samples/dataflow/run-inference
      
    3. Jika Anda menggunakan command prompt, pastikan Anda telah menjalankan Python 3 dan pip di sistem Anda:

      python --version
      python -m pip --version
      

      Jika diperlukan, instal Python 3.

      Jika menggunakan Cloud Shell, Anda dapat melewati langkah ini karena Cloud Shell sudah menginstal Python.

    4. Buat lingkungan virtual Python:

      python -m venv /tmp/env
      source /tmp/env/bin/activate
      
    5. Instal dependensinya:

      pip install -r requirements.txt --no-cache-dir
      

    Contoh kode pemuatan model

    Kode pemuatan model dalam tutorial ini meluncurkan tugas kustom Vertex AI yang memuat objek state_dict model ke Cloud Storage.

    File awal akan terlihat seperti berikut:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Loads the state_dict for an LLM model into Cloud Storage."""
    
    from __future__ import annotations
    
    import os
    
    import torch
    from transformers import AutoModelForSeq2SeqLM
    
    
    def run_local(model_name: str, state_dict_path: str) -> None:
        """Loads the state dict and saves it into the desired path.
    
        If the `state_dict_path` is a Cloud Storage location starting
        with "gs://", this assumes Cloud Storage is mounted with
        Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        """
        print(f"Loading model: {model_name}")
        model = AutoModelForSeq2SeqLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16
        )
        print(f"Model loaded, saving state dict to: {state_dict_path}")
    
        # Assume Cloud Storage FUSE is mounted in `/gcs`.
        state_dict_path = state_dict_path.replace("gs://", "/gcs/")
        directory = os.path.dirname(state_dict_path)
        if directory and not os.path.exists(directory):
            os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
        torch.save(model.state_dict(), state_dict_path)
        print("State dict saved successfully!")
    
    
    def run_vertex_job(
        model_name: str,
        state_dict_path: str,
        job_name: str,
        project: str,
        bucket: str,
        location: str = "us-central1",
        machine_type: str = "e2-highmem-2",
        disk_size_gb: int = 100,
    ) -> None:
        """Launches a Vertex AI custom job to load the state dict.
    
        If the model is too large to fit into memory or disk, we can launch
        a Vertex AI custom job with a large enough VM for this to work.
    
        Depending on the model's size, it might require a different VM
        configuration. The model MUST fit into the VM's memory, and there
        must be enough disk space to stage the entire model while it gets
        copied to Cloud Storage.
    
        Args:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            job_name: Job display name in the Vertex AI console.
            project: Google Cloud Project ID.
            bucket: Cloud Storage bucket name, without the "gs://" prefix.
            location: Google Cloud regional location.
            machine_type: Machine type for the VM to run the job.
            disk_size_gb: Disk size in GB for the VM to run the job.
        """
        from google.cloud import aiplatform
    
        aiplatform.init(project=project, staging_bucket=bucket, location=location)
    
        job = aiplatform.CustomJob.from_local_script(
            display_name=job_name,
            container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
            script_path="download_model.py",
            args=[
                "local",
                f"--model-name={model_name}",
                f"--state-dict-path={state_dict_path}",
            ],
            machine_type=machine_type,
            boot_disk_size_gb=disk_size_gb,
            requirements=["transformers"],
        )
        job.run()
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers(required=True)
    
        parser_local = subparsers.add_parser("local")
        parser_local.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_local.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_local.set_defaults(run=run_local)
    
        parser_vertex = subparsers.add_parser("vertex")
        parser_vertex.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser_vertex.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        parser_vertex.add_argument(
            "--job-name", required=True, help="Job display name in the Vertex AI console"
        )
        parser_vertex.add_argument(
            "--project", required=True, help="Google Cloud Project ID"
        )
        parser_vertex.add_argument(
            "--bucket",
            required=True,
            help='Cloud Storage bucket name, without the "gs://" prefix',
        )
        parser_vertex.add_argument(
            "--location", default="us-central1", help="Google Cloud regional location"
        )
        parser_vertex.add_argument(
            "--machine-type",
            default="e2-highmem-2",
            help="Machine type for the VM to run the job",
        )
        parser_vertex.add_argument(
            "--disk-size-gb",
            type=int,
            default=100,
            help="Disk size in GB for the VM to run the job",
        )
        parser_vertex.set_defaults(run=run_vertex_job)
    
        args = parser.parse_args()
        kwargs = args.__dict__.copy()
        kwargs.pop("run")
    
        args.run(**kwargs)
    

    Contoh kode pipeline

    Kode pipeline dalam tutorial ini men-deploy pipeline Dataflow yang melakukan hal-hal berikut:

    • Membaca perintah dari Pub/Sub dan mengenkode teks menjadi tensor token.
    • Menjalankan transformasi RunInference.
    • Mendekode tensor token output menjadi teks dan menulis respons ke Pub/Sub.

    File awal akan terlihat seperti berikut:

    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Runs a streaming RunInference Language Model pipeline."""
    
    from __future__ import annotations
    
    import logging
    
    import apache_beam as beam
    from apache_beam.ml.inference.base import PredictionResult
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
    from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
    from apache_beam.options.pipeline_options import PipelineOptions
    import torch
    from transformers import AutoConfig
    from transformers import AutoModelForSeq2SeqLM
    from transformers import AutoTokenizer
    from transformers.tokenization_utils import PreTrainedTokenizer
    
    MAX_RESPONSE_TOKENS = 256
    
    
    def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
        """Encodes input text into token tensors.
    
        Args:
            input_text: Input text for the language model.
            tokenizer: Tokenizer for the language model.
    
        Returns: Tokenized input tokens.
        """
        return tokenizer(input_text, return_tensors="pt").input_ids[0]
    
    
    def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
        """Decodes output token tensors into text.
    
        Args:
            result: Prediction results from the RunInference transform.
            tokenizer: Tokenizer for the language model.
    
        Returns: The model's response as text.
        """
        output_tokens = result.inference
        return tokenizer.decode(output_tokens, skip_special_tokens=True)
    
    
    class AskModel(beam.PTransform):
        """Asks an language model a prompt message and gets its responses.
    
        Attributes:
            model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
            state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
            max_response_tokens: Maximum number of tokens for the model to generate.
        """
    
        def __init__(
            self,
            model_name: str,
            state_dict_path: str,
            max_response_tokens: int = MAX_RESPONSE_TOKENS,
        ) -> None:
            self.model_handler = PytorchModelHandlerTensor(
                state_dict_path=state_dict_path,
                model_class=AutoModelForSeq2SeqLM.from_config,
                model_params={"config": AutoConfig.from_pretrained(model_name)},
                inference_fn=make_tensor_model_fn("generate"),
            )
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.max_response_tokens = max_response_tokens
    
        def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
            return (
                pcollection
                | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
                | "RunInference"
                >> RunInference(
                    self.model_handler,
                    inference_args={"max_new_tokens": self.max_response_tokens},
                )
                | "Get response" >> beam.Map(decode_response, self.tokenizer)
            )
    
    
    if __name__ == "__main__":
        import argparse
    
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--messages-topic",
            required=True,
            help="Pub/Sub topic for input text messages",
        )
        parser.add_argument(
            "--responses-topic",
            required=True,
            help="Pub/Sub topic for output text responses",
        )
        parser.add_argument(
            "--model-name",
            required=True,
            help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
        )
        parser.add_argument(
            "--state-dict-path",
            required=True,
            help="File path to the model's state_dict, can be in Cloud Storage",
        )
        args, beam_args = parser.parse_known_args()
    
        logging.getLogger().setLevel(logging.INFO)
        beam_options = PipelineOptions(
            beam_args,
            pickle_library="cloudpickle",
            streaming=True,
        )
    
        simple_name = args.model_name.split("/")[-1]
        pipeline = beam.Pipeline(options=beam_options)
        _ = (
            pipeline
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
            | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
            | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
            | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
            | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
        )
        pipeline.run()
    

    Memuat model

    LLM dapat berupa model yang sangat besar. Model yang lebih besar yang dilatih dengan lebih banyak parameter umumnya memberikan hasil yang lebih baik. Namun, model yang lebih besar memerlukan mesin yang lebih besar dan lebih banyak memori untuk dijalankan. Model yang lebih besar juga dapat berjalan lebih lambat di CPU.

    Sebelum menjalankan model PyTorch di Dataflow, Anda perlu memuat objek state_dict model. Objek state_dict model menyimpan bobot untuk model.

    Dalam pipeline Dataflow yang menggunakan transformasi RunInference Apache Beam, objek state_dict model harus dimuat ke Cloud Storage. Mesin yang Anda gunakan untuk memuat objekstate_dict ke Cloud Storage harus memiliki memori yang cukup untuk memuat model. Mesin juga memerlukan koneksi internet yang cepat untuk mendownload bobot dan menguploadnya ke Cloud Storage.

    Tabel berikut menunjukkan jumlah parameter untuk setiap model dan memori minimum yang diperlukan untuk memuat setiap model.

    Model Parameter Memori yang diperlukan
    google/flan-t5-small 80 juta > 320 MB
    google/flan-t5-base 250 juta > 1 GB
    google/flan-t5-large 780 juta > 3,2 GB
    google/flan-t5-xl 3 miliar > 12 GB
    google/flan-t5-xxl 11 miliar > 44 GB
    google/flan-ul2 20 miliar > 80 GB

    Meskipun Anda dapat memuat model yang lebih kecil secara lokal, tutorial ini menunjukkan cara meluncurkan tugas kustom Vertex AI yang memuat model dengan VM berukuran sesuai.

    Karena LLM bisa sangat besar, contoh dalam tutorial ini menyimpan objek state_dict sebagai format float16, bukan format float32 default. Dengan konfigurasi ini, setiap parameter menggunakan 16 bit, bukan 32 bit, sehingga ukuran objek state_dict menjadi setengahnya. Ukuran yang lebih kecil meminimalkan waktu yang diperlukan untuk memuat model. Namun, mengonversi format berarti VM harus memuat model dan objek state_dict ke dalam memori.

    Tabel berikut menunjukkan persyaratan minimum untuk memuat model setelah objek state_dict disimpan sebagai format float16. Tabel ini juga menunjukkan jenis mesin yang disarankan untuk memuat model menggunakan Vertex AI. Ukuran disk minimum (dan default) untuk Vertex AI adalah 100 GB, tetapi beberapa model mungkin memerlukan disk yang lebih besar.

    Nama model Memori yang diperlukan Jenis mesin Memori VM Disk VM
    google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
    google/flan-t5-base > 1,5 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-large > 4,8 GB e2-standard-4 16 GB 100 GB
    google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
    google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
    google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

    Muat objek state_dict model ke Cloud Storage menggunakan tugas kustom Vertex AI:

    python download_model.py vertex \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --job-name="Load MODEL_NAME" \
        --project="PROJECT_ID" \
        --bucket="BUCKET_NAME" \
        --location="LOCATION" \
        --machine-type="VERTEX_AI_MACHINE_TYPE" \
        --disk-size-gb="DISK_SIZE_GB"
    

    Ganti kode berikut:

    • MODEL_NAME: nama model, seperti google/flan-t5-xl.
    • VERTEX_AI_MACHINE_TYPE: jenis mesin untuk menjalankan tugas kustom Vertex AI, seperti e2-highmem-4.
    • DISK_SIZE_GB: ukuran disk untuk VM, dalam GB. Ukuran minimumnya adalah 100 GB.

    Bergantung pada ukuran model, mungkin perlu waktu beberapa menit untuk memuat model. Untuk melihat statusnya, buka halaman Custom jobs Vertex AI.

    Buka Custom jobs

    Menjalankan pipeline

    Setelah memuat model, Anda menjalankan pipeline Dataflow. Untuk menjalankan pipeline, model dan memori yang digunakan oleh setiap pekerja harus sesuai dengan memori.

    Tabel berikut menunjukkan jenis mesin yang direkomendasikan untuk menjalankan pipeline inferensi.

    Nama model Jenis mesin Memori VM
    google/flan-t5-small n2-highmem-2 16 GB
    google/flan-t5-base n2-highmem-2 16 GB
    google/flan-t5-large n2-highmem-4 32 GB
    google/flan-t5-xl n2-highmem-4 32 GB
    google/flan-t5-xxl n2-highmem-8 64 GB
    google/flan-ul2 n2-highmem-16 128 GB

    Menjalankan pipeline:

    python main.py \
        --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
        --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
        --model-name="MODEL_NAME" \
        --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
        --runner="DataflowRunner" \
        --project="PROJECT_ID" \
        --temp_location="gs://BUCKET_NAME/temp" \
        --region="REGION" \
        --machine_type="DATAFLOW_MACHINE_TYPE" \
        --requirements_file="requirements.txt" \
        --requirements_cache="skip" \
        --experiments="use_sibling_sdk_workers" \
        --experiments="no_use_multiple_sdk_containers"
    

    Ganti kode berikut:

    • PROJECT_ID: the project ID
    • PROMPTS_TOPIC_ID: ID topik untuk perintah input yang akan dikirim ke model
    • RESPONSES_TOPIC_ID: ID topik untuk respons model
    • MODEL_NAME: nama model, seperti google/flan-t5-xl
    • BUCKET_NAME: nama bucket
    • REGION: region tempat tugas di-deploy, seperti us-central1
    • DATAFLOW_MACHINE_TYPE: VM untuk menjalankan pipeline di, seperti n2-highmem-4

    Untuk memastikan model dimuat hanya sekali per worker dan tidak kehabisan memori, Anda mengonfigurasi worker untuk menggunakan satu proses dengan menyetel opsi pipeline --experiments=no_use_multiple_sdk_containers. Anda tidak perlu membatasi jumlah thread karena transformasi RunInference menggunakan model yang sama dengan beberapa thread.

    Pipeline dalam contoh ini berjalan dengan CPU. Untuk model yang lebih besar, diperlukan lebih banyak waktu untuk memproses setiap permintaan. Anda dapat mengaktifkan GPU jika memerlukan respons yang lebih cepat.

    Untuk melihat status pipeline, buka halaman Jobs Dataflow.

    Buka Tugas

    Mengajukan pertanyaan kepada model

    Setelah pipeline mulai berjalan, Anda memberikan perintah ke model dan menerima respons.

    1. Kirim perintah Anda dengan memublikasikan pesan ke Pub/Sub. Gunakan perintah gcloud pubsub topics publish:

      gcloud pubsub topics publish PROMPTS_TOPIC_ID \
          --message="PROMPT_TEXT"
      

      Ganti PROMPT_TEXT dengan string yang berisi perintah yang ingin Anda berikan. Apit perintah dengan tanda kutip.

      Gunakan perintah Anda sendiri, atau coba salah satu contoh berikut:

      • Translate to Spanish: My name is Luka
      • Complete this sentence: Once upon a time, there was a
      • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
    2. Untuk mendapatkan respons, gunakan perintah gcloud pubsub subscriptions pull.

      Bergantung pada ukuran model, model mungkin memerlukan waktu beberapa menit untuk membuat respons. Model yang lebih besar memerlukan waktu lebih lama untuk di-deploy dan untuk menghasilkan respons.

      gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
      

      Ganti RESPONSES_SUBSCRIPTION_ID dengan ID langganan untuk respons model.

    Pembersihan

    Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

    Menghapus project

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    Menghapus resource satu per satu

    1. Keluar dari lingkungan virtual Python:

      deactivate
    2. Hentikan pipeline:

      1. Mencantumkan ID tugas untuk tugas Dataflow yang sedang berjalan, lalu catat ID tugas untuk tugas tutorial:

        gcloud dataflow jobs list --region=REGION --status=active
      2. Membatalkan tugas:

        gcloud dataflow jobs cancel JOB_ID --region=REGION
    3. Hapus bucket dan semua yang ada di dalamnya:

      gcloud storage rm gs://BUCKET_NAME --recursive
    4. Hapus topik dan langganan:

      gcloud pubsub topics delete PROMPTS_TOPIC_ID
      gcloud pubsub topics delete RESPONSES_TOPIC_ID
      gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
    5. Batalkan peran yang Anda berikan ke akun layanan default Compute Engine. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.admin
      • roles/pubsub.editor
      • roles/aiplatform.user
      gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@ --role=SERVICE_ACCOUNT_ROLE
    6. Opsional: Cabut peran dari Akun Google Anda.

      gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
    7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

      gcloud auth application-default revoke
    8. Optional: Revoke credentials from the gcloud CLI.

      gcloud auth revoke

    Langkah berikutnya