맞춤 에이전트 개발

상담사 엔진의 상담사 템플릿은 Python 클래스로 정의됩니다. 다음 단계에서는 Vertex AI에 배포할 수 있는 에이전트를 인스턴스화하기 위한 맞춤 템플릿을 만드는 방법을 보여줍니다.

  1. 기본 예
  2. (선택사항) 응답 스트리밍
  3. (선택사항) 커스텀 메서드 등록
  4. (선택사항) 유형 주석 제공
  5. (선택사항) OpenTelemetry를 사용한 추적
  6. (선택사항) 환경 변수 작업
  7. (선택사항) Secret Manager와 통합
  8. (선택사항) 사용자 인증 정보 처리

기본 예시

기본적인 예를 들어 보자면 다음 Python 클래스는 Vertex AI에 배포할 수 있는 에이전트를 인스턴스화하기 위한 템플릿입니다. CLASS_NAME 변수에 MyAgent와 같은 값을 지정할 수 있습니다.

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

배포 시 고려사항

Python 클래스를 작성할 때는 다음 세 가지 메서드가 중요합니다.

  1. __init__():
    • 에이전트 구성 매개변수에만 이 메서드를 사용하세요. 예를 들어 이 메서드를 사용하여 모델 매개변수와 안전 속성을 사용자의 입력 인수로 수집할 수 있습니다. 이 메서드를 사용하여 프로젝트 ID, 리전, 애플리케이션 사용자 인증 정보, API 키와 같은 매개변수를 수집할 수도 있습니다.
    • 생성자는 상담사 엔진에 배포할 수 있도록 '피클링 가능한' 객체를 반환합니다. 따라서 서비스 클라이언트를 초기화하고 데이터베이스 연결을 설정할 때 __init__ 메서드 대신 .set_up 메서드를 사용해야 합니다.
    • 이 메서드는 선택사항입니다. 지정하지 않으면 Vertex AI에서 클래스에 기본 Python 생성자를 사용합니다.
  2. set_up():
    • 에이전트 초기화 로직을 정의하려면 이 메서드를 사용해야 합니다. 예를 들어 이 메서드를 사용하여 데이터베이스나 종속 서비스에 대한 연결을 설정하거나 종속 패키지를 가져오거나 쿼리를 처리하는 데 사용되는 데이터를 미리 계산할 수 있습니다.
    • 이 메서드는 선택사항입니다. 지정하지 않으면 Vertex AI는 사용자 쿼리를 처리하기 전에 에이전트에서 .set_up 메서드를 호출할 필요가 없다고 가정합니다.
  3. query() / stream_query():
    • query()를 사용하여 전체 응답을 단일 결과로 반환합니다.
    • stream_query()를 사용하여 응답이 제공될 때마다 응답을 청크로 반환하여 스트리밍 환경을 지원합니다. 스트리밍을 사용 설정하려면 stream_query 메서드가 반복 가능한 객체 (예: 생성자)를 반환해야 합니다.
    • 상담사와의 단일 응답 및 스트리밍 상호작용을 모두 지원하려면 두 메서드를 모두 구현할 수 있습니다.
    • 이 메서드에는 기능을 정의하고 속성을 문서화하고 입력에 대한 유형 주석을 제공하는 명확한 docstring을 제공해야 합니다. querystream_query 메서드에서 변수 인수를 사용하지 마세요.

로컬에서 에이전트 인스턴스화

다음 코드를 사용하여 상담사의 로컬 인스턴스를 만들 수 있습니다.

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

query 메서드 테스트

쿼리를 로컬 인스턴스에 전송하여 에이전트를 테스트할 수 있습니다.

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

응답은 다음과 유사한 딕셔너리입니다.

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

스트리밍 응답

쿼리에 대한 응답을 스트리밍하려면 응답을 생성하는 stream_query라는 메서드를 정의하면 됩니다. 예를 들어 다음 템플릿은 기본 예시를 확장하여 응답을 스트리밍하고 Vertex AI에 배포할 수 있습니다.

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

다음은 스트리밍 API를 사용할 때 유의해야 할 몇 가지 주요 사항입니다.

  • 최대 제한 시간: 스트리밍 응답의 최대 제한 시간은 10분입니다. 에이전트에 더 긴 처리 시간이 필요한 경우 작업을 더 작은 청크로 나누는 것이 좋습니다.
  • 모델 및 체인 스트리밍: LangChain의 Runnable 인터페이스는 스트리밍을 지원하므로 에이전트뿐만 아니라 모델 및 체인의 응답을 스트리밍할 수 있습니다.
  • LangChain 호환성: LangChain의 astream_event 메서드와 같은 비동기 메서드는 현재 지원되지 않습니다.
  • 콘텐츠 생성 제한: 백프레셔 문제 (생산자가 소비자가 처리할 수 있는 것보다 더 빠르게 데이터를 생성하는 경우)가 발생하면 콘텐츠 생성 속도를 제한해야 합니다. 이렇게 하면 버퍼 오버플로를 방지하고 원활한 스트리밍 환경을 보장할 수 있습니다.

stream_query 메서드 테스트

stream_query 메서드를 호출하고 결과를 반복하여 스트리밍 쿼리를 로컬에서 테스트할 수 있습니다. 예를 들면 다음과 같습니다.

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

이 코드는 응답의 각 청크가 생성될 때마다 이를 출력합니다. 출력은 다음과 같이 표시될 수 있습니다.

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

이 예에서 각 청크에는 상담사가 취한 조치, 교환된 메시지, 최종 출력과 같은 응답에 관한 다양한 정보가 포함됩니다.

커스텀 메서드 등록

기본적으로 querystream_query 메서드는 배포된 에이전트에서 작업으로 등록됩니다. register_operations 메서드를 사용하여 기본 동작을 재정의하고 등록할 작업 집합을 정의할 수 있습니다. 작업은 표준 (빈 문자열 ""로 표시됨) 또는 스트리밍 ("stream") 실행 모드로 등록할 수 있습니다.

여러 작업을 등록하려면 에이전트가 배포될 때 사용자에게 제공할 메서드를 나열하는 register_operations라는 메서드를 정의하면 됩니다. 다음 코드 예에서 register_operations 메서드를 사용하면 배포된 에이전트가 queryget_state를 동기식으로 실행되는 작업으로, stream_queryget_state_history를 응답을 스트리밍하는 작업으로 등록합니다.

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

querystream_query 메서드를 테스트하는 것과 마찬가지로 상담사의 로컬 인스턴스에서 직접 호출하여 맞춤 메서드를 테스트할 수 있습니다.

유형 주석 제공

유형 주석을 사용하여 상담사 메서드의 예상 입력 및 출력 유형을 지정할 수 있습니다. 에이전트가 배포되면 에이전트에서 지원하는 작업의 입력과 출력에서 JSON 직렬화 가능 유형만 지원됩니다. 입력과 출력의 스키마는 TypedDict 또는 Pydantic 모델을 사용하여 주석을 추가할 수 있습니다.

다음 예에서는 입력에 TypedDict 주석을 추가하고 .get_state (NamedTuple)의 원시 출력을 ._asdict() 메서드를 사용하여 직렬화 가능한 사전으로 변환합니다.

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

OpenTelemetry를 사용한 추적

OpenTelemetry를 지원하는 계측 라이브러리로 추적을 사용 설정하려면 .set_up 메서드에서 라이브러리를 가져와 초기화하면 됩니다.

예를 들어 다음 템플릿은 Cloud Trace로 트레이스를 내보내기 위해 기본 예를 수정한 것입니다.

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project)
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

환경 변수 작업

에이전트가 배포될 때 작동하는 방식으로 환경 변수를 설정하려면 .set_up 메서드 내에 설정합니다. 예를 들면 다음과 같습니다.

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
        env_vars: dict[str, str], # <- new
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location
        self.env_vars = env_vars # <- new

    def set_up(self):
        # Code for setting the environment variables
        import os
        for env_var_name, env_var_value in self.env_vars.items():
            os.environ[env_var_name] = env_var_value
        # End of code for setting the environment variables

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Secret Manager와 통합

Secret Manager와 통합하려면 다음 단계를 따르세요.

  1. 다음을 실행하여 클라이언트 라이브러리를 설치합니다.

    pip install google-cloud-secret-manager
  2. 배포된 에이전트에 역할 부여의 안내에 따라 Google Cloud 콘솔을 통해 추론 엔진 서비스 에이전트에 'Secret Manager 보안 비밀 접근자' 역할 (roles/secretmanager.secretAccessor)을 부여합니다.

  3. .set_up 메서드에서 클라이언트를 가져와 초기화하고 필요하면 상응하는 보안 비밀번호를 가져옵니다. 예를 들어 다음 템플릿은 Secret Manager에 저장ChatAnthropic의 API 키를 사용하도록 기본 예를 수정한 것입니다.

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

사용자 인증 정보 처리

상담사가 배포될 때 다양한 유형의 사용자 인증 정보를 처리해야 할 수 있습니다.

  1. 일반적으로 서비스 계정에서 발생하는 애플리케이션 기본 사용자 인증 정보 (ADC)
  2. 일반적으로 사용자 계정에서 발생하는 OAuth
  3. 외부 계정 사용자 인증 정보 (워크로드 아이덴티티 제휴)의 ID 공급업체

애플리케이션 기본 사용자 인증 정보

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

다음과 같이 코드에서 사용할 수 있습니다.

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

자세한 내용은 애플리케이션 기본 사용자 인증 정보의 작동 방식을 참고하세요.

OAuth

사용자 인증 정보는 일반적으로 OAuth 2.0을 사용하여 가져옵니다.

액세스 토큰 (예: oauthlib의 토큰)이 있으면 google.oauth2.credentials.Credentials 인스턴스를 만들 수 있습니다. 또한 갱신 토큰을 가져온 경우 갱신 토큰과 토큰 URI를 지정하여 사용자 인증 정보가 자동으로 새로고침되도록 할 수도 있습니다.

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

여기서 TOKEN_URI, CLIENT_ID, CLIENT_SECRETOAuth 클라이언트 사용자 인증 정보 만들기를 기반으로 합니다.

액세스 토큰이 없는 경우 google_auth_oauthlib.flow를 사용하여 OAuth 2.0 승인 부여 흐름을 실행하여 상응하는 google.oauth2.credentials.Credentials 인스턴스를 가져올 수 있습니다.

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

자세한 내용은 google_auth_oauthlib.flow 모듈 문서를 참고하세요.

ID 공급업체

이메일/비밀번호, 전화번호, Google, Facebook, GitHub와 같은 소셜 제공업체 또는 커스텀 인증 메커니즘을 사용하여 사용자를 인증하려는 경우 Identity Platform 또는 Firebase 인증 또는 OpenID Connect (OIDC)를 지원하는 ID 공급업체를 사용하면 됩니다.

자세한 내용은 OIDC ID 공급업체에서 리소스 액세스를 참고하세요.

다음 단계