このページでは、Python を使用して会話分析 API(geminidataanalytics.googleapis.com
を介してアクセス)に HTTP リクエストを行う方法について説明します。
このページの Python コードサンプルは、次のタスクを行う方法を示しています。
- 初期設定と認証を構成する
- Looker、BigQuery、Looker Studio のデータソースに接続する
- データ エージェントを作成する
- 会話を作成する
- API を使用して質問する
- ステートレス マルチターンの会話を作成する
このページの最後に、サンプルコードの完全版と、API レスポンスのストリーミングに使用されるヘルパー関数が記載されています。
初期設定と認証を構成する
以下の Python サンプルコードは、次のタスクを実行します。
- 必要な Python ライブラリをインポートする
- Google Cloud CLI を使用して HTTP 認証用のアクセス トークンを取得する
- 課金プロジェクトとシステム指示の変数を定義する
from pygments import highlight, lexers, formatters
import pandas as pd
import json as json_lib
import requests
import json
import altair as alt
import IPython
from IPython.display import display, HTML
import google.auth
from google.auth.transport.requests import Request
from google.colab import auth
auth.authenticate_user()
access_token = !gcloud auth application-default print-access-token
headers = {
"Authorization": f"Bearer {access_token[0]}",
"Content-Type": "application/json",
}
billing_project = 'YOUR-BILLING-PROJECT'
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'
サンプル値を次のように置き換えます。
- YOUR-BILLING-PROJECT: 必要な API を有効にした課金プロジェクトの ID。
- YOUR-SYSTEM-INSTRUCTIONS: エージェントの動作をガイドし、データのニーズに合わせてカスタマイズするためのシステム指示。たとえば、システム指示を使用してビジネス用語を定義したり、回答の長さを制御したり、データの形式を設定できます。理想的には、効果的なシステム指示を記述するで推奨されている YAML 形式でシステム指示を定義し、構造化された詳細なガイダンスを提供します。
Looker で認証を行う
Looker データソースに接続する場合は、Looker インスタンスで認証を行う必要があります。
API キーの使用
次の Python コードサンプルは、API キーを使用してエージェントを Looker インスタンスに対して認証する方法を示しています。
looker_credentials = {
"oauth": {
"secret": {
"client_id": "YOUR-LOOKER-CLIENT-ID",
"client_secret": "YOUR-LOOKER-CLIENT-SECRET",
}
}
}
サンプル値を次のように置き換えます。
- YOUR-LOOKER-CLIENT-ID: 生成された Looker API キーのクライアント ID。
- YOUR-LOOKER-CLIENT-SECRET: 生成された Looker API キーのクライアント シークレット。
アクセス トークンを使用する
次の Python コードサンプルは、アクセス トークンを使用してエージェントを Looker インスタンスに対して認証する方法を示しています。
looker_credentials = {
"oauth": {
"token": {
"access_token": "YOUR-TOKEN",
}
}
}
サンプル値を次のように置き換えます。
- YOUR-TOKEN: Looker の認証用に生成する
access_token
値。
データソースに接続する
次の Python コードサンプルは、エージェントが使用する Looker、BigQuery、Looker Studio データソースを定義する方法を示しています。
Looker データに接続する
次のサンプルコードは、Looker Explore への接続を定義します。Looker インスタンスとの接続を確立するには、会話分析 API を使用してデータソースの認証と接続を行うで説明されているように、Looker API キーが生成されていることを確認します。
looker_data_source = {
"looker": {
"explore_references": {
"looker_instance_uri": "https://your_company.looker.com",
"lookml_model": "your_model",
"explore": "your_explore",
},
}
}
サンプル値を次のように置き換えます。
- https://your_company.looker.com: Looker インスタンスの完全な URL。
- your_model: 接続する Explore を含む LookML モデルの名前。
- your_explore: データ エージェントがクエリを実行する Looker Explore の名前。
BigQuery データに接続する
会話分析 API を使用すると、一度に最大 10 個の BigQuery テーブルに接続してクエリを実行できます。
次のサンプルコードは、BigQuery テーブルへの接続を定義します。
bigquery_data_sources = {
"bq": {
"tableReferences": [
{
"projectId": "bigquery-public-data",
"datasetId": "san_francisco",
"tableId": "street_trees",
}
]
}
}
サンプル値を次のように置き換えます。
- bigquery-public-data: 接続する BigQuery データセットとテーブルを含む Google Cloud プロジェクトの ID。公開データセットに接続するには、
bigquery-public-data
を指定します。 - san_francisco: BigQuery データセットの ID。
- street_trees: BigQuery テーブルの ID。
Looker Studio データに接続する
次のサンプルコードは、Looker Studio データソースへの接続を定義します。
looker_studio_data_source = {
"studio":{
"studio_references": [
{
"studio_datasource_id": "studio_datasource_id"
}
]
}
}
studio_datasource_id は、データソース ID に置き換えます。
データ エージェントを作成する
次のサンプルコードは、データ エージェントの作成エンドポイントに HTTP POST
リクエストを送信してデータ エージェントを作成する方法を示しています。リクエスト ペイロードには次の詳細が含まれます。
- エージェントの完全なリソース名。この値には、プロジェクト ID、ロケーション、およびエージェントの固有識別子が含まれます。
- データ エージェントの説明。
- データ エージェントのコンテキスト。システムの説明(初期設定と認証を構成するで定義)とエージェントが使用するデータソース(データソースに接続するで定義)を含みます。
リクエスト ペイロードに options
パラメータを含めることで、Python を使用した高度な分析を有効にすることもできます。
data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"
data_agent_id = "data_agent_1"
data_agent_payload = {
"name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
"description": "This is the description of data_agent_1.", # Optional
"data_analytics_agent": {
"published_context": {
"datasource_references": bigquery_data_sources,
"system_instruction": system_instruction,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
}
params = {"data_agent_id": data_agent_id} # Optional
data_agent_response = requests.post(
data_agent_url, params=params, json=data_agent_payload, headers=headers
)
if data_agent_response.status_code == 200:
print("Data Agent created successfully!")
print(json.dumps(data_agent_response.json(), indent=2))
else:
print(f"Error creating Data Agent: {data_agent_response.status_code}")
print(data_agent_response.text)
サンプル値を次のように置き換えます。
- data_agent_1: データ エージェントが一意に識別されるようにする識別子。この値は、エージェントのリソース名と
data_agent_id
URL クエリ パラメータとして使用されます。 - This is the description of data_agent_1.: データ エージェントの説明。
会話を作成する
次のサンプルコードは、データ エージェントとの会話を作成する方法を示しています。
conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
conversation_payload = {
"agents": [
f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
],
"name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
"conversation_id": conversation_id
}
conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)
if conversation_response.status_code == 200:
print("Conversation created successfully!")
print(json.dumps(conversation_response.json(), indent=2))
else:
print(f"Error creating Conversation: {conversation_response.status_code}")
print(conversation_response.text)
サンプル値を次のように置き換えます。
- data_agent_1: データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID。
- conversation_1: 会話の固有識別子。
API を使用して質問する
データ エージェントと会話を作成したら、データに質問できます。
会話分析 API はマルチターンの会話をサポートしています。これにより、ユーザーは以前のコンテキストに基づいてフォローアップの質問を実施できます。この API には、会話履歴を管理するための次のメソッドが用意されています。
- ステートフル リクエスト: Google Cloud が会話履歴を保存して管理します。ステートフル チャットは、API が以前のメッセージのコンテキストを保持するため、本質的にマルチターンです。各ターンで現在のメッセージのみを送信します。
ステートレス チャット: アプリケーションが会話履歴を管理します。新しいメッセージには、関連する以前のメッセージを含める必要があります。ステートレス モードでマルチターン会話を管理する方法の詳細な例については、ステートレス マルチターン会話を作成するをご覧ください。
ステートフル チャット
会話参照を含むステートフル チャット リクエストを送信する
次のサンプルコードは、前の手順で定義した会話を使用して API に質問する方法を示しています。このサンプルでは、get_stream
ヘルパー関数を使用してレスポンスをストリーミングします。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"conversation_reference": {
"conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
サンプル値を次のように置き換えます。
- data_agent_1: データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID。
- conversation_1: 会話の固有識別子。
- サンプル プロンプトとして
Make a bar graph for the top 5 states by the total number of airports
を使用しました。
ステートレス チャット
データ エージェント参照を含むステートレス チャット リクエストを送信する
次のサンプルコードは、前の手順で定義したデータ エージェントを使用して、API にステートレスな質問をする方法を示しています。このサンプルでは、get_stream
ヘルパー関数を使用してレスポンスをストリーミングします。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"
data_agent_id = "data_agent_1"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
サンプル値を次のように置き換えます。
- data_agent_1: データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID。
- サンプル プロンプトとして
Make a bar graph for the top 5 states by the total number of airports
を使用しました。
インライン コンテキストを含むステートレス チャット リクエストを送信する
次のサンプルコードは、インライン コンテキストを使用して API にステートレスな質問をする方法を示しています。このサンプルでは、get_stream
ヘルパー関数を使用してレスポンスをストリーミングし、BigQuery データソースを例として使用します。
リクエスト ペイロードに options
パラメータを含めることで、Python を使用した高度な分析を有効にすることもできます。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": [
{
"userMessage": {
"text": "Make a bar graph for the top 5 states by the total number of airports"
}
}
],
"inline_context": {
"datasource_references": bigquery_data_sources,
# Optional: To enable advanced analysis with Python, include the following options block:
"options": {
"analysis": {
"python": {
"enabled": True
}
}
}
}
}
# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)
ステートレス マルチターンの会話を作成する
ステートレスな会話でフォローアップの質問をするには、アプリケーションで新しいリクエストごとにメッセージ履歴全体を送信して、会話のコンテキストを管理する必要があります。以降のセクションでは、マルチターン会話を作成するためにヘルパー関数を定義して呼び出す方法について説明します。
マルチターンのリクエストを送信する
次の multi_turn_Conversation
ヘルパー関数は、メッセージをリストに保存して会話コンテキストを管理します。これにより、前のターンを基にしたフォローアップの質問を送信できます。関数のペイロードで、データ エージェントを参照するか、インライン コンテキストを使用してデータソースを直接指定できます。
chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"
# List that is used to track previous turns and is reused across requests
conversation_messages = []
data_agent_id = "data_agent_1"
# Helper function for calling the API
def multi_turn_Conversation(msg):
userMessage = {
"userMessage": {
"text": msg
}
}
# Send a multi-turn request by including previous turns and the new message
conversation_messages.append(userMessage)
# Construct the payload
chat_payload = {
"parent": f"projects/{billing_project}/locations/global",
"messages": conversation_messages,
# Use a data agent reference
"data_agent_context": {
"data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
# "credentials": looker_credentials
},
# Use inline context
# "inline_context": {
# "datasource_references": bigquery_data_sources,
# }
}
# Call the get_stream_multi_turn helper function to stream the response
get_stream_multi_turn(chat_url, chat_payload, conversation_messages)
前の例で、data_agent_1 は、データ エージェントを作成するのサンプルコード ブロックで定義されているデータ エージェントの ID に置き換えます。
会話のターンごとに multi_turn_Conversation
ヘルパー関数を呼び出すことができます。次のサンプルコードは、最初のリクエストを送信し、前のレスポンスを基にフォローアップ リクエストを送信する方法を示しています。
# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")
# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")
前の例では、サンプル値を次のように置き換えます。
- Which species of tree is most prevalent?: データ エージェントに送信する自然言語の質問。
- Can you show me the results as a bar chart?: 前の質問を基にする、または前の質問を絞り込むフォローアップの質問。
レスポンスを処理する
次の get_stream_multi_turn
関数は、ストリーミング API レスポンスを処理します。この関数は get_stream
ヘルパー関数と似ていますが、レスポンスを conversation_messages
リストに保存して、次のターンの会話コンテキストを保存します。
def get_stream_multi_turn(url, json, conversation_messages):
s = requests.Session()
acc = ''
with s.post(url, json=json, headers=headers, stream=True) as resp:
for line in resp.iter_lines():
if not line:
continue
decoded_line = str(line, encoding='utf-8')
if decoded_line == '[{':
acc = '{'
elif decoded_line == '}]':
acc += '}'
elif decoded_line == ',':
continue
else:
acc += decoded_line
if not is_json(acc):
continue
data_json = json_lib.loads(acc)
# Store the response that will be used in the next iteration
conversation_messages.append(data_json)
if not 'systemMessage' in data_json:
if 'error' in data_json:
handle_error(data_json['error'])
continue
if 'text' in data_json['systemMessage']:
handle_text_response(data_json['systemMessage']['text'])
elif 'schema' in data_json['systemMessage']:
handle_schema_response(data_json['systemMessage']['schema'])
elif 'data' in data_json['systemMessage']:
handle_data_response(data_json['systemMessage']['data'])
elif 'chart' in data_json['systemMessage']:
handle_chart_response(data_json['systemMessage']['chart'])
else:
colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
print(colored_json)
print('\n')
acc = ''
エンドツーエンドのコードサンプル
次の展開可能なコードサンプルには、このガイドで説明するすべてのタスクが含まれています。
HTTP と Python を使用してデータ エージェントを構築する
from pygments import highlight, lexers, formatters import pandas as pd import json as json_lib import requests import json import altair as alt import IPython from IPython.display import display, HTML import requests import google.auth from google.auth.transport.requests import Request from google.colab import auth auth.authenticate_user() access_token = !gcloud auth application-default print-access-token headers = { "Authorization": f"Bearer {access_token[0]}", "Content-Type": "application/json", } ################### Data source details ################### billing_project = "your_billing_project" location = "global" system_instruction = "Help the user in analyzing their data" # BigQuery data source bigquery_data_sources = { "bq": { "tableReferences": [ { "projectId": "bigquery-public-data", "datasetId": "san_francisco", "tableId": "street_trees" } ] } } # Looker data source looker_credentials = { "oauth": { "secret": { "client_id": "your_looker_client_id", "client_secret": "your_looker_client_secret", } } } # # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block. # looker_credentials = { # "oauth": { # "token": { # "access_token": "your_looker_access_token", # } # } # } looker_data_source = { "looker": { "explore_references": { "looker_instance_uri": "https://my_company.looker.com", "lookml_model": "my_model", "explore": "my_explore", }, # "credentials": looker_credentials } # Looker Studio data source looker_studio_data_source = { "studio":{ "studio_references": [ { "datasource_id": "your_studio_datasource_id" } ] } } ################### Create data agent ################### data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents" data_agent_id = "data_agent_1" data_agent_payload = { "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional "description": "This is the description of data_agent.", # Optional "data_analytics_agent": { "published_context": { "datasource_references": bigquery_data_sources, "system_instruction": system_instruction, # Optional: To enable advanced analysis with Python, include the following options block: "options": { "analysis": { "python": { "enabled": True } } } } } } params = {"data_agent_id": data_agent_id} # Optional data_agent_response = requests.post( data_agent_url, params=params, json=data_agent_payload, headers=headers ) if data_agent_response.status_code == 200: print("Data Agent created successfully!") print(json.dumps(data_agent_response.json(), indent=2)) else: print(f"Error creating Data Agent: {data_agent_response.status_code}") print(data_agent_response.text) ################### Create conversation ################### conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations" data_agent_id = "data_agent_1" conversation_id = "conversation _1" conversation_payload = { "agents": [ f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}" ], "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}" } params = { "conversation_id": conversation_id } conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload) if conversation_response.status_code == 200: print("Conversation created successfully!") print(json.dumps(conversation_response.json(), indent=2)) else: print(f"Error creating Conversation: {conversation_response.status_code}") print(conversation_response.text) ################### Chat with the API by using conversation (stateful) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" conversation_id = "conversation _1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "conversation_reference": { "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}", "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using dataAgents (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat" data_agent_id = "data_agent_1" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Chat with the API by using inline context (stateless) #################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat" # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": [ { "userMessage": { "text": "Make a bar graph for the top 5 states by the total number of airports" } } ], "inline_context": { "datasource_references": bigquery_data_sources, # Optional - if wanting to use advanced analysis with python "options": { "analysis": { "python": { "enabled": True } } } } } # Call the get_stream function to stream the response get_stream(chat_url, chat_payload) ################### Multi-turn conversation ################### chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat" # List that is used to track previous turns and is reused across requests conversation_messages = [] data_agent_id = "data_agent_1" # Helper function for calling the API def multi_turn_Conversation(msg): userMessage = { "userMessage": { "text": msg } } # Send a multi-turn request by including previous turns and the new message conversation_messages.append(userMessage) # Construct the payload chat_payload = { "parent": f"projects/{billing_project}/locations/global", "messages": conversation_messages, # Use a data agent reference "data_agent_context": { "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # "credentials": looker_credentials }, # Use inline context # "inline_context": { # "datasource_references": bigquery_data_sources, # } } # Call the get_stream_multi_turn helper function to stream the response get_stream_multi_turn(chat_url, chat_payload, conversation_messages) # Send first-turn request multi_turn_Conversation("Which species of tree is most prevalent?") # Send follow-up-turn request multi_turn_Conversation("Can you show me the results as a bar chart?")
次の展開可能なコードサンプルには、チャット レスポンスのストリーミングに使用される Python ヘルパー関数が含まれています。
チャット レスポンスをストリーミングするヘルパー Python 関数
def is_json(str): try: json_object = json_lib.loads(str) except ValueError as e: return False return True def handle_text_response(resp): parts = resp['parts'] print(''.join(parts)) def get_property(data, field_name, default = ''): return data[field_name] if field_name in data else default def display_schema(data): fields = data['fields'] df = pd.DataFrame({ "Column": map(lambda field: get_property(field, 'name'), fields), "Type": map(lambda field: get_property(field, 'type'), fields), "Description": map(lambda field: get_property(field, 'description', '-'), fields), "Mode": map(lambda field: get_property(field, 'mode'), fields) }) display(df) def display_section_title(text): display(HTML('<h2>{}</h2>'.format(text))) def format_bq_table_ref(table_ref): return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId']) def format_looker_table_ref(table_ref): return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri']) def display_datasource(datasource): source_name = '' if 'studioDatasourceId' in datasource: source_name = datasource['studioDatasourceId'] elif 'lookerExploreReference' in datasource: source_name = format_looker_table_ref(datasource['lookerExploreReference']) else: source_name = format_bq_table_ref(datasource['bigqueryTableReference']) print(source_name) display_schema(datasource['schema']) def handle_schema_response(resp): if 'query' in resp: print(resp['query']['question']) elif 'result' in resp: display_section_title('Schema resolved') print('Data sources:') for datasource in resp['result']['datasources']: display_datasource(datasource) def handle_data_response(resp): if 'query' in resp: query = resp['query'] display_section_title('Retrieval query') print('Query name: {}'.format(query['name'])) print('Question: {}'.format(query['question'])) print('Data sources:') for datasource in query['datasources']: display_datasource(datasource) elif 'generatedSql' in resp: display_section_title('SQL generated') print(resp['generatedSql']) elif 'result' in resp: display_section_title('Data retrieved') fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields']) dict = {} for field in fields: dict[field] = map(lambda el: get_property(el, field), resp['result']['data']) display(pd.DataFrame(dict)) def handle_chart_response(resp): if 'query' in resp: print(resp['query']['instructions']) elif 'result' in resp: vegaConfig = resp['result']['vegaConfig'] alt.Chart.from_json(json_lib.dumps(vegaConfig)).display(); def handle_error(resp): display_section_title('Error') print('Code: {}'.format(resp['code'])) print('Message: {}'.format(resp['message'])) def get_stream(url, json): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = '' def get_stream_multi_turn(url, json, conversation_messages): s = requests.Session() acc = '' with s.post(url, json=json, headers=headers, stream=True) as resp: for line in resp.iter_lines(): if not line: continue decoded_line = str(line, encoding='utf-8') if decoded_line == '[{': acc = '{' elif decoded_line == '}]': acc += '}' elif decoded_line == ',': continue else: acc += decoded_line if not is_json(acc): continue data_json = json_lib.loads(acc) # Store the response that will be used in the next iteration conversation_messages.append(data_json) if not 'systemMessage' in data_json: if 'error' in data_json: handle_error(data_json['error']) continue if 'text' in data_json['systemMessage']: handle_text_response(data_json['systemMessage']['text']) elif 'schema' in data_json['systemMessage']: handle_schema_response(data_json['systemMessage']['schema']) elif 'data' in data_json['systemMessage']: handle_data_response(data_json['systemMessage']['data']) elif 'chart' in data_json['systemMessage']: handle_chart_response(data_json['systemMessage']['chart']) else: colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter()) print(colored_json) print('\n') acc = ''