Mengembangkan agen kustom

Template agen di Agent Engine ditentukan sebagai class Python. Langkah-langkah berikut menunjukkan cara membuat template kustom untuk membuat instance agen yang dapat di-deploy di Vertex AI:

  1. Contoh dasar
  2. (Opsional) Streaming respons
  3. (Opsional) Mendaftarkan metode kustom
  4. (Opsional) Berikan anotasi jenis
  5. (Opsional) Pelacakan menggunakan OpenTelemetry
  6. (Opsional) Menggunakan variabel lingkungan
  7. (Opsional) Mengintegrasikan dengan Secret Manager
  8. (Opsional) Menangani kredensial

Contoh dasar

Untuk memberikan contoh dasar, class Python berikut adalah template untuk membuat instance agen yang dapat di-deploy di Vertex AI (Anda dapat memberikan nilai seperti MyAgent ke variabel CLASS_NAME):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Pertimbangan deployment

Saat menulis class Python, tiga metode berikut penting:

  1. __init__():
    • Gunakan metode ini hanya untuk parameter konfigurasi agen. Misalnya, Anda dapat menggunakan metode ini untuk mengumpulkan parameter model dan atribut keamanan sebagai argumen input dari pengguna. Anda juga dapat menggunakan metode ini untuk mengumpulkan parameter seperti project ID, region, kredensial aplikasi, dan kunci API.
    • Konstruktor menampilkan objek yang harus "dapat di-pickle" agar dapat di-deploy ke Agent Engine. Oleh karena itu, Anda harus melakukan inisialisasi klien layanan dan membuat koneksi ke database dalam metode .set_up, bukan metode __init__.
    • Metode ini bersifat opsional. Jika tidak ditentukan, Vertex AI akan menggunakan konstruktor Python default untuk class.
  2. set_up():
    • Anda harus menggunakan metode ini untuk menentukan logika inisialisasi agen. Misalnya, Anda menggunakan metode ini untuk membuat koneksi ke database atau layanan dependen, mengimpor paket dependen, atau melakukan prakomputasi data yang digunakan untuk menayangkan kueri.
    • Metode ini bersifat opsional. Jika tidak ditentukan, Vertex AI akan mengasumsikan bahwa agen tidak perlu memanggil metode .set_up sebelum menayangkan kueri pengguna.
  3. query() / stream_query():
    • Gunakan query() untuk menampilkan respons lengkap sebagai satu hasil.
    • Gunakan stream_query() untuk menampilkan respons dalam potongan saat tersedia, sehingga memungkinkan pengalaman streaming. Metode stream_query harus menampilkan objek yang dapat di-iterasi (misalnya generator) untuk mengaktifkan streaming.
    • Anda dapat menerapkan kedua metode tersebut jika ingin mendukung interaksi streaming dan respons tunggal dengan agen.
    • Anda harus memberikan docstring yang jelas ke metode ini yang menentukan fungsinya, mendokumentasikan atributnya, dan memberikan anotasi jenis untuk inputnya. Hindari argumen variabel dalam metode query dan stream_query.

Membuat instance agen secara lokal

Anda dapat membuat instance lokal agen menggunakan kode berikut:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Menguji metode query

Anda dapat menguji agen dengan mengirim kueri ke instance lokal:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

Responsnya adalah kamus yang mirip dengan berikut ini:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Respons aliran data

Untuk melakukan streaming respons ke kueri, Anda dapat menentukan metode bernama stream_query yang menghasilkan respons. Sebagai contoh, template berikut memperluas contoh dasar untuk melakukan streaming respons dan dapat di-deploy di Vertex AI:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

Berikut beberapa hal penting yang perlu diingat saat menggunakan streaming API:

  • Waktu tunggu maksimum: Waktu tunggu maksimum untuk respons streaming adalah 10 menit. Jika agen Anda memerlukan waktu pemrosesan yang lebih lama, pertimbangkan untuk membagi tugas menjadi beberapa bagian yang lebih kecil.
  • Streaming model dan rantai: Antarmuka Runnable LangChain mendukung streaming, sehingga Anda dapat melakukan streaming respons tidak hanya dari agen, tetapi juga model dan rantai.
  • Kompatibilitas LangChain: Perhatikan bahwa metode asinkron seperti metode astream_event LangChain tidak didukung saat ini.
  • Mengoreksi pembuatan konten: Jika Anda mengalami masalah backpressure (saat produser menghasilkan data lebih cepat daripada yang dapat diproses konsumen), Anda harus mengoreksi kecepatan pembuatan konten. Hal ini dapat membantu mencegah buffer yang meluap dan memastikan pengalaman streaming yang lancar.

Menguji metode stream_query

Anda dapat menguji kueri streaming secara lokal dengan memanggil metode stream_query dan melakukan iterasi pada hasilnya. Berikut contohnya:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Kode ini mencetak setiap bagian respons saat dibuat. Outputnya mungkin terlihat seperti ini:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

Dalam contoh ini, setiap bagian berisi informasi yang berbeda tentang respons, seperti tindakan yang dilakukan oleh agen, pesan yang dipertukarkan, dan output akhir.

