本页面将引导您使用 Python 向 Conversational Analytics API(通过 geminidataanalytics.googleapis.com
访问)发出 HTTP 请求。
本页面上的示例 Python 代码展示了如何完成以下任务:
您还可以在 Conversational Analytics API HTTP Colaboratory 笔记本中运行本页面上的代码示例。
此页面末尾包含完整版示例代码,以及用于流式传输 API 响应的辅助函数。
配置初始设置和身份验证
以下示例 Python 代码会执行以下任务:
- 导入所需的 Python 库
- 定义结算项目的变量、系统指令,以及数据代理的问题
- 使用 Google Cloud CLI 获取用于 HTTP 身份验证的访问令牌
from pygments import highlight, lexers, formatters
import pandas as pd
import json as json_lib
import requests
import json
import altair as alt
import IPython
from IPython.display import display, HTML
import google.auth
from google.auth.transport.requests import Request
from google.colab import auth
auth.authenticate_user()
billing_project = 'YOUR-BILLING-PROJECT'
system_description = 'YOUR-SYSTEM-INSTRUCTIONS'
question = 'YOUR-QUESTION-HERE'
access_token = !gcloud auth application-default print-access-token
headers = {
"Authorization": f"Bearer {access_token[0]}",
"Content-Type": "application/json",
}
按如下所示替换示例值:
- YOUR-BILLING-PROJECT:已启用所需 API 的结算项目的 ID。
- YOUR-SYSTEM-INSTRUCTIONS:系统指令,用于指导代理的行为,并让您可以根据需要自定义代理。例如,您可以使用系统指令来定义业务术语(例如“忠实客户”的定义),控制回答长度(“使用数量少于 20 个字的内容来总结”)或设置数据格式(“符合公司标准”)。将占位符文本替换为与您的数据和用例相关的说明。
- YOUR-QUESTION-HERE:要发送给数据代理的自然语言问题。
向 Looker 进行身份验证
如果您打算连接到 Looker 数据源,则需要向 Looker 实例进行身份验证。
使用 API 密钥
以下 Python 代码示例演示了如何使用 API 密钥向 Looker 实例进行代理身份验证。
looker_credentials = {
"oauth": {
"secret": {
"client_id": "YOUR-LOOKER-CLIENT-ID",
"client_secret": "YOUR-LOOKER-CLIENT-SECRET",
}
}
}
按如下所示替换示例值:
- YOUR-LOOKER-CLIENT-ID:您生成的 Looker API 密钥的客户端 ID。
- YOUR-LOOKER-CLIENT-SECRET:您生成的 Looker API 密钥的客户端密钥。
使用访问令牌
以下 Python 代码示例演示了如何使用访问令牌向 Looker 实例进行代理身份验证。
looker_credentials = {
"oauth": {
"token": {
"access_token": "YOUR-TOKEN",
}
}
}
按如下所示替换示例值:
- YOUR-TOKEN:您生成的用于向 Looker 进行身份验证的
access_token
值。
连接到数据源
以下 Python 代码示例演示了如何定义代理要使用的 Looker、BigQuery 或 Looker Studio 数据源。
连接到 Looker 数据
以下示例代码定义了与 Looker 探索的连接。如需与 Looker 实例建立连接,请验证您是否已生成 Looker API 密钥,如使用 Conversational Analytics API 对数据源进行身份验证并连接到数据源中所述。
looker_data_source = {
"looker": {
"explore_references": {
"looker_instance_uri": "https://your_company.looker.com",
"lookml_model": "your_model",
"explore": "your_explore",
},
}
}
按如下所示替换示例值:
- https://your_company.looker.com:Looker 实例的完整网址。
- your_model:包含您要连接到的探索的 LookML 模型的名称。
- your_explore:您希望数据代理查询的 Looker 探索的名称。
连接到 BigQuery 数据
借助 Conversational Analytics API,您可以一次性连接和查询最多 10 个 BigQuery 表。
以下示例代码定义了与 BigQuery 表的连接。
bigquery_data_sources = {
"bq": {
"tableReferences": [
{
"projectId": "bigquery-public-data",
"datasetId": "san_francisco",
"tableId": "street_trees",
}
]
}
}
按如下所示替换示例值:
- bigquery-public-data:包含您要连接到的 BigQuery 数据集和表的 Google Cloud 项目的 ID。如需连接到公共数据集,请指定
bigquery-public-data
。 - san_francisco:BigQuery 数据集的 ID。
- street_trees:BigQuery 表的 ID。
连接到 Looker Studio 数据
以下示例代码定义了与 Looker Studio 数据源的连接。
looker_studio_data_source = {
"studio":{
"studio_references": [
{
"studio_datasource_id": "studio_datasource_id"
}
]
}
}
将 studio_datasource_id 替换为数据源 ID。
创建数据代理
以下示例代码演示了如何通过向数据代理创建端点发送 HTTP POST
请求来创建数据代理。请求载荷包含以下详细信息:
- 代理的完整资源名称。此值包含项目 ID、位置和代理的唯一标识符。
- 数据代理的说明。
- 数据代理的上下文,包括系统说明(在配置初始设置和身份验证中定义)和代理使用的数据源(在连接到数据源中定义)。
您还可以选择在请求载荷中添加 options
参数,以启用使用 Python 进行高级分析。
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"
data_agent_id = "data_agent_1"
data_agent_payload = {
"name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
"description": "This is the description of data_agent_1.", # Optional
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
}
params = {"data_agent_id": data_agent_id} # Optional
data_agent_response = requests.post(
data_agent_url, params=params, json=data_agent_payload, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent created successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error creating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
按如下所示替换示例值:
- data_agent_1:数据代理的唯一标识符。此值用于代理的资源名称中,并用作
data_agent_id
网址查询参数。 - This is the description of data_agent_1.:数据代理的说明。
创建对话
以下示例代码演示了如何创建与数据代理的对话。
conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
conversation_payload = {
"agents": [
f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
],
"name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
"conversation_id": conversation_id
}
conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)
if conversation_response.status_code == 200:
print("Conversation created successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error creating Conversation: {conversation_response.status_code}")
print(conversation_response.text)
按如下所示替换示例值:
- data_agent_1:数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
- conversation_1:对话的唯一标识符。
使用 API 提问
Conversational Analytics API 支持多轮对话,让用户可以提出基于之前的上下文的后续问题。该 API 提供了以下用于管理对话历史记录的方法:
- 有状态聊天: Google Cloud 会存储和管理对话历史记录。有状态聊天本身就是多轮聊天,因为 API 会保留之前消息中的上下文。您只需在每轮中发送当前消息。
无状态聊天:您的应用会管理对话历史记录。您必须在每条新消息中添加相关的先前消息。如需查看有关如何在无状态模式下管理多轮对话的详细示例,请参阅创建无状态多轮对话。
有状态聊天
发送包含对话引用的有状态聊天请求
以下示例代码演示了如何使用您在之前的步骤中定义的对话向 API 提问。此示例使用 get_stream
辅助函数来流式传输回答。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"conversation_reference": {
"conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
按如下所示替换示例值:
- data_agent_1:数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
- conversation_1:对话的唯一标识符。
Make a bar graph for the top 5 states by the total number of airports
用作示例提示。
无状态聊天
发送包含数据代理引用的无状态聊天请求
以下示例代码演示了如何使用您在之前的步骤中定义的数据代理向 API 提出无状态问题。此示例使用 get_stream
辅助函数来流式传输回答。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
按如下所示替换示例值:
- data_agent_1:数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
Make a bar graph for the top 5 states by the total number of airports
用作示例提示。
发送包含内嵌上下文的无状态聊天请求
以下示例代码演示了如何使用内嵌上下文向 API 提出无状态问题。此示例使用 get_stream
辅助函数来流式传输回答,并使用 BigQuery 数据源作为示例。
您还可以选择在请求载荷中添加 options
参数,以启用使用 Python 进行高级分析。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"inline_context": {
"datasource_references": bigquery_data_sources,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
创建无状态多轮对话
如需在无状态对话中提出后续问题,您的应用必须通过在每个新请求中发送整个消息历史记录来管理对话的上下文。以下部分展示了如何定义和调用辅助函数来创建多轮对话:
发送多轮请求
以下 multi_turn_Conversation
辅助函数通过将消息存储在列表中来管理对话上下文。这样,您可以发送基于之前的轮次的后续问题。在函数的载荷中,您可以引用数据代理,也可以使用内嵌上下文直接提供数据源。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"
# List that is used to track previous turns and is reused across requests
conversation_messages = []
data_agent_id = "data_agent_1"
# Helper function for calling the API
def multi_turn_Conversation(msg):
userMessage = {
"userMessage": {
"text": msg
}
}
# Send a multi-turn request by including previous turns and the new message
conversation_messages.append(userMessage)
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": conversation_messages,
# Use a data agent reference
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
},
# Use inline context
# "inline_context": {
# "datasource_references": bigquery_data_sources,
# }
}
# Call the get_stream_multi_turn helper function to stream the response
get_stream_multi_turn(chat_url, chat_payload, conversation_messages)
在上述示例中,将 data_agent_1 替换为数据代理的 ID,如创建数据代理部分的示例代码块中所定义。
您可以在每轮对话中调用 multi_turn_Conversation
辅助函数。以下示例代码展示了如何发送初始请求,然后发送基于之前回答的后续请求。
# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")
# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")
在上面的示例中,按如下所示替换示例值:
- Which species of tree is most prevalent?:要发送给数据代理的自然语言问题。
- Can you show me the results as a bar chart?:基于或优化上一个问题的后续问题。
处理回答
以下 get_stream_multi_turn
函数用于处理流式 API 响应。此函数类似于 get_stream
辅助函数,但它会将回答存储在 conversation_messages
列表中,以便保存下一轮对话的对话上下文。
def get_stream_multi_turn(url, json, conversation_messages):
s = requests.Session()
acc = ''
with s.post(url, json=json, headers=headers, stream=True) as resp:
for line in resp.iter_lines():
if not line:
continue
decoded_line = str(line, encoding='utf-8')
if decoded_line == '[{':
acc = '{'
elif decoded_line == '}]':
acc += '}'
elif decoded_line == ',':
continue
else:
acc += decoded_line
if not is_json(acc):
continue
data_json = json_lib.loads(acc)
# Store the response that will be used in the next iteration
conversation_messages.append(data_json)
if not 'systemMessage' in data_json:
if 'error' in data_json:
handle_error(data_json['error'])
continue
if 'text' in data_json['systemMessage']:
handle_text_response(data_json['systemMessage']['text'])
elif 'schema' in data_json['systemMessage']:
handle_schema_response(data_json['systemMessage']['schema'])
elif 'data' in data_json['systemMessage']:
handle_data_response(data_json['systemMessage']['data'])
elif 'chart' in data_json['systemMessage']:
handle_chart_response(data_json['systemMessage']['chart'])
else:
colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
print(colored_json)
print('\n')
acc = ''
端到端代码示例
以下可展开的代码示例包含本指南中涵盖的所有任务。
使用 HTTP 和 Python 构建数据代理
from pygments import highlight, lexers, formatters import pandas as pd import json as json_lib import requests import json import altair as alt import IPython from IPython.display import display, HTML import requests import google.auth from google.auth.transport.requests import Request from google.colab import auth auth.authenticate_user() access_token = !gcloud auth application-default print-access-token headers = { "Authorization": f"Bearer {access_token[0]}", "Content-Type": "application/json", } ################### Data source details ################### billing_project = "your_billing_project" location = "global" system_instruction = "Help the user in analyzing their data" # BigQuery data source bigquery_data_sources = { "bq": { "tableReferences": [ { "projectId": "bigquery-public-data", "datasetId": "san_francisco", "tableId": "street_trees" } ] } } # Looker data source looker_credentials = { "oauth": { "secret": { "client_id": "your_looker_client_id", "client_secret": "your_looker_client_secret", } } } # # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block. # looker_credentials = { # "oauth": { # "token": { # "access_token": "your_looker_access_token", # } # } # } looker_data_source = { "looker": { "explore_references": { "looker_instance_uri": "https://my_company.looker.com", "lookml_model": "my_model", "explore": "my_explore", }, # "credentials": looker_credentials } # Looker Studio data source looker_studio_data_source = { "studio":{ "studio_references": [ { "datasource_id": "your_studio_datasource_id" } ] } } ################### Create data agent ################### data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents" data_agent_id = "data_agent_1" data_agent_payload = { "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional "description": "This is the description of data_agent.", # Optional "data_analytics_agent": { "published_context": { "datasource_references": bigquery_data_sources, "system_instruction": system_instruction, # Optional: To enable advanced analysis with Python, include the following options block: "options": { "analysis": { "python": { "enabled": True } } } } } } params = {"data_agent_id": data_agent_id} # Optional data_agent_response = requests.post( data_agent_url, params=params, json=data_agent_payload, headers=headers ) if data_agent_response.status_code == 200: print("Data Agent created successfully!") print(json.dumps(data_agent_response.json(), indent=2)) else: print(f"Error creating Data Agent: {data_agent_response.status_code}") print(data_agent_response.text) ################### Create conversation ################### conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations" data_agent_id = "data_agent_1" conversation_id = "conversation _1" conversation_payload = { "agents": [ f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}" ], "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}" } params = { "conversation_id": conversation_id } conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload) if conversation_response.status_code == 200: print("Conversation created successfully!") print(json.dumps(conversation_response.json(), indent=2)) else: print(f"Error creating Conversation: {conversation_response.status_code}") print(conversation_response.text) ################### Chat with the API by using conversation (stateful) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" conversation_id = "conversation _1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "conversation_reference": { "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}", "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using dataAgents (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using inline context (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "inline_context": { "datasource_references": bigquery_data_sources, # Optional - if wanting to use advanced analysis with python "options": { "analysis": { "python": { "enabled": True } } } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Multi-turn conversation ################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat" # List that is used to track previous turns and is reused across requests conversation_messages = [] data_agent_id = "data_agent_1" # Helper function for calling the API def multi_turn_Conversation(msg): userMessage = { "userMessage": { "text": msg } } # Send a multi-turn request by including previous turns and the new message conversation_messages.append(userMessage) # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": conversation_messages, # Use a data agent reference "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials }, # Use inline context # "inline_context": { # "datasource_references": bigquery_data_sources, # } } # Call the get_stream_multi_turn helper function to stream the response get_stream_multi_turn(chat_url, chat_payload, conversation_messages) # Send first-turn request multi_turn_Conversation("Which species of tree is most prevalent?") # Send follow-up-turn request multi_turn_Conversation("Can you show me the results as a bar chart?")
以下可展开的代码示例包含用于流式传输聊天回答的 Python 辅助函数。
用于流式传输聊天回答的辅助 Python 函数
def is_json(str): try: json_object = json_lib.loads(str) except ValueError as e: return False return True def handle_text_response(resp): parts = resp['parts'] print(''.join(parts)) def get_property(data, field_name, default = ''): return data[field_name] if field_name in data else default def display_schema(data): fields = data['fields'] df = pd.DataFrame({ "Column": map(lambda field: get_property(field, 'name'), fields), "Type": map(lambda field: get_property(field, 'type'), fields), "Description": map(lambda field: get_property(field, 'description', '-'), fields), "Mode": map(lambda field: get_property(field, 'mode'), fields) }) display(df) def display_section_title(text): display(HTML('<h2>{}</h2>'.format(text))) def format_bq_table_ref(table_ref): return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId']) def format_looker_table_ref(table_ref): return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri']) def display_datasource(datasource): source_name = '' if 'studioDatasourceId' in datasource: source_name = datasource['studioDatasourceId'] elif 'lookerExploreReference' in datasource: source_name = format_looker_table_ref(datasource['lookerExploreReference']) else: source_name = format_bq_table_ref(datasource['bigqueryTableReference']) print(source_name) display_schema(datasource['schema']) def handle_schema_response(resp): if 'query' in resp: print(resp['query']['question']) elif 'result' in resp: display_section_title('Schema resolved') print('Data sources:') for datasource in resp['result']['datasources']: display_datasource(datasource) def handle_data_response(resp): if 'query' in resp: query = resp['query'] display_section_title('Retrieval query') print('Query name: {}'.format(query['name'])) print('Question: {}'.format(query['question'])) print('Data sources:') for datasource in query['datasources']: display_datasource(datasource) elif 'generatedSql' in resp: display_section_title('SQL generated') print(resp['generatedSql']) elif 'result' in resp: display_section_title('Data retrieved') fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields']) dict = {} for field in fields: dict[field] = map(lambda el: get_property(el, field), resp['result']['data']) display(pd.DataFrame(dict)) def handle_chart_response(resp): if 'query' in resp: print(resp['query']['instructions']) elif 'result' in resp: vegaConfig = resp['result']['vegaConfig'] alt.Chart.from_json(json_lib.dumps(vegaConfig)).display(); def handle_error(resp): display_section_title('Error') print('Code: {}'.format(resp['code'])) print('Message: {}'.format(resp['message'])) def get_stream(url, json): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = '' def get_stream_multi_turn(url, json, conversation_messages): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) # Store the response that will be used in the next iteration conversation_messages.append(data_json) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = ''