使用 HTTP 和 Python 建構資料代理程式

本頁面說明如何使用 Python 對 Conversational Analytics API (透過 geminidataanalytics.googleapis.com 存取) 發出 HTTP 要求。

本頁的 Python 程式碼範例說明如何完成下列工作:

頁面結尾會提供完整的程式碼範例,以及用於串流 API 回應的輔助函式

設定初始設定和驗證

下列 Python 程式碼範例會執行這些工作:

  • 匯入必要的 Python 程式庫
  • 使用 Google Cloud CLI 取得 HTTP 驗證的存取權杖
  • 定義帳單專案和系統指令的變數
from pygments import highlight, lexers, formatters
import pandas as pd
import json as json_lib
import requests
import json
import altair as alt
import IPython
from IPython.display import display, HTML
import google.auth
from google.auth.transport.requests import Request

from google.colab import auth
auth.authenticate_user()

access_token = !gcloud auth application-default print-access-token
headers = {
    "Authorization": f"Bearer {access_token[0]}",
    "Content-Type": "application/json",
}

billing_project = 'YOUR-BILLING-PROJECT'
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'

請依下列方式替換範例值:

  • YOUR-BILLING-PROJECT:您啟用必要 API 的帳單專案 ID。
  • YOUR-SYSTEM-INSTRUCTIONS:系統指令,可引導代理程式的行為,並根據您的資料需求進行自訂。舉例來說,您可以使用系統指令定義業務用語、控制回覆長度,或設定資料格式。建議您使用「撰寫有效的系統指令」中建議的 YAML 格式定義系統指令,提供詳細且結構化的指引。

向 Looker 進行驗證

如果您打算連結至 Looker 資料來源,則必須驗證 Looker 執行個體。

使用 API 金鑰

下列 Python 程式碼範例說明如何使用 API 金鑰,向 Looker 執行個體驗證代理程式。

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

請依下列方式替換範例值:

  • YOUR-LOOKER-CLIENT-ID:您產生的 Looker API 金鑰用戶端 ID。
  • YOUR-LOOKER-CLIENT-SECRET:您產生的 Looker API 金鑰用戶端密鑰。

使用存取權杖

下列 Python 程式碼範例說明如何使用存取權杖,向 Looker 執行個體驗證代理程式。

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

請依下列方式替換範例值:

  • YOUR-TOKEN:您產生的 access_token 值,用於向 Looker 驗證。

連結至資料來源

以下各節說明如何定義代理程式資料來源的連線詳細資料。代理程式可以連結至 LookerBigQueryLooker Studio 中的資料。

連結至 Looker 資料

下列程式碼範例會定義與 Looker 探索的連線。如要與 Looker 執行個體建立連線,請確認您已產生 Looker API 金鑰,如「使用 Conversational Analytics API 驗證及連線至資料來源」一文所述。使用 Conversational Analytics API 時,一次只能連結一個 Looker 探索。

looker_data_source = {
    "looker": {
        "explore_references": {
            "looker_instance_uri": "https://your_company.looker.com",
            "lookml_model": "your_model",
            "explore": "your_explore",
       },
       # Do not include the following line during agent creation
       "credentials": looker_credentials
    }
}

請依下列方式替換範例值:

  • https://your_company.looker.com:Looker 執行個體的完整網址。
  • your_model:包含要連結的「探索」的 LookML 模型名稱。
  • your_explore:您希望資料代理程式查詢的 Looker 探索名稱。

連結至 BigQuery 資料

使用對話式數據分析 API 時,可連結的 BigQuery 資料表數量沒有硬性限制。不過,連結大量表格可能會降低準確度,或導致您超出模型的輸入權杖限制。

下列程式碼範例定義了與多個 BigQuery 資料表的連線。

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "my_project_id",
                "datasetId": "my_dataset_id",
                "tableId": "my_table_id"
            },
            {
                "projectId": "my_project_id_2",
                "datasetId": "my_dataset_id_2",
                "tableId": "my_table_id_2"
            },
            {
                "projectId": "my_project_id_3",
                "datasetId": "my_dataset_id_3",
                "tableId": "my_table_id_3"
            },
        ]
    }
}

