Use Vertex AI RAG Engine in the Gemini Live API

Retrieval-augmented generation (RAG) is a technique for retrieving relevant information and providing it to an LLM so that it can generate verifiable responses. The information can include up-to-date information, a topic and context, or ground-truth answers.

This page describes how to use Vertex AI RAG Engine with the Gemini Live API so that you can specify and retrieve information from a RAG corpus.

Prerequisites

You must complete the following prerequisites before you can use Vertex AI RAG Engine with the multimodal Live API:

  1. Enable the RAG API in Vertex AI.

  2. Create a RAG corpus. See the Create a RAG corpus example (a minimal sketch of steps 2 and 3 follows this list).

  3. To upload files to the RAG corpus, see the Import RAG files example API.
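
For orientation, here is a minimal sketch of steps 2 and 3 using the Vertex AI SDK's rag module; it assumes the SDK is installed and authenticated, and the display name and Cloud Storage path are placeholders. See the linked examples for the full options.

import vertexai
from vertexai import rag

vertexai.init(project="YOUR_PROJECT_ID", location="YOUR_LOCATION")

# Step 2: create an empty RAG corpus.
corpus = rag.create_corpus(display_name="my-live-api-corpus")  # placeholder display name

# Step 3: import files (for example, from Cloud Storage) into the corpus.
rag.import_files(
    corpus.name,
    paths=["gs://your-bucket/your-document.pdf"],  # placeholder path
)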

Setup

You can use Vertex AI RAG Engine with the Live API by specifying it as a tool. The following code sample demonstrates how to specify Vertex AI RAG Engine as a tool:

Make the following variable substitutions:

  • YOUR_PROJECT_ID: The ID of your Google Cloud project.
  • YOUR_CORPUS_ID: The ID of your corpus.
  • YOUR_LOCATION: The region that processes the request.
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
    "retrieval": {
        "vertex_rag_store": {
            # rag_resources is a repeated field, so it takes a list.
            "rag_resources": [
                {
                    "rag_corpus": f"projects/{PROJECT_ID}/locations/{LOCATION}/ragCorpora/{RAG_CORPUS_ID}"
                }
            ]
        }
    }
}
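
If you use the Python SDK (shown later on this page), the same tool can be written with the typed classes from google.genai.types. This sketch mirrors the dictionary above and matches the typed form used in the memory-store example below:

from google.genai import types

# Typed equivalent of the TOOLS dictionary, usable with LiveConnectConfig(tools=[...]).
rag_tool = types.Tool(
    retrieval=types.Retrieval(
        vertex_rag_store=types.VertexRagStore(
            rag_resources=[
                types.VertexRagStoreRagResource(
                    rag_corpus=f"projects/{PROJECT_ID}/locations/{LOCATION}/ragCorpora/{RAG_CORPUS_ID}"
                )
            ]
        )
    )
)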

Use WebSockets for real-time communication

To enable real-time communication between the client and the server, you must use a WebSocket. The following code samples demonstrate how to use a WebSocket with the Python API and with the Python SDK.

Python API

import json

from IPython.display import Markdown, display
from websockets.asyncio.client import connect

# Access token for the WebSocket connection; in a notebook you can obtain one
# with, for example: bearer_token = !gcloud auth print-access-token
bearer_token = ["YOUR_BEARER_TOKEN"]

CONFIG = {"response_modalities": ["TEXT"], "speech_config": {"language_code": "en-US"}}
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {bearer_token[0]}",
}
HOST = f"{LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL = "gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Set up the session
  await ws.send(
      json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Set up RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

Python SDK

To learn how to install the Gen AI SDK, see Install a library.
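
The Gen AI SDK is distributed as the google-genai package, so a typical installation (assuming a standard pip environment) looks like this:

pip install --upgrade google-genai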

from google import genai
from google.genai.types import Content, LiveConnectConfig, Modality, Part
from IPython import display

MODEL = "gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(
      response_modalities=[Modality.TEXT],
      # LiveConnectConfig expects a list of tools.
      tools=[TOOLS],
  ),
) as session:
  text_input = "What are core LLM techniques?"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive():
      if message.text:
          display.display(display.Markdown(message.text))
          continue

Use Vertex AI RAG Engine as a context store

You can use Vertex AI RAG Engine as a context store for the Gemini Live API to store session context. Past contexts related to the conversation are formed, retrieved, and used to enrich the current context for model generation. You can also use this capability to share context across different Live API sessions.

Vertex AI RAG Engine supports storing and indexing the following forms of data from session contexts:

  • Text
  • Audio speech

Create a MemoryCorpus-type corpus

To store and index conversation text from session contexts, you must create a RAG corpus of the MemoryCorpus type. You must also specify an LLM parser in the memory corpus configuration; the parser is used to parse session contexts stored from the Live API to build memories for indexing.

This code sample demonstrates how to create a corpus. But first, replace the variables with your values.

from vertexai import rag  # If the MemoryCorpus types are unavailable here, try vertexai.preview.rag

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
    display_name=MEMORY_CORPUS_DISPLAY_NAME,
    corpus_type_config=rag.RagCorpusTypeConfig(
        corpus_type_config=rag.MemoryCorpus(
            llm_parser=rag.LlmParserConfig(
                model_name=LLM_PARSER_MODEL_NAME,
            )
        )
    ),
    backend_config=rag.RagVectorDbConfig(
        rag_embedding_model_config=rag.RagEmbeddingModelConfig(
            vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
                publisher_model=EMBEDDING_MODEL
            )
        )
    ),
)
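
The create_corpus call returns the new corpus; its resource name (memory_corpus.name, in the form projects/.../ragCorpora/...) is what the next section passes to the Live API as a retrieval tool.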

Specify your memory corpus to store context

When you use your memory corpus with the Live API, you must specify the memory corpus as a retrieval tool and then set store_context to true to allow the Live API to store session contexts.

This code sample demonstrates how to specify your memory corpus to store context. But first, replace the variables with your values.

from google import genai
from google.genai import types
from google.genai.types import Content, LiveConnectConfig, Modality, Part
from IPython import display

PROJECT_ID = YOUR_PROJECT_ID
LOCATION = YOUR_LOCATION
TEXT_INPUT = YOUR_TEXT_INPUT
MODEL_NAME = YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
    vertexai=True,
    project=PROJECT_ID,
    location=LOCATION,
)

memory_store = types.VertexRagStore(
    rag_resources=[
        types.VertexRagStoreRagResource(
            rag_corpus=memory_corpus.name
        )
    ],
    store_context=True,
)

async with client.aio.live.connect(
    model=MODEL_NAME,
    config=LiveConnectConfig(
        response_modalities=[Modality.TEXT],
        tools=[types.Tool(
            retrieval=types.Retrieval(vertex_rag_store=memory_store))],
    ),
) as session:
    text_input = TEXT_INPUT
    await session.send_client_content(
        turns=Content(role="user", parts=[Part(text=text_input)])
    )

    async for message in session.receive():
        if message.text:
            display.display(display.Markdown(message.text))
            continue
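
Because the stored context lives in the memory corpus rather than in the session, a later session configured with the same memory corpus as a retrieval tool can retrieve memories formed from earlier sessions. The following is a minimal sketch that reuses the client, memory_store, and MODEL_NAME from the example above; the follow-up question is a hypothetical example.

# A new, separate Live API session that shares the same memory corpus.
async with client.aio.live.connect(
    model=MODEL_NAME,
    config=LiveConnectConfig(
        response_modalities=[Modality.TEXT],
        tools=[types.Tool(
            retrieval=types.Retrieval(vertex_rag_store=memory_store))],
    ),
) as session:
    # Hypothetical follow-up that relies on context stored by earlier sessions.
    await session.send_client_content(
        turns=Content(role="user", parts=[Part(text="Summarize what we discussed in our last session.")])
    )

    async for message in session.receive():
        if message.text:
            display.display(display.Markdown(message.text))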

What's next