Template agen di Agent Engine ditentukan sebagai class Python. Langkah-langkah berikut menunjukkan cara membuat template kustom untuk membuat instance agen yang dapat di-deploy di Vertex AI:
- Contoh dasar
- (Opsional) Streaming respons
- (Opsional) Mendaftarkan metode kustom
- (Opsional) Berikan anotasi jenis
- (Opsional) Pelacakan menggunakan OpenTelemetry
- (Opsional) Menggunakan variabel lingkungan
- (Opsional) Mengintegrasikan dengan Secret Manager
- (Opsional) Menangani kredensial
Contoh dasar
Untuk memberikan contoh dasar, class Python berikut adalah template untuk membuat instance agen yang dapat di-deploy di Vertex AI (Anda dapat memberikan nilai seperti MyAgent
ke variabel CLASS_NAME
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Pertimbangan deployment
Saat menulis class Python, tiga metode berikut penting:
__init__()
:- Gunakan metode ini hanya untuk parameter konfigurasi agen. Misalnya, Anda dapat menggunakan metode ini untuk mengumpulkan parameter model dan atribut keamanan sebagai argumen input dari pengguna. Anda juga dapat menggunakan metode ini untuk mengumpulkan parameter seperti project ID, region, kredensial aplikasi, dan kunci API.
- Konstruktor menampilkan objek yang harus "dapat di-pickle" agar dapat di-deploy ke Agent Engine. Oleh karena itu, Anda harus melakukan inisialisasi
klien layanan dan membuat koneksi ke database dalam metode
.set_up
, bukan metode__init__
. - Metode ini bersifat opsional. Jika tidak ditentukan, Vertex AI akan menggunakan konstruktor Python default untuk class.
set_up()
:- Anda harus menggunakan metode ini untuk menentukan logika inisialisasi agen. Misalnya, Anda menggunakan metode ini untuk membuat koneksi ke database atau layanan dependen, mengimpor paket dependen, atau melakukan prakomputasi data yang digunakan untuk menayangkan kueri.
- Metode ini bersifat opsional. Jika tidak ditentukan, Vertex AI akan mengasumsikan
bahwa agen tidak perlu memanggil metode
.set_up
sebelum menayangkan kueri pengguna.
query()
/stream_query()
:- Gunakan
query()
untuk menampilkan respons lengkap sebagai satu hasil. - Gunakan
stream_query()
untuk menampilkan respons dalam potongan saat tersedia, sehingga memungkinkan pengalaman streaming. Metodestream_query
harus menampilkan objek yang dapat di-iterasi (misalnya generator) untuk mengaktifkan streaming. - Anda dapat menerapkan kedua metode tersebut jika ingin mendukung interaksi streaming dan respons tunggal dengan agen.
- Anda harus memberikan docstring yang jelas ke metode ini yang menentukan fungsinya,
mendokumentasikan atributnya, dan memberikan anotasi jenis untuk inputnya.
Hindari argumen variabel dalam metode
query
danstream_query
.
- Gunakan
Membuat instance agen secara lokal
Anda dapat membuat instance lokal agen menggunakan kode berikut:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Menguji metode query
Anda dapat menguji agen dengan mengirim kueri ke instance lokal:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
Responsnya adalah kamus yang mirip dengan berikut ini:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Respons aliran data
Untuk melakukan streaming respons ke kueri, Anda dapat menentukan metode bernama stream_query
yang menghasilkan respons. Sebagai contoh, template berikut memperluas contoh dasar untuk melakukan streaming respons dan dapat di-deploy di Vertex AI:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
Berikut beberapa hal penting yang perlu diingat saat menggunakan streaming API:
- Waktu tunggu maksimum: Waktu tunggu maksimum untuk respons streaming adalah 10 menit. Jika agen Anda memerlukan waktu pemrosesan yang lebih lama, pertimbangkan untuk membagi tugas menjadi beberapa bagian yang lebih kecil.
- Streaming model dan rantai: Antarmuka Runnable LangChain mendukung streaming, sehingga Anda dapat melakukan streaming respons tidak hanya dari agen, tetapi juga model dan rantai.
- Kompatibilitas LangChain: Perhatikan bahwa metode asinkron seperti
metode
astream_event
LangChain tidak didukung saat ini. - Mengoreksi pembuatan konten: Jika Anda mengalami masalah backpressure (saat produser menghasilkan data lebih cepat daripada yang dapat diproses konsumen), Anda harus mengoreksi kecepatan pembuatan konten. Hal ini dapat membantu mencegah buffer yang meluap dan memastikan pengalaman streaming yang lancar.
Menguji metode stream_query
Anda dapat menguji kueri streaming secara lokal dengan memanggil metode stream_query
dan melakukan iterasi pada hasilnya. Berikut contohnya:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Kode ini mencetak setiap bagian respons saat dibuat. Outputnya mungkin terlihat seperti ini:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Dalam contoh ini, setiap bagian berisi informasi yang berbeda tentang respons, seperti tindakan yang dilakukan oleh agen, pesan yang dipertukarkan, dan output akhir.
Mendaftarkan metode kustom
Secara default, metode query
dan stream_query
didaftarkan sebagai operasi
di agen yang di-deploy.
Anda dapat mengganti perilaku default dan menentukan kumpulan operasi yang akan
didaftarkan menggunakan metode register_operations
.
Operasi dapat didaftarkan sebagai mode eksekusi standar (diwakili oleh string kosong
""
) atau streaming ("stream"
).
Untuk mendaftarkan beberapa operasi, Anda dapat menentukan metode bernama
register_operations
yang mencantumkan metode yang akan disediakan untuk pengguna saat
agen di-deploy. Dalam kode contoh berikut, metode register_operations
akan menyebabkan agen yang di-deploy mendaftarkan query
dan get_state
sebagai
operasi yang berjalan secara sinkron, dan stream_query
dan get_state_history
sebagai
operasi yang melakukan streaming respons:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Anda dapat menguji metode kustom dengan memanggilnya langsung di instance lokal
agen, mirip dengan cara Anda menguji metode query
dan
stream_query
.
Menyediakan Anotasi Jenis
Anda dapat menggunakan anotasi jenis untuk menentukan jenis input dan output yang diharapkan dari metode agen Anda. Saat agen di-deploy, hanya jenis yang dapat diserialisasi JSON yang
didukung dalam input dan output operasi yang didukung oleh agen. Skema
input dan output dapat dianotasikan menggunakan model TypedDict
atau Pydantic.
Pada contoh berikut, kita menganotasi input sebagai TypedDict
, dan mengonversi
output mentah dari .get_state
(yang merupakan NamedTuple
) menjadi kamus
yang dapat diserialisasi menggunakan metode ._asdict()
-nya:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Pelacakan menggunakan OpenTelemetry
Untuk mengaktifkan pelacakan dengan library instrumentasi yang mendukung OpenTelemetry, Anda
dapat mengimpor dan menginisialisasi library tersebut dalam metode .set_up
.
Sebagai contoh, template berikut adalah modifikasi dari contoh dasar untuk mengekspor rekaman aktivitas ke Cloud Trace:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Menggunakan Variabel Lingkungan
Untuk menetapkan variabel lingkungan dengan cara yang berfungsi saat agen di-deploy,
Anda akan menetapkannya di dalam metode .set_up
, misalnya:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
env_vars: dict[str, str], # <- new
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
self.env_vars = env_vars # <- new
def set_up(self):
# Code for setting the environment variables
import os
for env_var_name, env_var_value in self.env_vars.items():
os.environ[env_var_name] = env_var_value
# End of code for setting the environment variables
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Melakukan integrasi dengan Secret Manager
Untuk berintegrasi dengan Secret Manager:
Instal library klien dengan menjalankan
pip install google-cloud-secret-manager
Ikuti petunjuk di Memberikan peran untuk agen yang di-deploy untuk memberikan peran "Secret Manager Secret Accessor" (
roles/secretmanager.secretAccessor
) kepada agen layanan Reasoning Engine melalui konsol Google Cloud.Impor dan lakukan inisialisasi klien dalam metode
.set_up
dan dapatkan secret yang sesuai saat diperlukan. Sebagai contoh, template berikut adalah modifikasi dari contoh dasar untuk menggunakan kunci API untukChatAnthropic
yang disimpan di Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Menangani kredensial
Saat di-deploy, agen mungkin perlu menangani berbagai jenis kredensial:
- Kredensial default aplikasi (ADC) yang biasanya berasal dari akun layanan,
- OAuth yang biasanya berasal dari akun pengguna, dan
- Penyedia identitas untuk kredensial dari akun eksternal (workload identity federation).
Kredensial Default Aplikasi
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Fungsi ini dapat digunakan dalam kode dengan cara berikut:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Untuk mengetahui detailnya, buka Cara kerja Kredensial Default Aplikasi.
OAuth
Kredensial pengguna biasanya diperoleh menggunakan OAuth 2.0.
Jika memiliki token akses (misalnya dari oauthlib
),
Anda dapat membuat instance google.oauth2.credentials.Credentials
. Selain itu,
jika Anda mendapatkan token refresh, Anda juga dapat menentukan token refresh dan URI
token untuk mengizinkan kredensial dimuat ulang secara otomatis:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Di sini, TOKEN_URI
, CLIENT_ID
, dan
CLIENT_SECRET
didasarkan pada Membuat kredensial klien OAuth.
Jika tidak memiliki token akses, Anda dapat menggunakan google_auth_oauthlib.flow
untuk
melakukan Alur Pemberian Otorisasi OAuth 2.0
untuk mendapatkan instance google.oauth2.credentials.Credentials
yang sesuai:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Untuk mengetahui detailnya, buka dokumentasi untuk modul google_auth_oauthlib.flow
.
Penyedia Identitas
Jika ingin mengautentikasi pengguna menggunakan email/sandi, nomor telepon, penyedia media sosial seperti Google, Facebook, atau GitHub, atau mekanisme autentikasi kustom, Anda dapat menggunakan Identity Platform atau Firebase Authentication, atau penyedia identitas apa pun yang mendukung OpenID Connect (OIDC).
Untuk mengetahui detailnya, buka Mengakses resource dari penyedia identitas OIDC.