請依下列方式替換範例值:

  • my_project_id:包含要連結的 BigQuery 資料集和資料表的專案 ID。 Google Cloud 如要連線至公開資料集,請指定 bigquery-public-data
  • my_dataset_id:BigQuery 資料集的 ID。
  • my_table_id:BigQuery 資料表的 ID。

連結至 Looker Studio 資料

下列程式碼範例定義了與 Looker Studio 資料來源的連線。

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

studio_datasource_id 替換為資料來源 ID。

建立資料代理程式

以下程式碼範例示範如何將 HTTP POST 要求傳送至資料代理程式建立端點,藉此建立資料代理程式。要求酬載包含下列詳細資料:

  • 代理程式的完整資源名稱。這個值包含專案 ID、位置和代理程式的專屬 ID。
  • 資料代理人的說明。
  • 資料代理程式的環境,包括系統說明 (在「設定初始設定和驗證」中定義) 和代理程式使用的資料來源 (在「連結至資料來源」中定義)。

您也可以在要求酬載中加入 options 參數,選擇啟用 Python 進階分析功能。

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional

      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

請依下列方式替換範例值:

  • data_agent_1:資料代理程式的專屬 ID。這個值會用於代理程式的資源名稱,以及 data_agent_id 網址查詢參數。
  • This is the description of data_agent_1.:資料代理程式的說明。

建立對話

下列程式碼範例說明如何使用資料代理程式建立對話。

conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

請依下列方式替換範例值:

  • data_agent_1:資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
  • conversation_1:對話的專屬 ID。

管理資料代理程式和對話

下列程式碼範例說明如何使用 Conversational Analytics API 管理資料代理程式和對話。您可以執行下列工作:

取得資料虛擬服務專員

下列程式碼範例示範如何將 HTTP GET 要求傳送至資料代理程式資源網址,以擷取現有資料代理程式。

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

在先前的範例中,請將 data_agent_1 替換為要擷取的資料代理程式 ID。

列出資料代理

下列程式碼示範如何將 HTTP GET 要求傳送至 dataAgents 端點,列出指定專案的所有資料代理程式。

如要列出所有代理程式,您必須具備專案的 geminidataanalytics.dataAgents.list 權限。如要進一步瞭解哪些 IAM 角色包含這項權限,請參閱預先定義的角色清單。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agents: {data_agent_response.status_code}")

YOUR-BILLING-PROJECT 替換為帳單專案的 ID。

列出可存取的資料代理

下列程式碼示範如何將 HTTP GET 要求傳送至 dataAgents:listAccessible 端點,列出指定專案的所有可存取資料代理程式。

billing_project = "YOUR-BILLING-PROJECT"
creator_filter = "YOUR-CREATOR-FILTER"
location = "global"
data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents:listAccessible"

params = {
    "creator_filter": creator_filter
}

data_agent_response = requests.get(
    data_agent_url, headers=headers, params=params
)

if data_agent_response.status_code == 200:
    print("Accessible Data Agents Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Accessible Data Agents: {data_agent_response.status_code}")

請依下列方式替換範例值:

  • YOUR-BILLING-PROJECT:計費專案的 ID。
  • YOUR-CREATOR-FILTER:根據資料代理程式建立者套用的篩選器。可能的值包括 NONE (預設值)、CREATOR_ONLYNOT_CREATOR_ONLY

更新資料代理程式

下列程式碼範例示範如何將 HTTP PATCH 要求傳送至資料代理程式資源網址,藉此更新資料代理程式。要求酬載包含要變更的欄位新值,要求參數則包含 updateMask 參數,用於指定要更新的欄位。

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
location = "global"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

