本頁面說明如何使用 Python 對 Conversational Analytics API (透過 geminidataanalytics.googleapis.com
存取) 發出 HTTP 要求。
本頁的 Python 程式碼範例說明如何完成下列工作:
頁面結尾會提供完整的程式碼範例,以及用於串流 API 回應的輔助函式。
設定初始設定和驗證
下列 Python 程式碼範例會執行這些工作:
- 匯入必要的 Python 程式庫
- 使用 Google Cloud CLI 取得 HTTP 驗證的存取權杖
- 定義帳單專案和系統指令的變數
from pygments import highlight, lexers, formatters
import pandas as pd
import json as json_lib
import requests
import json
import altair as alt
import IPython
from IPython.display import display, HTML
import google.auth
from google.auth.transport.requests import Request
from google.colab import auth
auth.authenticate_user()
access_token = !gcloud auth application-default print-access-token
headers = {
"Authorization": f"Bearer {access_token[0]}",
"Content-Type": "application/json",
}
billing_project = 'YOUR-BILLING-PROJECT'
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:您啟用必要 API 的帳單專案 ID。
- YOUR-SYSTEM-INSTRUCTIONS:系統指令,可引導代理程式的行為,並根據資料需求進行自訂。舉例來說,您可以使用系統指令定義業務用語、控制回覆長度或設定資料格式。建議您使用「撰寫有效的系統指令」一文中的 YAML 格式定義系統指令,提供詳細且結構化的指引。
向 Looker 進行驗證
如果您打算連結至 Looker 資料來源,則必須驗證 Looker 執行個體。
使用 API 金鑰
下列 Python 程式碼範例說明如何使用 API 金鑰,向 Looker 執行個體驗證代理程式。
looker_credentials = {
"oauth": {
"secret": {
"client_id": "YOUR-LOOKER-CLIENT-ID",
"client_secret": "YOUR-LOOKER-CLIENT-SECRET",
}
}
}
請依下列方式替換範例值:
- YOUR-LOOKER-CLIENT-ID:您產生的 Looker API 金鑰用戶端 ID。
- YOUR-LOOKER-CLIENT-SECRET:您產生的 Looker API 金鑰用戶端密鑰。
使用存取權杖
下列 Python 程式碼範例說明如何使用存取權杖,向 Looker 執行個體驗證代理程式。
looker_credentials = {
"oauth": {
"token": {
"access_token": "YOUR-TOKEN",
}
}
}
請依下列方式替換範例值:
- YOUR-TOKEN:您產生的
access_token
值,用於向 Looker 驗證。
連結至資料來源
下列 Python 程式碼範例說明如何定義供代理程式使用的 Looker、BigQuery 或 Looker Studio 資料來源。
連結至 Looker 資料
下列程式碼範例會定義與 Looker 探索的連線。如要與 Looker 執行個體建立連線,請確認您已產生 Looker API 金鑰,詳情請參閱「使用 Conversational Analytics API 驗證及連線至資料來源」。
looker_data_source = {
"looker": {
"explore_references": {
"looker_instance_uri": "https://your_company.looker.com",
"lookml_model": "your_model",
"explore": "your_explore",
},
}
}
請依下列方式替換範例值:
- https://your_company.looker.com:Looker 執行個體的完整網址。
- your_model:包含要連線至的「探索」的 LookML 模型名稱。
- your_explore:您希望資料代理程式查詢的 Looker 探索名稱。
連結至 BigQuery 資料
透過 Conversational Analytics API,您一次最多可以連結及查詢 10 個 BigQuery 資料表。
下列程式碼範例會定義 BigQuery 資料表的連線。
bigquery_data_sources = {
"bq": {
"tableReferences": [
{
"projectId": "bigquery-public-data",
"datasetId": "san_francisco",
"tableId": "street_trees",
}
]
}
}
請依下列方式替換範例值:
- bigquery-public-data:包含要連結的 BigQuery 資料集和資料表的專案 ID。 Google Cloud 如要連線至公開資料集,請指定
bigquery-public-data
。 - san_francisco:BigQuery 資料集的 ID。
- street_trees:BigQuery 資料表的 ID。
連結至 Looker Studio 資料
下列程式碼範例定義了與 Looker Studio 資料來源的連線。
looker_studio_data_source = {
"studio":{
"studio_references": [
{
"studio_datasource_id": "studio_datasource_id"
}
]
}
}
將 studio_datasource_id 替換為資料來源 ID。
建立資料代理程式
以下程式碼範例示範如何將 HTTP POST
要求傳送至資料代理程式建立端點,藉此建立資料代理程式。要求酬載包含下列詳細資料:
- 代理程式的完整資源名稱。這個值包含專案 ID、位置和代理程式的專屬 ID。
- 資料代理程式的說明。
- 資料代理程式的環境,包括系統說明 (在「設定初始設定和驗證」中定義) 和代理程式使用的資料來源 (在「連結至資料來源」中定義)。
您也可以在要求酬載中加入 options
參數,選擇啟用 Python 進階分析功能。
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"
data_agent_id = "data_agent_1"
data_agent_payload = {
"name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
"description": "This is the description of data_agent_1.", # Optional
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
}
params = {"data_agent_id": data_agent_id} # Optional
data_agent_response = requests.post(
data_agent_url, params=params, json=data_agent_payload, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent created successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error creating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- data_agent_1:資料代理程式的專屬 ID。這個值會用於代理程式的資源名稱,以及
data_agent_id
網址查詢參數。 - This is the description of data_agent_1.:資料代理程式的說明。
建立對話
下列程式碼範例說明如何建立與資料代理程式的對話。
conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
conversation_payload = {
"agents": [
f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
],
"name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
"conversation_id": conversation_id
}
conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)
if conversation_response.status_code == 200:
print("Conversation created successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error creating Conversation: {conversation_response.status_code}")
print(conversation_response.text)
請依下列方式替換範例值:
- data_agent_1:資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
- conversation_1:對話的專屬 ID。
管理資料代理程式和對話
下列程式碼範例說明如何使用 Conversational Analytics API 管理資料代理程式和對話。您可以執行下列工作:
取得資料虛擬服務專員
下列程式碼範例示範如何將 HTTP GET
要求傳送至資料代理程式資源網址,以擷取現有資料代理程式。
data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
data_agent_response = requests.get(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Fetched Data Agent successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error: {data_agent_response.status_code}")
print(data_agent_response.text)
在先前的範例中,將 data_agent_1 替換為要擷取的資料代理程式 ID。
列出資料代理
下列程式碼示範如何將 HTTP GET
要求傳送至 dataAgents
端點,列出指定專案的所有資料代理程式。
如要列出所有代理程式,您必須具備專案的 geminidataanalytics.dataAgents.list
權限。如要進一步瞭解哪些 IAM 角色包含這項權限,請參閱預先定義的角色清單。
billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"
data_agent_response = requests.get(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent Listed successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Listing Data Agent: {data_agent_response.status_code}")
將 YOUR-BILLING-PROJECT 替換為帳單專案的 ID。
更新資料代理程式
下列程式碼範例示範如何將 HTTP PATCH
要求傳送至資料代理程式資源網址,藉此更新資料代理程式。要求酬載包含要變更的欄位新值,要求參數則包含 updateMask
參數,用於指定要更新的欄位。
data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
payload = {
"description": "Updated description of the data agent.",
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction
}
},
}
fields = ["description", "data_analytics_agent"]
params = {
"updateMask": ",".join(fields)
}
data_agent_response = requests.patch(
data_agent_url, headers=headers, params=params, json=payload
)
if data_agent_response.status_code == 200:
print("Data Agent updated successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Updating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- data_agent_1:要更新的資料代理程式 ID。
- YOUR-BILLING-PROJECT:計費專案的 ID。
- Updated description of the data agent.:資料代理程式的新說明。
設定資料代理程式的 IAM 政策
如要共用代理程式,可以使用 setIamPolicy
方法,將 IAM 角色指派給特定代理程式的使用者。下列程式碼範例示範如何使用包含繫結的酬載,對資料代理程式 URL 進行 POST
呼叫。繫結會指定應將哪些角色指派給哪些使用者。
billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"
# Request body
payload = {
"policy": {
"bindings": [
{
"role": role,
"members": [
f"user:{i.strip()}" for i in users.split(",")
]
}
]
}
}
data_agent_response = requests.post(
data_agent_url, headers=headers, json=payload
)
if data_agent_response.status_code == 200:
print("IAM Policy set successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error setting IAM policy: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:計費專案的 ID。
- data_agent_1:您要設定 IAM 政策的資料代理程式 ID。
- 222larabrown@gmail.com, cloudysanfrancisco@gmail.com:以半形逗號分隔的使用者電子郵件地址清單,您要將指定角色授予這些使用者。
取得資料代理程式的身分與存取權管理政策
下列程式碼範例示範如何將 HTTP POST
要求傳送至資料代理程式網址,藉此擷取資料代理程式的 IAM 政策。要求酬載包含資料代理程式路徑。
billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"
# Request body
payload = {
"resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}
data_agent_response = requests.post(
data_agent_url, headers=headers, json=payload
)
if data_agent_response.status_code == 200:
print("IAM Policy fetched successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error fetching IAM policy: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:計費專案的 ID。
- data_agent_1:您要取得 IAM 政策的資料代理程式 ID。
刪除資料代理程式
下列程式碼範例示範如何將 HTTP DELETE
要求傳送至資料代理程式資源網址,以軟性刪除資料代理程式。軟刪除是指刪除代理程式,但仍可在 30 天內擷取。
billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
data_agent_response = requests.delete(
data_agent_url, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent deleted successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:計費專案的 ID。
- data_agent_1:要刪除的資料代理程式 ID。
取得對話
下列範例程式碼示範如何將 HTTP GET
要求傳送至對話資源網址,以擷取現有對話。
billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"
conversation_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
conversation_response = requests.get(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation fetched successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while fetching conversation: {conversation_response.status_code}")
print(conversation_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:計費專案的 ID。
- conversation_1:要擷取的對話 ID。
列出對話
下列程式碼範例示範如何將 HTTP GET
要求傳送至 conversations
端點,列出特定專案的對話。
根據預設,這個方法會傳回您建立的對話。管理員 (具備 cloudaicompanion.topicAdmin
IAM 角色的使用者) 可以查看專案中的所有對話。
billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/conversations"
conversation_response = requests.get(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation fetched successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while fetching conversation: {conversation_response.status_code}")
print(conversation_response.text)
將 YOUR-BILLING-PROJECT 替換為已啟用必要 API 的帳單專案 ID。
列出對話中的訊息
下列程式碼範例示範如何將 HTTP GET
要求傳送至對話的 messages
端點,列出對話中的所有訊息。
如要列出訊息,您必須具備對話的 cloudaicompanion.topics.get
權限。
billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"
conversation_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"
conversation_response = requests.get(conversation_url, headers=headers)
# Handle the response
if conversation_response.status_code == 200:
print("Conversation fetched successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error while fetching conversation: {conversation_response.status_code}")
print(conversation_response.text)
請依下列方式替換範例值:
- YOUR-BILLING-PROJECT:計費專案的 ID。
- conversation_1:要列出訊息的對話 ID。
使用 API 提問
對話式數據分析 API 支援多輪對話,可讓使用者根據先前的脈絡提出後續問題。API 提供下列方法來管理對話記錄:
- 具狀態的即時通訊: Google Cloud 儲存及管理對話記錄。有狀態的即時通訊本質上是多輪對話,因為 API 會保留先前訊息的背景資訊。您只需要在每個回合傳送當下的訊息。
無狀態即時通訊:應用程式會管理對話記錄。每則新訊息都必須包含相關的先前訊息。如需在無狀態模式下管理多輪對話的詳細範例,請參閱「建立無狀態多輪對話」。
有狀態的對話
傳送含有對話參照的具狀態即時通訊要求
下列程式碼範例示範如何使用您在先前步驟中定義的 conversation 向 API 提問。這個範例使用 get_stream
輔助函式串流回應。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"conversation_reference": {
"conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
請依下列方式替換範例值:
- data_agent_1:資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
- conversation_1:對話的專屬 ID。
- 我們以
Make a bar graph for the top 5 states by the total number of airports
做為範例提示。
無狀態對話
傳送無狀態的即時通訊要求,並附上資料代理程式參照
下列程式碼範例示範如何使用先前步驟中定義的資料代理程式,向 API 提出無狀態問題。這個範例使用 get_stream
輔助函式串流回應。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
請依下列方式替換範例值:
- data_agent_1:資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
- 我們以
Make a bar graph for the top 5 states by the total number of airports
做為範例提示。
傳送內含內嵌背景資訊的無狀態即時通訊要求
下列程式碼範例示範如何使用內嵌環境,向 API 提出無狀態問題。這個範例會使用 get_stream
輔助函式串流回應,並以 BigQuery 資料來源為例。
您也可以在要求酬載中加入 options
參數,選擇啟用 Python 進階分析功能。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"inline_context": {
"datasource_references": bigquery_data_sources,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
建立無狀態多輪對話
如要在無狀態對話中詢問後續問題,應用程式必須管理對話內容,方法是在每次提出新要求時,傳送完整的訊息記錄。以下各節說明如何定義及呼叫輔助函式,建立多輪對話:
傳送多輪對話要求
下列 multi_turn_Conversation
輔助函式會將訊息儲存在清單中,藉此管理對話內容。你可以根據先前的對話內容提出後續問題。在函式的有效負載中,您可以參照資料代理程式,或使用內嵌環境直接提供資料來源。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"
# List that is used to track previous turns and is reused across requests
conversation_messages = []
data_agent_id = "data_agent_1"
# Helper function for calling the API
def multi_turn_Conversation(msg):
userMessage = {
"userMessage": {
"text": msg
}
}
# Send a multi-turn request by including previous turns and the new message
conversation_messages.append(userMessage)
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": conversation_messages,
# Use a data agent reference
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
},
# Use inline context
# "inline_context": {
# "datasource_references": bigquery_data_sources,
# }
}
# Call the get_stream_multi_turn helper function to stream the response
get_stream_multi_turn(chat_url, chat_payload, conversation_messages)
在先前的範例中,請將 data_agent_1 替換為資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
您可以針對對話的每個回合呼叫 multi_turn_Conversation
輔助函式。下列程式碼範例說明如何傳送初始要求,然後根據先前的回覆傳送後續要求。
# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")
# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")
在先前的範例中,請依下列方式替換範例值:
- Which species of tree is most prevalent?:要傳送給資料代理程式的自然語言問題。
- Can you show me the results as a bar chart?:根據上一個問題延伸或修正的後續問題。
處理回覆
下列 get_stream_multi_turn
函式會處理串流 API 回應。這個函式與 get_stream
輔助函式類似,但會將回應儲存在 conversation_messages
清單中,以便儲存對話脈絡,供下一個回合使用。
def get_stream_multi_turn(url, json, conversation_messages):
s = requests.Session()
acc = ''
with s.post(url, json=json, headers=headers, stream=True) as resp:
for line in resp.iter_lines():
if not line:
continue
decoded_line = str(line, encoding='utf-8')
if decoded_line == '[{':
acc = '{'
elif decoded_line == '}]':
acc += '}'
elif decoded_line == ',':
continue
else:
acc += decoded_line
if not is_json(acc):
continue
data_json = json_lib.loads(acc)
# Store the response that will be used in the next iteration
conversation_messages.append(data_json)
if not 'systemMessage' in data_json:
if 'error' in data_json:
handle_error(data_json['error'])
continue
if 'text' in data_json['systemMessage']:
handle_text_response(data_json['systemMessage']['text'])
elif 'schema' in data_json['systemMessage']:
handle_schema_response(data_json['systemMessage']['schema'])
elif 'data' in data_json['systemMessage']:
handle_data_response(data_json['systemMessage']['data'])
elif 'chart' in data_json['systemMessage']:
handle_chart_response(data_json['systemMessage']['chart'])
else:
colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
print(colored_json)
print('\n')
acc = ''
端對端程式碼範例
下列可展開的程式碼範例包含本指南涵蓋的所有工作。
使用 HTTP 和 Python 建構資料代理程式
from pygments import highlight, lexers, formatters import pandas as pd import json as json_lib import requests import json import altair as alt import IPython from IPython.display import display, HTML import requests import google.auth from google.auth.transport.requests import Request from google.colab import auth auth.authenticate_user() access_token = !gcloud auth application-default print-access-token headers = { "Authorization": f"Bearer {access_token[0]}", "Content-Type": "application/json", } ################### Data source details ################### billing_project = "your_billing_project" location = "global" system_instruction = "Help the user in analyzing their data" # BigQuery data source bigquery_data_sources = { "bq": { "tableReferences": [ { "projectId": "bigquery-public-data", "datasetId": "san_francisco", "tableId": "street_trees" } ] } } # Looker data source looker_credentials = { "oauth": { "secret": { "client_id": "your_looker_client_id", "client_secret": "your_looker_client_secret", } } } # # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block. # looker_credentials = { # "oauth": { # "token": { # "access_token": "your_looker_access_token", # } # } # } looker_data_source = { "looker": { "explore_references": { "looker_instance_uri": "https://my_company.looker.com", "lookml_model": "my_model", "explore": "my_explore", }, # "credentials": looker_credentials } # Looker Studio data source looker_studio_data_source = { "studio":{ "studio_references": [ { "datasource_id": "your_studio_datasource_id" } ] } } ################### Create data agent ################### data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents" data_agent_id = "data_agent_1" data_agent_payload = { "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional "description": "This is the description of data_agent.", # Optional "data_analytics_agent": { "published_context": { "datasource_references": bigquery_data_sources, "system_instruction": system_instruction, # Optional: To enable advanced analysis with Python, include the following options block: "options": { "analysis": { "python": { "enabled": True } } } } } } params = {"data_agent_id": data_agent_id} # Optional data_agent_response = requests.post( data_agent_url, params=params, json=data_agent_payload, headers=headers ) if data_agent_response.status_code == 200: print("Data Agent created successfully!") print(json.dumps(data_agent_response.json(), indent=2)) else: print(f"Error creating Data Agent: {data_agent_response.status_code}") print(data_agent_response.text) ################### Create conversation ################### conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations" data_agent_id = "data_agent_1" conversation_id = "conversation _1" conversation_payload = { "agents": [ f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}" ], "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}" } params = { "conversation_id": conversation_id } conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload) if conversation_response.status_code == 200: print("Conversation created successfully!") print(json.dumps(conversation_response.json(), indent=2)) else: print(f"Error creating Conversation: {conversation_response.status_code}") print(conversation_response.text) ################### Chat with the API by using conversation (stateful) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" conversation_id = "conversation _1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "conversation_reference": { "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}", "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using dataAgents (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using inline context (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "inline_context": { "datasource_references": bigquery_data_sources, # Optional - if wanting to use advanced analysis with python "options": { "analysis": { "python": { "enabled": True } } } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Multi-turn conversation ################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat" # List that is used to track previous turns and is reused across requests conversation_messages = [] data_agent_id = "data_agent_1" # Helper function for calling the API def multi_turn_Conversation(msg): userMessage = { "userMessage": { "text": msg } } # Send a multi-turn request by including previous turns and the new message conversation_messages.append(userMessage) # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": conversation_messages, # Use a data agent reference "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials }, # Use inline context # "inline_context": { # "datasource_references": bigquery_data_sources, # } } # Call the get_stream_multi_turn helper function to stream the response get_stream_multi_turn(chat_url, chat_payload, conversation_messages) # Send first-turn request multi_turn_Conversation("Which species of tree is most prevalent?") # Send follow-up-turn request multi_turn_Conversation("Can you show me the results as a bar chart?")
下列可展開的程式碼範例包含用於串流傳送即時通訊回覆的 Python 輔助函式。
用於串流對話回覆的輔助 Python 函式
def is_json(str): try: json_object = json_lib.loads(str) except ValueError as e: return False return True def handle_text_response(resp): parts = resp['parts'] print(''.join(parts)) def get_property(data, field_name, default = ''): return data[field_name] if field_name in data else default def display_schema(data): fields = data['fields'] df = pd.DataFrame({ "Column": map(lambda field: get_property(field, 'name'), fields), "Type": map(lambda field: get_property(field, 'type'), fields), "Description": map(lambda field: get_property(field, 'description', '-'), fields), "Mode": map(lambda field: get_property(field, 'mode'), fields) }) display(df) def display_section_title(text): display(HTML('<h2>{}</h2>'.format(text))) def format_bq_table_ref(table_ref): return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId']) def format_looker_table_ref(table_ref): return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri']) def display_datasource(datasource): source_name = '' if 'studioDatasourceId' in datasource: source_name = datasource['studioDatasourceId'] elif 'lookerExploreReference' in datasource: source_name = format_looker_table_ref(datasource['lookerExploreReference']) else: source_name = format_bq_table_ref(datasource['bigqueryTableReference']) print(source_name) display_schema(datasource['schema']) def handle_schema_response(resp): if 'query' in resp: print(resp['query']['question']) elif 'result' in resp: display_section_title('Schema resolved') print('Data sources:') for datasource in resp['result']['datasources']: display_datasource(datasource) def handle_data_response(resp): if 'query' in resp: query = resp['query'] display_section_title('Retrieval query') print('Query name: {}'.format(query['name'])) print('Question: {}'.format(query['question'])) print('Data sources:') for datasource in query['datasources']: display_datasource(datasource) elif 'generatedSql' in resp: display_section_title('SQL generated') print(resp['generatedSql']) elif 'result' in resp: display_section_title('Data retrieved') fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields']) dict = {} for field in fields: dict[field] = map(lambda el: get_property(el, field), resp['result']['data']) display(pd.DataFrame(dict)) def handle_chart_response(resp): if 'query' in resp: print(resp['query']['instructions']) elif 'result' in resp: vegaConfig = resp['result']['vegaConfig'] alt.Chart.from_json(json_lib.dumps(vegaConfig)).display(); def handle_error(resp): display_section_title('Error') print('Code: {}'.format(resp['code'])) print('Message: {}'.format(resp['message'])) def get_stream(url, json): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = '' def get_stream_multi_turn(url, json, conversation_messages): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) # Store the response that will be used in the next iteration conversation_messages.append(data_json) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = ''