Build a data agent using HTTP and Python

This page shows how to use Python to make HTTP requests to the Conversational Analytics API (accessed through geminidataanalytics.googleapis.com).

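The Python code samples on this page show how to complete the following tasks:

  • Configure initial settings and authentication
  • Authenticate to Looker
  • Connect to a data source
  • Create a data agent
  • Create a conversation
  • Manage data agents and conversations
  • Use the API to ask questions
  • Create a stateless multi-turn conversation

A complete code sample, along with the helper functions used to stream the API response, is provided at the end of the page.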

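Configure initial settings and authentication

The following Python code sample performs these tasks:

  • Imports the required Python libraries
  • Obtains an access token for HTTP authentication by using the Google Cloud CLI
  • Defines variables for the billing project and system instructions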
from pygments import highlight, lexers, formatters
import pandas as pd
import json as json_lib
import requests
import json
import altair as alt
import IPython
from IPython.display import display, HTML
import google.auth
from google.auth.transport.requests import Request

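# Authenticate the current user; this sample assumes a Colab notebook environment
from google.colab import auth
auth.authenticate_user()

# The "!" prefix is notebook shell syntax; the result is a list of output lines
access_token = !gcloud auth application-default print-access-token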
headers = {
    "Authorization": f"Bearer {access_token[0]}",
    "Content-Type": "application/json",
}

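billing_project = 'YOUR-BILLING-PROJECT'
# Location and API base URL that the later samples on this page rely on
location = "global"
base_url = "https://geminidataanalytics.googleapis.com"
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'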

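Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of the billing project in which you enabled the required APIs.
  • YOUR-SYSTEM-INSTRUCTIONS: System instructions that guide the agent's behavior and customize it for your data needs. For example, you can use system instructions to define business terms, control response length, or set data formatting. We recommend defining system instructions in the YAML format described in "Write effective system instructions" to provide detailed, structured guidance.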
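
Outside a notebook, the "!" shell syntax used in the sample above is not available. The following is a minimal sketch, under that assumption, of how the same headers could be built in a plain Python script. The function names get_access_token and build_headers are hypothetical helpers, not part of the API; the gcloud command is the one used in the sample above.

```python
import subprocess


def get_access_token():
    """Run the gcloud command from the sample above and return the token text."""
    result = subprocess.run(
        ["gcloud", "auth", "application-default", "print-access-token"],
        capture_output=True,
        text=True,
        check=True,  # Raise CalledProcessError if gcloud exits with an error
    )
    return result.stdout.strip()


def build_headers(token):
    """Build the HTTP headers that the samples on this page pass to requests."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
```

With these helpers, `headers = build_headers(get_access_token())` would produce the same dictionary as the notebook sample, provided the Google Cloud CLI is installed and application default credentials are configured.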

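Authenticate to Looker

If you plan to connect to a Looker data source, you must authenticate to the Looker instance.

Use API keys

The following Python code sample shows how to authenticate your agent to a Looker instance by using API keys.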

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

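Replace the sample values as follows:

  • YOUR-LOOKER-CLIENT-ID: The client ID of the Looker API key that you generated.
  • YOUR-LOOKER-CLIENT-SECRET: The client secret of the Looker API key that you generated.

Use an access token

The following Python code sample shows how to authenticate your agent to a Looker instance by using an access token.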

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

Replace the sample values as follows:

  • YOUR-TOKEN: The access_token value that you generated to authenticate to Looker.
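
The two credential shapes above differ only in the key under "oauth". As a rough sketch, a small helper could pick the right shape from whichever values are available. The function name make_looker_credentials is hypothetical, and rejecting a mix of both credential types is a design choice of this sketch rather than a documented API rule.

```python
def make_looker_credentials(client_id=None, client_secret=None, access_token=None):
    """Return a looker_credentials dict in one of the two shapes shown above."""
    if access_token is not None:
        if client_id is not None or client_secret is not None:
            raise ValueError("Provide either an access token or API keys, not both")
        return {"oauth": {"token": {"access_token": access_token}}}
    if client_id is None or client_secret is None:
        raise ValueError("Provide both client_id and client_secret, or an access_token")
    return {
        "oauth": {
            "secret": {"client_id": client_id, "client_secret": client_secret}
        }
    }


# Example with placeholder values
looker_credentials = make_looker_credentials(access_token="YOUR-TOKEN")
```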

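Connect to a data source

The following Python code samples show how to define a Looker, BigQuery, or Looker Studio data source for your agent to use.

Connect to Looker data

The following code sample defines a connection to a Looker Explore. To connect to a Looker instance, make sure that you have generated Looker API keys, as described in "Authenticate and connect to a data source with the Conversational Analytics API".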

looker_data_source = {
    "looker": {
        "explore_references": {
            "looker_instance_uri": "https://your_company.looker.com",
            "lookml_model": "your_model",
            "explore": "your_explore",
        },
    }
}

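Replace the sample values as follows:

  • https://your_company.looker.com: The complete URL of your Looker instance.
  • your_model: The name of the LookML model that contains the Explore that you want to connect to.
  • your_explore: The name of the Looker Explore that you want the data agent to query.

Connect to BigQuery data

With the Conversational Analytics API, you can connect to and query up to 10 BigQuery tables at a time.

The following code sample defines a connection to a BigQuery table.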

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "bigquery-public-data",
                "datasetId": "san_francisco",
                "tableId": "street_trees",
            }
        ]
    }
}

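Replace the sample values as follows:

  • bigquery-public-data: The ID of the Google Cloud project that contains the BigQuery dataset and table that you want to connect to. To connect to a public dataset, specify bigquery-public-data.
  • san_francisco: The ID of the BigQuery dataset.
  • street_trees: The ID of the BigQuery table.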
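
When you connect several tables, the tableReferences list grows by one entry per table. The following sketch builds that list from dotted table names and enforces the 10-table limit stated above. The helper name make_bigquery_data_sources and the "project.dataset.table" input format are assumptions made for illustration, and the sketch assumes that none of the three IDs contains a dot.

```python
MAX_BIGQUERY_TABLES = 10  # Limit stated above for tables queried at a time


def make_bigquery_data_sources(table_ids):
    """Build a datasource_references dict from 'project.dataset.table' strings."""
    if len(table_ids) > MAX_BIGQUERY_TABLES:
        raise ValueError(f"At most {MAX_BIGQUERY_TABLES} BigQuery tables are supported")
    references = []
    for table_id in table_ids:
        parts = table_id.split(".")
        if len(parts) != 3 or not all(parts):
            raise ValueError(f"Expected project.dataset.table, got {table_id!r}")
        project_id, dataset_id, table_name = parts
        references.append(
            {"projectId": project_id, "datasetId": dataset_id, "tableId": table_name}
        )
    return {"bq": {"tableReferences": references}}


bigquery_data_sources = make_bigquery_data_sources(
    ["bigquery-public-data.san_francisco.street_trees"]
)
```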

Connect to Looker Studio data

The following code sample defines a connection to a Looker Studio data source.

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

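Replace studio_datasource_id with the data source ID.

Create a data agent

The following code sample shows how to create a data agent by sending an HTTP POST request to the data agent creation endpoint. The request payload includes the following details:

  • The full resource name of the agent. This value includes the project ID, the location, and a unique ID for the agent.
  • A description of the data agent.
  • The context for the data agent, including the system instructions (defined in "Configure initial settings and authentication") and the data source that the agent uses (defined in "Connect to a data source").

You can also optionally enable advanced analysis with Python by including the options parameter in the request payload.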

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional

      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

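Replace the sample values as follows:

  • data_agent_1: A unique ID for the data agent. This value is used in the agent's resource name and as the data_agent_id URL query parameter.
  • This is the description of data_agent_1.: A description of the data agent.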
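
To make the optional parts of the payload explicit, the following sketch assembles the same structure as the sample above and adds the options block only on request. The function name build_data_agent_payload and its parameters are hypothetical; the field names are copied from the sample.

```python
def build_data_agent_payload(project, location, agent_id, datasource_references,
                             system_instruction, description="", enable_python=False):
    """Assemble the create-agent payload in the shape shown in the sample above."""
    published_context = {
        "datasource_references": datasource_references,
        "system_instruction": system_instruction,
    }
    if enable_python:
        # Optional block that turns on advanced analysis with Python
        published_context["options"] = {"analysis": {"python": {"enabled": True}}}
    return {
        "name": f"projects/{project}/locations/{location}/dataAgents/{agent_id}",
        "description": description,
        "data_analytics_agent": {"published_context": published_context},
    }


payload = build_data_agent_payload(
    "my-project", "global", "data_agent_1",
    {"bq": {"tableReferences": []}}, "Help the user in analyzing their data",
)
print(payload["name"])
```

The resulting dictionary can be passed as the json argument of requests.post, together with the data_agent_id query parameter, as in the sample above.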

Create a conversation

The following code sample shows how to create a conversation with your data agent.

conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

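Replace the sample values as follows:

  • data_agent_1: The ID of the data agent, as defined in the sample code block in "Create a data agent".
  • conversation_1: A unique ID for the conversation.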

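Manage data agents and conversations

The following code samples show how to manage your data agents and conversations by using the Conversational Analytics API. You can perform the following tasks:

  • Get a data agent
  • List data agents
  • Update a data agent
  • Set the IAM policy for a data agent
  • Get the IAM policy for a data agent
  • Delete a data agent
  • Get a conversation
  • List conversations
  • List messages in a conversation

Get a data agent

The following code sample shows how to fetch an existing data agent by sending an HTTP GET request to the data agent resource URL.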

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

In the previous sample, replace data_agent_1 with the ID of the data agent that you want to fetch.
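
The management samples in this section build their URLs from a base_url variable, which needs to hold the API host (https://geminidataanalytics.googleapis.com, the host used in the creation samples). The following sketch shows one way to centralize that URL construction. The function name data_agent_url is hypothetical; the URL patterns are the ones used by the samples in this section.

```python
BASE_URL = "https://geminidataanalytics.googleapis.com"  # Host used throughout this page


def data_agent_url(project, location, agent_id=None, method=None, base_url=BASE_URL):
    """Return the dataAgents collection URL, a single-agent URL, or a custom-method URL."""
    url = f"{base_url}/v1alpha/projects/{project}/locations/{location}/dataAgents"
    if agent_id is not None:
        url += f"/{agent_id}"
    if method is not None:
        if agent_id is None:
            raise ValueError("A custom method such as setIamPolicy needs an agent_id")
        url += f":{method}"
    return url


print(data_agent_url("my-project", "global"))                  # List endpoint
print(data_agent_url("my-project", "global", "data_agent_1"))  # Get, update, or delete
print(data_agent_url("my-project", "global", "data_agent_1", "setIamPolicy"))
```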

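List data agents

The following code sample shows how to list all the data agents for a given project by sending an HTTP GET request to the dataAgents endpoint.

To list all agents, you must have the geminidataanalytics.dataAgents.list permission on the project. For more information about which IAM roles include this permission, see the list of predefined roles.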

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

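Replace YOUR-BILLING-PROJECT with the ID of your billing project.

Update a data agent

The following code sample shows how to update a data agent by sending an HTTP PATCH request to the data agent resource URL. The request payload includes the new values for the fields that you want to change, and the request parameters include an updateMask parameter that specifies the fields to update.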

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
location = "global"

data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

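Replace the sample values as follows:

  • data_agent_1: The ID of the data agent that you want to update.
  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • Updated description of the data agent.: The new description for the data agent.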
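
Because the updateMask value and the payload must name the same fields, one option is to derive both from a single dictionary of changes. The following is a small sketch of that idea. The function name build_update_request is hypothetical, and it handles only top-level field names, as in the sample above.

```python
def build_update_request(changes):
    """Return (params, payload) for a PATCH that updates only the given top-level fields."""
    if not changes:
        raise ValueError("Specify at least one field to update")
    # Join the field names in insertion order, as the sample above does
    params = {"updateMask": ",".join(changes)}
    return params, dict(changes)


params, payload = build_update_request(
    {"description": "Updated description of the data agent."}
)
print(params)
```

The returned params and payload can be passed to requests.patch in place of the hand-written values.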

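Set the IAM policy for a data agent

To share an agent, you can use the setIamPolicy method to assign IAM roles to users on a specific agent. The following code sample shows how to make a POST call to the data agent URL with a payload that includes bindings. The bindings specify which roles are assigned to which users.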

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

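Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • data_agent_1: The ID of the data agent for which you want to set the IAM policy.
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com: A comma-separated list of the email addresses of the users to whom you want to grant the specified role.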
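
The sample above grants one role in a single binding. As a sketch of how the same payload shape extends to several bindings, the following helper takes a mapping from role to a comma-separated user list. The function name build_iam_policy is hypothetical, and the email addresses are placeholders; the role name is the one used in the sample above.

```python
def build_iam_policy(role_to_users):
    """Build a setIamPolicy payload from a {role: "a@x.com, b@y.com"} mapping."""
    bindings = []
    for role, users in role_to_users.items():
        # Skip empty entries left by stray commas and trim surrounding spaces
        members = [f"user:{email.strip()}" for email in users.split(",") if email.strip()]
        bindings.append({"role": role, "members": members})
    return {"policy": {"bindings": bindings}}


policy_payload = build_iam_policy(
    {"roles/geminidataanalytics.dataAgentEditor": "a@example.com, b@example.com"}
)
print(policy_payload)
```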

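Get the IAM policy for a data agent

The following code sample shows how to fetch the IAM policy for a data agent by sending an HTTP POST request to the data agent URL. The request payload includes the data agent path.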

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

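Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • data_agent_1: The ID of the data agent for which you want to get the IAM policy.

Delete a data agent

The following code sample shows how to soft delete a data agent by sending an HTTP DELETE request to the data agent resource URL. Soft deletion means that the agent is deleted but can still be retrieved within 30 days.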

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

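Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • data_agent_1: The ID of the data agent that you want to delete.

Get a conversation

The following code sample shows how to fetch an existing conversation by sending an HTTP GET request to the conversation resource URL.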

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

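Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • conversation_1: The ID of the conversation that you want to fetch.

List conversations

The following code sample shows how to list the conversations for a given project by sending an HTTP GET request to the conversations endpoint.

By default, this method returns the conversations that you created. Admins (users with the cloudaicompanion.topicAdmin IAM role) can see all conversations within the project.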

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

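Replace YOUR-BILLING-PROJECT with the ID of the billing project in which you enabled the required APIs.

List messages in a conversation

The following code sample shows how to list all the messages in a conversation by sending an HTTP GET request to the messages endpoint for the conversation.

To list messages, you must have the cloudaicompanion.topics.get permission on the conversation.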

billing_project = "YOUR-BILLING-PROJECT"
location = "global"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

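Replace the sample values as follows:

  • YOUR-BILLING-PROJECT: The ID of your billing project.
  • conversation_1: The ID of the conversation for which you want to list messages.

Use the API to ask questions

After you create a data agent and a conversation, you can ask questions about your data.

The Conversational Analytics API supports multi-turn conversations, which let users ask follow-up questions that build on previous context. The API provides the following methods for managing conversation history:

  • Stateful chat: Google Cloud stores and manages the conversation history. Stateful chat is inherently multi-turn, because the API retains context from previous messages. You only need to send the current message for each turn.
  • Stateless chat: Your application manages the conversation history. You must include the relevant previous messages with each new message. For detailed examples of how to manage multi-turn conversations in stateless mode, see "Create a stateless multi-turn conversation".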
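
The chat samples later on this page differ mainly in which context key the payload carries. The following sketch summarizes the three shapes in one function, under the assumption that exactly one mode is chosen per request. The function name build_chat_payload and its parameters are hypothetical; the payload keys mirror the samples on this page.

```python
def build_chat_payload(project, text, conversation=None, data_agent=None,
                       datasource_references=None, location="global"):
    """Build a chat payload for stateful chat or for one of the two stateless modes."""
    parent = f"projects/{project}/locations/{location}"
    payload = {"parent": parent, "messages": [{"userMessage": {"text": text}}]}
    if conversation is not None:
        # Stateful: the API keeps the history for this conversation
        if data_agent is None:
            raise ValueError("Stateful chat needs a data agent as well as a conversation")
        payload["conversation_reference"] = {
            "conversation": f"{parent}/conversations/{conversation}",
            "data_agent_context": {"data_agent": f"{parent}/dataAgents/{data_agent}"},
        }
    elif data_agent is not None:
        # Stateless, referencing an existing data agent
        payload["data_agent_context"] = {"data_agent": f"{parent}/dataAgents/{data_agent}"}
    elif datasource_references is not None:
        # Stateless, with the data source supplied inline
        payload["inline_context"] = {"datasource_references": datasource_references}
    else:
        raise ValueError("Provide a conversation, a data agent, or inline data sources")
    return payload


stateful = build_chat_payload("my-project", "Which species of tree is most prevalent?",
                              conversation="conversation_1", data_agent="data_agent_1")
print(sorted(stateful))
```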

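Stateful chat

Send a stateful chat request with a conversation reference

The following code sample shows how to ask questions of the API by using the conversation that you defined in previous steps. This sample uses the get_stream helper function to stream the response.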

chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

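Replace the sample values as follows:

  • data_agent_1: The ID of the data agent, as defined in the sample code block in "Create a data agent".
  • conversation_1: A unique ID for the conversation.
  • Make a bar graph for the top 5 states by the total number of airports: The sample prompt used in this example.

Stateless chat

Send a stateless chat request with a data agent reference

The following code sample shows how to ask the API a stateless question by using the data agent that you defined in previous steps. This sample uses the get_stream helper function to stream the response.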

chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

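Replace the sample values as follows:

  • data_agent_1: The ID of the data agent, as defined in the sample code block in "Create a data agent".
  • Make a bar graph for the top 5 states by the total number of airports: The sample prompt used in this example.

Send a stateless chat request with inline context

The following code sample shows how to ask the API a stateless question by using inline context. This sample uses the get_stream helper function to stream the response and uses a BigQuery data source as an example.

You can also optionally enable advanced analysis with Python by including the options parameter in the request payload.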

chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

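Create a stateless multi-turn conversation

To ask follow-up questions in a stateless conversation, your application must manage the conversation's context by sending the entire message history with each new request. The following sections show how to define and call helper functions to create a multi-turn conversation:

  • Send multi-turn requests
  • Process responses

Send multi-turn requests

The following multi_turn_Conversation helper function manages conversation context by storing messages in a list. This lets you ask follow-up questions that build on previous turns. In the function's payload, you can either reference a data agent or provide the data source directly by using inline context.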

chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

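In the previous sample, replace data_agent_1 with the ID of the data agent, as defined in the sample code block in "Create a data agent".

You can call the multi_turn_Conversation helper function for each turn of the conversation. The following code sample shows how to send an initial request and then a follow-up request that builds on the previous response.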

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

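In the previous sample, replace the sample values as follows:

  • Which species of tree is most prevalent?: A natural language question to send to the data agent.
  • Can you show me the results as a bar chart?: A follow-up question that builds on or refines the previous question.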
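
To see how the message history grows across turns without calling the API, the following sketch replaces the network call with a stand-in function. The names run_turn and fake_send are hypothetical, and the reply that fake_send produces is made up for illustration; only the userMessage and systemMessage shapes are taken from the samples on this page.

```python
def run_turn(history, question, send):
    """Append the question, send the full history, and record the replies."""
    history.append({"userMessage": {"text": question}})
    # Pass a copy so the sender sees the history as it was at request time
    replies = send(list(history))
    history.extend(replies)
    return replies


def fake_send(messages):
    """Stand-in for the API call: report how many messages were received."""
    return [{"systemMessage": {"text": {"parts": [f"received {len(messages)} messages"]}}}]


history = []
run_turn(history, "Which species of tree is most prevalent?", fake_send)
run_turn(history, "Can you show me the results as a bar chart?", fake_send)
print(len(history))  # Two user messages plus two replies
```

The second request carries three messages (the first question, its reply, and the follow-up), which is the context that a stateless request needs in order to resolve "the results".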

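Process responses

The following get_stream_multi_turn function processes the streaming API response. This function is similar to the get_stream helper function, but it stores the response in the conversation_messages list to save the conversation context for the next turn.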

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                # Reset the buffer so that the next object is parsed on its own
                acc = ''
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''
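
The line-accumulation logic in the function above can be tried without a network connection. The following sketch isolates it as a generator and feeds it hand-written lines. The function name iter_stream_objects and the sample lines are assumptions made for illustration; the real stream content depends on the API response.

```python
import json


def iter_stream_objects(lines):
    """Yield each JSON object from a streamed JSON array, one line at a time."""
    acc = ""
    for line in lines:
        if not line:
            continue
        if line == "[{":
            acc = "{"   # Start of the array and of the first object
        elif line == "}]":
            acc += "}"  # End of the last object and of the array
        elif line == ",":
            continue    # Separator between objects
        else:
            acc += line
        try:
            obj = json.loads(acc)
        except ValueError:
            continue    # Object is still incomplete; keep accumulating
        yield obj
        acc = ""


sample_lines = [
    "[{",
    '  "systemMessage": {',
    '    "text": {"parts": ["hi"]}',
    "  }",
    "}",
    ",",
    "{",
    '  "systemMessage": {"schema": {}}',
    "}]",
]
messages = list(iter_stream_objects(sample_lines))
print(len(messages))
```

Each yielded dictionary can then be dispatched on the keys of its systemMessage entry, as get_stream_multi_turn does.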

End-to-end code sample

The following expandable code sample contains all the tasks that are covered in this guide.

Build a data agent using HTTP and Python

    from pygments import highlight, lexers, formatters
    import pandas as pd
    import json as json_lib
    import requests
    import json
    import altair as alt
    import IPython
    from IPython.display import display, HTML
    import google.auth
    from google.auth.transport.requests import Request

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }
    # # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }
    looker_data_source = {
        "looker": {
            "explore_references": {
                "looker_instance_uri": "https://my_company.looker.com",
                "lookml_model": "my_model",
                "explore": "my_explore",
            },
            # "credentials": looker_credentials
        }
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references":
            [
                {
                "datasource_id": "your_studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation_1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
    

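The following expandable code sample contains the Python helper functions that are used to stream chat responses.

Helper Python functions for streaming chat responses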

    def is_json(str):
      try:
          json_object = json_lib.loads(str)
      except ValueError as e:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      print(''.join(parts))

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": map(lambda field: get_property(field, 'name'), fields),
        "Type": map(lambda field: get_property(field, 'type'), fields),
        "Description": map(lambda field: get_property(field, 'description', '-'), fields),
        "Mode": map(lambda field: get_property(field, 'mode'), fields)
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        print('Question: {}'.format(query['question']))
        print('Data sources:')
        for datasource in query['datasources']:
          display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

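        # Build each column eagerly. A lazy map() with a lambda would look up
        # `field` only when consumed, filling every column with the last field.
        field_names = [get_property(field, 'name') for field in resp['result']['schema']['fields']]
        rows = resp['result']['data']
        columns = {name: [get_property(row, name) for row in rows] for name in field_names}

        display(pd.DataFrame(columns))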

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vega_config = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vega_config)).display()

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if 'systemMessage' not in data_json:
            if 'error' in data_json:
              handle_error(data_json['error'])
            # Clear the buffer so the next message is parsed on its own.
            acc = ''
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)

          # Clear the buffer after every complete message, not only unknown ones.
          print('\n')
          acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if 'systemMessage' not in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    # Clear the buffer so the next message is parsed on its own.
                    acc = ''
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''
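
The buffering logic shared by `get_stream` and `get_stream_multi_turn` can be exercised without a network call. The following is a minimal sketch, not part of the official sample: `iter_json_messages` and the `sample` payload are hypothetical names introduced for illustration. It assumes, as the helpers above imply, that the API streams a pretty-printed JSON array in which `[{` opens the first object, a lone `,` separates objects, and `}]` closes the last one.

```python
import json


def iter_json_messages(lines):
    """Yield each top-level JSON object from a line-chunked JSON array.

    Hypothetical helper: assumes '[{' opens the first object, a lone ','
    separates objects, and '}]' closes the last one.
    """
    acc = ''
    for raw in lines:
        if not raw:
            continue  # skip blank keep-alive lines
        line = raw.decode('utf-8') if isinstance(raw, bytes) else raw

        if line == '[{':
            acc = '{'
        elif line == '}]':
            acc += '}'
        elif line == ',':
            continue
        else:
            acc += line

        try:
            message = json.loads(acc)
        except ValueError:
            continue  # object still incomplete; keep accumulating

        yield message
        acc = ''  # reset after every complete message, whatever its type


# Made-up payload shaped like the stream the helpers above expect.
sample = [
    b'[{',
    b'  "systemMessage": {',
    b'    "text": {"parts": ["Hello"]}',
    b'  }',
    b'}',
    b',',
    b'{',
    b'  "error": {"code": 400, "message": "bad request"}',
    b'}]',
]

messages = list(iter_json_messages(sample))
for message in messages:
    print(sorted(message.keys()))
```

Separating the parsing into a generator keeps the display handlers out of the loop, so the same function could in principle be fed `resp.iter_lines()` from a streaming `requests` call and the resulting messages dispatched to `handle_text_response`, `handle_data_response`, and the other handlers.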