請依下列方式替換範例值:

  • data_agent_1:要更新的資料代理程式 ID。
  • YOUR-BILLING-PROJECT:計費專案的 ID。
  • Updated description of the data agent.:資料代理程式的新說明。

設定資料代理程式的 IAM 政策

如要共用代理程式,可以使用 setIamPolicy 方法,將 IAM 角色指派給特定代理程式的使用者。下列程式碼範例示範如何使用包含繫結的酬載,對資料代理程式網址發出 POST 呼叫。繫結會指定應將哪些角色指派給哪些使用者。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

請依下列方式替換範例值:

  • YOUR-BILLING-PROJECT:計費專案的 ID。
  • data_agent_1:您要設定 IAM 政策的資料代理程式 ID。
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com:以半形逗號分隔的使用者電子郵件地址清單,您要將指定角色授予這些使用者。

取得資料代理程式的身分與存取權管理政策

下列程式碼範例示範如何將 HTTP POST 要求傳送至資料代理程式網址,以擷取資料代理程式的 IAM 政策。要求酬載包含資料代理程式路徑。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

請依下列方式替換範例值:

  • YOUR-BILLING-PROJECT:計費專案的 ID。
  • data_agent_1:您要取得 IAM 政策的資料代理程式 ID。

刪除資料代理程式

下列程式碼範例示範如何將 HTTP DELETE 要求傳送至資料代理程式資源網址,以軟性刪除資料代理程式。軟刪除是指刪除代理程式,但仍可在 30 天內擷取。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

請依下列方式替換範例值:

  • YOUR-BILLING-PROJECT:計費專案的 ID。
  • data_agent_1:要刪除的資料代理程式 ID。

取得對話

下列範例程式碼示範如何將 HTTP GET 要求傳送至對話資源網址,以擷取現有對話。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

請依下列方式替換範例值:

  • YOUR-BILLING-PROJECT:計費專案的 ID。
  • conversation_1:要擷取的對話 ID。

列出對話

下列程式碼範例示範如何將 HTTP GET 要求傳送至 conversations 端點,列出特定專案的對話。

根據預設,這個方法會傳回您建立的對話。管理員 (具備 cloudaicompanion.topicAdmin IAM 角色的使用者) 可以查看專案中的所有對話。

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

YOUR-BILLING-PROJECT 替換為已啟用必要 API 的帳單專案 ID。

列出對話中的訊息

下列程式碼範例示範如何將 HTTP GET 要求傳送至對話的 messages 端點,列出對話中的所有訊息。

如要列出訊息,您必須具備對話的 cloudaicompanion.topics.get 權限

billing_project = "YOUR-BILLING-PROJECT"
location = "global"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1beta/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

請依下列方式替換範例值:

  • YOUR-BILLING-PROJECT:計費專案的 ID。
  • conversation_1:您要列出訊息的對話 ID。

使用 API 提問

建立資料代理程式對話後,您就可以提出資料相關問題。

對話式數據分析 API 支援多輪對話,可讓使用者根據先前的脈絡提出後續問題。API 提供下列方法來管理對話記錄:

  • 有狀態的對話: Google Cloud 儲存及管理對話記錄。有狀態的即時通訊本質上是多輪對話,因為 API 會保留先前訊息的背景資訊。您只需要在每個回合中傳送當前訊息。
  • 無狀態對話:應用程式會管理對話記錄。每則新訊息都必須包含相關的先前訊息。如需在無狀態模式下管理多輪對話的詳細範例,請參閱「建立無狀態多輪對話」。

有狀態的對話

傳送含有對話參照的具狀態即時通訊要求

下列程式碼範例示範如何使用先前步驟中定義的 conversation 向 API 提問。這個範例使用 get_stream 輔助函式串流回應。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

請依下列方式替換範例值:

  • data_agent_1:資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
  • conversation_1:對話的專屬 ID。
  • 我們以「Make a bar graph for the top 5 states by the total number of airports」做為範例提示。

無狀態對話