Mendaftarkan metode kustom

Secara default, metode query dan stream_query didaftarkan sebagai operasi di agen yang di-deploy. Anda dapat mengganti perilaku default dan menentukan kumpulan operasi yang akan didaftarkan menggunakan metode register_operations. Operasi dapat didaftarkan sebagai mode eksekusi standar (diwakili oleh string kosong "") atau streaming ("stream").

Untuk mendaftarkan beberapa operasi, Anda dapat menentukan metode bernama register_operations yang mencantumkan metode yang akan disediakan untuk pengguna saat agen di-deploy. Dalam kode contoh berikut, metode register_operations akan menyebabkan agen yang di-deploy mendaftarkan query dan get_state sebagai operasi yang berjalan secara sinkron, dan stream_query dan get_state_history sebagai operasi yang melakukan streaming respons:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

Anda dapat menguji metode kustom dengan memanggilnya langsung di instance lokal agen, mirip dengan cara Anda menguji metode query dan stream_query.

Menyediakan Anotasi Jenis

Anda dapat menggunakan anotasi jenis untuk menentukan jenis input dan output yang diharapkan dari metode agen Anda. Saat agen di-deploy, hanya jenis yang dapat diserialisasi JSON yang didukung dalam input dan output operasi yang didukung oleh agen. Skema input dan output dapat dianotasikan menggunakan model TypedDict atau Pydantic.

Pada contoh berikut, kita menganotasi input sebagai TypedDict, dan mengonversi output mentah dari .get_state (yang merupakan NamedTuple) menjadi kamus yang dapat diserialisasi menggunakan metode ._asdict()-nya:

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

Pelacakan menggunakan OpenTelemetry

Untuk mengaktifkan pelacakan dengan library instrumentasi yang mendukung OpenTelemetry, Anda dapat mengimpor dan menginisialisasi library tersebut dalam metode .set_up.

Sebagai contoh, template berikut adalah modifikasi dari contoh dasar untuk mengekspor rekaman aktivitas ke Cloud Trace:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Menggunakan Variabel Lingkungan

Untuk menetapkan variabel lingkungan dengan cara yang berfungsi saat agen di-deploy, Anda akan menetapkannya di dalam metode .set_up, misalnya:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str], # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Melakukan integrasi dengan Secret Manager

Untuk berintegrasi dengan Secret Manager:

  1. Instal library klien dengan menjalankan

    pip install google-cloud-secret-manager
  2. Ikuti petunjuk di Memberikan peran untuk agen yang di-deploy untuk memberikan peran "Secret Manager Secret Accessor" (roles/secretmanager.secretAccessor) kepada agen layanan Reasoning Engine melalui konsol Google Cloud.

  3. Impor dan lakukan inisialisasi klien dalam metode .set_up dan dapatkan secret yang sesuai saat diperlukan. Sebagai contoh, template berikut adalah modifikasi dari contoh dasar untuk menggunakan kunci API untuk ChatAnthropic yang disimpan di Secret Manager:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Menangani kredensial

Saat di-deploy, agen mungkin perlu menangani berbagai jenis kredensial:

  1. Kredensial default aplikasi (ADC) yang biasanya berasal dari akun layanan,
  2. OAuth yang biasanya berasal dari akun pengguna, dan
  3. Penyedia identitas untuk kredensial dari akun eksternal (workload identity federation).

Kredensial Default Aplikasi

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

Fungsi ini dapat digunakan dalam kode dengan cara berikut:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

Untuk mengetahui detailnya, buka Cara kerja Kredensial Default Aplikasi.

OAuth

Kredensial pengguna biasanya diperoleh menggunakan OAuth 2.0.

Jika memiliki token akses (misalnya dari oauthlib), Anda dapat membuat instance google.oauth2.credentials.Credentials. Selain itu, jika Anda mendapatkan token refresh, Anda juga dapat menentukan token refresh dan URI token untuk mengizinkan kredensial dimuat ulang secara otomatis:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

Di sini, TOKEN_URI, CLIENT_ID, dan CLIENT_SECRET didasarkan pada Membuat kredensial klien OAuth.

Jika tidak memiliki token akses, Anda dapat menggunakan google_auth_oauthlib.flow untuk melakukan Alur Pemberian Otorisasi OAuth 2.0 untuk mendapatkan instance google.oauth2.credentials.Credentials yang sesuai:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

Untuk mengetahui detailnya, buka dokumentasi untuk modul google_auth_oauthlib.flow.

Penyedia Identitas

Jika ingin mengautentikasi pengguna menggunakan email/sandi, nomor telepon, penyedia media sosial seperti Google, Facebook, atau GitHub, atau mekanisme autentikasi kustom, Anda dapat menggunakan Identity Platform atau Firebase Authentication, atau penyedia identitas apa pun yang mendukung OpenID Connect (OIDC).

Untuk mengetahui detailnya, buka Mengakses resource dari penyedia identitas OIDC.

Langkah berikutnya