使用 Python SDK 构建数据代理

本页面介绍了如何使用 Python SDK 向 Conversational Analytics API 发出请求。此示例 Python 代码演示了如何完成以下任务:

进行身份验证并设置环境

如需使用适用于 Conversational Analytics API 的 Python SDK,请按照 Conversational Analytics API SDK Colaboratory 笔记本中的说明下载并安装该 SDK。请注意,下载方法和 SDK Colab 的内容可能会发生变化。

完成笔记本中的设置说明后,您可以使用以下代码导入所需的 SDK 库,在 Colaboratory 环境中对您的 Google 账号进行身份验证,并初始化一个用于发出 API 请求的客户端:

from google.colab import auth
auth.authenticate_user()

from google.cloud import geminidataanalytics

data_agent_client = geminidataanalytics.DataAgentServiceClient()
data_chat_client = geminidataanalytics.DataChatServiceClient()

指定结算项目和系统指令

以下示例 Python 代码定义了在整个脚本中使用的结算项目和系统指令:

# Billing project
billing_project = "my_project_name"

# System description
system_description = "Help the user analyze their data."

按如下所示替换示例值:

  • my_project_name已启用所需 API 的结算项目的 ID。
  • Help the user analyze their data.:用于指导代理行为并根据您的需求自定义代理的系统指令。例如,您可以使用系统指令来定义业务术语(例如“忠实客户”的定义),控制回答长度(“使用数量少于 20 个字的内容来总结”)或设置数据格式(“符合公司标准”)。

连接到数据源

以下 Python 代码示例展示了如何为代理将要查询以回答问题的 LookerBigQueryLooker Studio 数据源定义连接详情。

连接到 Looker 数据

以下代码示例展示了如何使用 API 密钥或访问令牌定义与 Looker 探索的连接的详细信息。

API 密钥

您可以按照使用 Conversational Analytics API 对数据源进行身份验证并连接到数据源中所述,使用生成的 Looker API 密钥与 Looker 实例建立连接。

looker_client_id = "my_looker_client_id"
looker_client_secret = "my_looker_client_secret"
looker_instance_uri = "https://my_company.looker.com"
lookml_model = "my_model"
explore = "my_explore"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = lookml_model
looker_explore_reference.explore = explore

credentials = geminidataanalytics.Credentials()
credentials.oauth.secret.client_id = looker_client_id
credentials.oauth.secret.client_secret = looker_client_secret

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference]

按如下所示替换示例值:

  • my_looker_client_id:您生成的 Looker API 密钥的客户端 ID。
  • my_looker_client_secret:您生成的 Looker API 密钥的客户端密钥。
  • https://my_company.looker.com:Looker 实例的完整网址。
  • my_model:包含您要连接的探索的 LookML 模型的名称。
  • my_explore:您希望数据代理查询的 Looker 探索的名称。

访问令牌

您可以按照使用 Conversational Analytics API 对数据源进行身份验证并连接到数据源中所述,使用访问令牌与 Looker 实例建立连接。

looker_access_token = "my_access_token"
looker_instance_uri = "https://my_company.looker.com"
lookml_model = "my_model"
explore = "my_explore"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = lookml_model
looker_explore_reference.explore = explore

credentials = geminidataanalytics.Credentials()
credentials.oauth.token.access_token = looker_access_token

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference]

按如下所示替换示例值:

  • my_access_token:您生成的用于向 Looker 进行身份验证的 access_token 值。
  • https://my_company.looker.com:Looker 实例的完整网址。
  • my_model:包含您要连接的探索的 LookML 模型的名称。
  • my_explore:您希望数据代理查询的 Looker 探索的名称。

连接到 BigQuery 数据

借助 Conversational Analytics API,您可以一次性连接和查询最多 10 个 BigQuery 表。

以下示例代码定义了与单个 BigQuery 表的连接。

bq_project_id = "my_project_id"
bq_dataset_id = "my_dataset_id"
bq_table_id = "my_table_id"

bigquery_table_reference = geminidataanalytics.BigQueryTableReference()
bigquery_table_reference.project_id = bq_project_id
bigquery_table_reference.dataset_id = bq_dataset_id
bigquery_table_reference.table_id = bq_table_id

# Connect to your data source
datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.bq.table_references = [bigquery_table_reference]

按如下所示替换示例值:

  • my_project_id:包含您要连接到的 BigQuery 数据集和表的 Google Cloud 项目的 ID。如需连接到公共数据集,请指定 bigquery-public-data
  • my_dataset_id:BigQuery 数据集的 ID。 例如 san_francisco
  • my_table_id:BigQuery 表的 ID。例如 street_trees

连接到 Looker Studio 数据

以下示例代码定义了与 Looker Studio 数据源的连接。

studio_datasource_id = "my_datasource_id"

studio_references = geminidataanalytics.StudioDatasourceReference()
studio_references.datasource_id = studio_datasource_id

## Connect to your data source
datasource_references.studio.studio_references = [studio_references]

在上面的示例中,将 my_datasource_id 替换为数据源 ID。

为有状态或无状态聊天设置上下文

Conversational Analytics API 支持多轮对话,让用户可以提出基于之前的上下文的后续问题。以下示例 Python 代码演示了如何为有状态或无状态聊天设置上下文:

  • 有状态聊天: Google Cloud 会存储和管理对话历史记录。有状态聊天本身就是多轮聊天,因为 API 会保留之前消息中的上下文。您只需在每轮中发送当前消息。
  • 无状态聊天:您的应用会管理对话历史记录。您必须在每条新消息中包含完整的对话历史记录。如需查看有关如何在无状态模式下管理多轮对话的详细示例,请参阅创建无状态多轮对话

有状态聊天

以下代码示例为有状态聊天设置了上下文,其中 Google Cloud 会存储和管理对话历史记录。您还可以选择在以下示例代码中添加 published_context.options.analysis.python.enabled = True 行,以启用使用 Python 进行高级分析。

# Set up context for stateful chat
published_context = geminidataanalytics.Context()
published_context.system_instruction = system_instruction
published_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
published_context.options.analysis.python.enabled = True

无状态聊天

以下示例代码为无状态聊天设置了上下文,在这种情况下,您必须在每条消息中发送完整的对话历史记录。您还可以选择在以下示例代码中添加 inline_context.options.analysis.python.enabled = True 行,以启用使用 Python 进行高级分析。

# Set up context for stateless chat
# datasource_references.looker.credentials = credentials
inline_context = geminidataanalytics.Context()
inline_context.system_instruction = system_instruction
inline_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
inline_context.options.analysis.python.enabled = True

创建数据代理

以下示例 Python 代码会发出 API 请求来创建数据代理,然后您可以使用该代理针对数据进行对话。该数据代理配置为使用指定的数据源、系统指令和上下文。

data_agent_id = "data_agent_1"

data_agent = geminidataanalytics.DataAgent()
data_agent.data_analytics_agent.published_context = published_context
data_agent.name = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}" # Optional

request = geminidataanalytics.CreateDataAgentRequest(
    parent=f"projects/{billing_project}/locations/global",
    data_agent_id=data_agent_id, # Optional
    data_agent=data_agent,
)

try:
    data_agent_client.create_data_agent(request=request)
    print("Data Agent created")
except Exception as e:
    print(f"Error creating Data Agent: {e}")

在上面的示例中,将值 data_agent_1 替换为数据代理的唯一标识符。

检索数据代理

以下示例 Python 代码演示了如何发出 API 请求来检索您之前创建的数据代理。

# Initialize request arguments
data_agent_id = "data_agent_1"
request = geminidataanalytics.GetDataAgentRequest(
    name=f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
)

# Make the request
response = data_agent_client.get_data_agent(request=request)

# Handle the response
print(response)

在上面示例中,将值 data_agent_1 替换为您要检索的数据代理的唯一标识符。

创建对话

以下示例 Python 代码会发出 API 请求来创建对话。

# Initialize request arguments
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation = geminidataanalytics.Conversation()
conversation.agents = [f'projects/{billing_project}/locations/global/dataAgents/{data_agent_id}']
conversation.name = f"projects/{billing_project}/locations/global/conversations/{conversation_id}"

request = geminidataanalytics.CreateConversationRequest(
    parent=f"projects/{billing_project}/locations/global",
    conversation_id=conversation_id,
    conversation=conversation,
)

# Make the request
response = data_chat_client.create_conversation(request=request)

# Handle the response
print(response)

按如下所示替换示例值:

  • data_agent_1:数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
  • conversation_1:对话的唯一标识符。

使用 API 提问

创建数据代理对话后,以下示例 Python 代码会向代理发送查询。该代码使用您为有状态或无状态聊天设置的上下文。API 会返回一个消息流,表示代理回答查询所执行的步骤。

有状态聊天

发送包含 Conversation 引用的有状态聊天请求

您可以通过引用之前创建的 Conversation 资源,向数据代理发送有状态的聊天请求。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Create a conversation_reference
conversation_reference = geminidataanalytics.ConversationReference()
conversation_reference.conversation = f"projects/{billing_project}/locations/global/conversations/{conversation_id}"
conversation_reference.data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# conversation_reference.data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent = f"projects/{billing_project}/locations/global",
    messages = messages,
    conversation_reference = conversation_reference
)

# Make the request
stream = data_chat_client.chat(request=request)

# Handle the response
for response in stream:
    show_message(response)

按如下所示替换示例值:

  • Which species of tree is most prevalent?:要发送给数据代理的自然语言问题。
  • data_agent_1:数据代理的唯一标识符,如创建数据代理中所定义。
  • conversation_1:对话的唯一标识符,如创建对话中所定义。

无状态聊天

以下代码示例演示了如何在为无状态聊天设置上下文时向数据代理发送查询。您可以通过引用之前定义的 DataAgent 资源或在请求中使用内嵌上下文来发送无状态查询。

发送包含 DataAgent 引用的无状态聊天请求

您可以通过引用之前创建的 DataAgent 资源,向数据代理发送请求。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"

data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent=f"projects/{billing_project}/locations/global",
    messages=messages,
    data_agent_context = data_agent_context
)

# Make the request
stream = data_chat_client.chat(request=request)

# Handle the response
for response in stream:
    show_message(response)

按如下所示替换示例值:

  • Which species of tree is most prevalent?:要发送给数据代理的自然语言问题。
  • data_agent_1:数据代理的唯一标识符,如创建数据代理中所定义。

发送包含内嵌上下文的无状态聊天请求

以下示例代码演示了如何使用 inline_context 参数直接在无状态聊天请求中提供上下文。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

request = geminidataanalytics.ChatRequest(
    inline_context=inline_context,
    parent=f"projects/{billing_project}/locations/global",
    messages=messages,
)

# Make the request
stream = data_chat_client.chat(request=request)

# Handle the response
for response in stream:
    show_message(response)

在上面的示例中,将 Which species of tree is most prevalent? 替换为要发送给数据代理的自然语言问题。

创建无状态多轮对话

如需在无状态对话中提出后续问题,您的应用必须通过在每个新请求中发送整个消息历史记录来管理对话的上下文。以下示例展示了如何通过引用数据代理或使用内嵌上下文直接提供数据源来创建多轮对话。

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Use data agent context
data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Helper function for calling the API
def multi_turn_Conversation(msg):

    message = geminidataanalytics.Message()
    message.user_message.text = msg

    # Send a multi-turn request by including previous turns and the new message
    conversation_messages.append(message)

    request = geminidataanalytics.ChatRequest(
        parent=f"projects/{billing_project}/locations/global",
        messages=conversation_messages,
        # Use data agent context
        data_agent_context=data_agent_context,
        # Use inline context
        # inline_context=inline_context,
    )

    # Make the request
    stream = data_chat_client.chat(request=request)

    # Handle the response
    for response in stream:
      show_message(response)
      conversation_messages.append(response)

# Send the first turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

在上面的示例中,按如下所示替换示例值:

  • data_agent_1:数据代理的唯一标识符,如创建数据代理部分的示例代码块中所定义。
  • Which species of tree is most prevalent?:要发送给数据代理的自然语言问题。
  • Can you show me the results as a bar chart?:基于或优化上一个问题的后续问题。

定义辅助函数

以下示例代码包含上一个代码示例中使用的辅助函数定义。这些函数有助于解析来自 API 的回答并显示结果。

from pygments import highlight, lexers, formatters
import pandas as pd
import requests
import json as json_lib
import altair as alt
import IPython
from IPython.display import display, HTML

import proto
from google.protobuf.json_format import MessageToDict, MessageToJson

def handle_text_response(resp):
  parts = getattr(resp, 'parts')
  print(''.join(parts))

def display_schema(data):
  fields = getattr(data, 'fields')
  df = pd.DataFrame({
    "Column": map(lambda field: getattr(field, 'name'), fields),
    "Type": map(lambda field: getattr(field, 'type'), fields),
    "Description": map(lambda field: getattr(field, 'description', '-'), fields),
    "Mode": map(lambda field: getattr(field, 'mode'), fields)
  })
  display(df)

def display_section_title(text):
  display(HTML('<h2>{}</h2>'.format(text)))

def format_looker_table_ref(table_ref):
 return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref.lookml_model, table_ref.explore, table_ref.looker_instance_uri)

def format_bq_table_ref(table_ref):
  return '{}.{}.{}'.format(table_ref.project_id, table_ref.dataset_id, table_ref.table_id)

def display_datasource(datasource):
  source_name = ''
  if 'studio_datasource_id' in datasource:
   source_name = getattr(datasource, 'studio_datasource_id')
  elif 'looker_explore_reference' in datasource:
   source_name = format_looker_table_ref(getattr(datasource, 'looker_explore_reference'))
  else:
    source_name = format_bq_table_ref(getattr(datasource, 'bigquery_table_reference'))

  print(source_name)
  display_schema(datasource.schema)

def handle_schema_response(resp):
  if 'query' in resp:
    print(resp.query.question)
  elif 'result' in resp:
    display_section_title('Schema resolved')
    print('Data sources:')
    for datasource in resp.result.datasources:
      display_datasource(datasource)

def handle_data_response(resp):
  if 'query' in resp:
    query = resp.query
    display_section_title('Retrieval query')
    print('Query name: {}'.format(query.name))
    print('Question: {}'.format(query.question))
    print('Data sources:')
    for datasource in query.datasources:
      display_datasource(datasource)
  elif 'generated_sql' in resp:
    display_section_title('SQL generated')
    print(resp.generated_sql)
  elif 'result' in resp:
    display_section_title('Data retrieved')

    fields = [field.name for field in resp.result.schema.fields]
    d = {}
    for el in resp.result.data:
      for field in fields:
        if field in d:
          d[field].append(el[field])
        else:
          d[field] = [el[field]]

    display(pd.DataFrame(d))

def handle_chart_response(resp):
  def _value_to_dict(v):
    if isinstance(v, proto.marshal.collections.maps.MapComposite):
      return _map_to_dict(v)
    elif isinstance(v, proto.marshal.collections.RepeatedComposite):
      return [_value_to_dict(el) for el in v]
    elif isinstance(v, (int, float, str, bool)):
      return v
    else:
      return MessageToDict(v)

  def _map_to_dict(d):
    out = {}
    for k in d:
      if isinstance(d[k], proto.marshal.collections.maps.MapComposite):
        out[k] = _map_to_dict(d[k])
      else:
        out[k] = _value_to_dict(d[k])
    return out

  if 'query' in resp:
    print(resp.query.instructions)
  elif 'result' in resp:
    vegaConfig = resp.result.vega_config
    vegaConfig_dict = _map_to_dict(vegaConfig)
    alt.Chart.from_json(json_lib.dumps(vegaConfig_dict)).display();

def show_message(msg):
  m = msg.system_message
  if 'text' in m:
    handle_text_response(getattr(m, 'text'))
  elif 'schema' in m:
    handle_schema_response(getattr(m, 'schema'))
  elif 'data' in m:
    handle_data_response(getattr(m, 'data'))
  elif 'chart' in m:
    handle_chart_response(getattr(m, 'chart'))
  print('\n')