傳送無狀態的即時通訊要求,並附上資料代理程式參照

以下範例程式碼示範如何使用先前步驟中定義的資料代理程式,向 API 提出無狀態問題。這個範例使用 get_stream 輔助函式串流回應。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

請依下列方式替換範例值:

  • data_agent_1:資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。
  • 我們以「Make a bar graph for the top 5 states by the total number of airports」做為範例提示。

傳送內含內嵌背景資訊的無狀態即時通訊要求

下列程式碼範例示範如何使用內嵌環境,向 API 提出無狀態問題。這個範例會使用 get_stream 輔助函式串流回應,並以 BigQuery 資料來源為例。

您也可以在要求酬載中加入 options 參數,選擇啟用 Python 進階分析功能。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

建立無狀態多輪對話

如要在無狀態對話中詢問後續問題,應用程式必須管理對話內容,方法是在每次提出新要求時,傳送完整的訊息記錄。以下各節說明如何定義及呼叫輔助函式,建立多輪對話:

傳送多輪對話要求

下列 multi_turn_Conversation 輔助函式會將訊息儲存在清單中,藉此管理對話內容。你可以根據先前的對話內容提出後續問題。在函式的酬載中,您可以參照資料代理程式,或使用內嵌環境直接提供資料來源。

chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

在先前的範例中,請將 data_agent_1 替換為資料代理程式的 ID,如「建立資料代理程式」中的程式碼範例區塊所定義。

您可以針對對話的每個回合呼叫 multi_turn_Conversation 輔助函式。下列程式碼範例說明如何傳送初始要求,然後根據先前的回覆傳送後續要求。

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

在先前的範例中,請依下列方式替換範例值:

  • Which species of tree is most prevalent?:要傳送給資料代理程式的自然語言問題。
  • Can you show me the results as a bar chart?:根據或修正先前問題的後續問題。

處理回覆

下列 get_stream_multi_turn 函式會處理串流 API 回應。這個函式與 get_stream 輔助函式類似,但會將回應儲存在 conversation_messages 清單中,以便儲存對話脈絡,供下一個回合使用。

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''

端對端程式碼範例

下列可展開的程式碼範例包含本指南涵蓋的所有工作。

使用 HTTP 和 Python 建構資料代理程式

    from pygments import highlight, lexers, formatters
    import pandas as pd
    import json as json_lib
    import requests
    import json
    import altair as alt
    import IPython
    from IPython.display import display, HTML
    import requests
    import google.auth
    from google.auth.transport.requests import Request

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }

    # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }

    looker_data_source = {
        "looker": {
        "explore_references": {
            "looker_instance_uri": "https://my_company.looker.com",
            "lookml_model": "my_model",
            "explore": "my_explore",
        },
        # Do not include the following line during agent creation
        # "credentials": looker_credentials
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references":
            [
                {
                "datasource_id": "your_studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1beta/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
    

下列可展開的程式碼範例包含用於串流傳送即時通訊回覆的 Python 輔助函式。

用於串流對話回覆的輔助 Python 函式

    def is_json(str):
      try:
          json_object = json_lib.loads(str)
      except ValueError as e:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      print(''.join(parts))

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": map(lambda field: get_property(field, 'name'), fields),
        "Type": map(lambda field: get_property(field, 'type'), fields),
        "Description": map(lambda field: get_property(field, 'description', '-'), fields),
        "Mode": map(lambda field: get_property(field, 'mode'), fields)
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        print('Question: {}'.format(query['question']))
        print('Data sources:')
        for datasource in query['datasources']:
          display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

        fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields'])
        dict = {}

        for field in fields:
          dict[field] = map(lambda el: get_property(el, field), resp['result']['data'])

        display(pd.DataFrame(dict))

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vegaConfig = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vegaConfig)).display();

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if not 'systemMessage' in data_json:
            if 'error' in data_json:
                handle_error(data_json['error'])
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)
            print('\n')
            acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if not 'systemMessage' in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''