Creare un agente dati utilizzando HTTP e Python

Questa pagina ti guida nell'utilizzo di Python per effettuare richieste HTTP all'API Conversational Analytics (a cui si accede tramite geminidataanalytics.googleapis.com).

L'esempio di codice Python in questa pagina mostra come completare le seguenti attività:

Alla fine della pagina è inclusa una versione completa del codice campione, insieme alle funzioni di assistenza utilizzate per trasmettere in streaming la risposta dell'API.

Configurare le impostazioni iniziali e l'autenticazione

Il seguente codice Python di esempio esegue queste attività:

  • Importa le librerie Python richieste
  • Ottiene un token di accesso per l'autenticazione HTTP utilizzando Google Cloud CLI
  • Definisce le variabili per il progetto di fatturazione e le istruzioni di sistema
from pygments import highlight, lexers, formatters
import pandas as pd
import json as json_lib
import requests
import json
import altair as alt
import IPython
from IPython.display import display, HTML
import google.auth
from google.auth.transport.requests import Request

from google.colab import auth
auth.authenticate_user()

access_token = !gcloud auth application-default print-access-token
headers = {
    "Authorization": f"Bearer {access_token[0]}",
    "Content-Type": "application/json",
}

billing_project = 'YOUR-BILLING-PROJECT'
system_instruction = 'YOUR-SYSTEM-INSTRUCTIONS'

Sostituisci i valori di esempio come segue:

  • YOUR-BILLING-PROJECT: l'ID del progetto di fatturazione in cui hai abilitato le API richieste.
  • YOUR-SYSTEM-INSTRUCTIONS: istruzioni di sistema per guidare il comportamento dell'agente e personalizzarlo in base alle tue esigenze di dati. Ad esempio, puoi utilizzare le istruzioni di sistema per definire i termini commerciali, controllare la lunghezza della risposta o impostare la formattazione dei dati. Idealmente, definisci le istruzioni di sistema utilizzando il formato YAML consigliato in Scrivere istruzioni di sistema efficaci per fornire indicazioni dettagliate e strutturate.

Autenticarsi in Looker

Se prevedi di connetterti a un'origine dati di Looker, dovrai autenticarti nell'istanza di Looker.

Utilizzo di chiavi API

L'esempio di codice Python seguente mostra come autenticare l'agente in un'istanza di Looker utilizzando le chiavi API.

looker_credentials = {
    "oauth": {
        "secret": {
            "client_id": "YOUR-LOOKER-CLIENT-ID",
            "client_secret": "YOUR-LOOKER-CLIENT-SECRET",
        }
    }
}

Sostituisci i valori di esempio come segue:

  • YOUR-LOOKER-CLIENT-ID: l'ID client della chiave API Looker generata.
  • YOUR-LOOKER-CLIENT-SECRET: il client secret della chiave API Looker generata.

Utilizzo dei token di accesso

Il seguente esempio di codice Python mostra come autenticare l'agente in un'istanza di Looker utilizzando i token di accesso.

looker_credentials = {
    "oauth": {
        "token": {
            "access_token": "YOUR-TOKEN",
        }
    }
}

Sostituisci i valori di esempio come segue:

  • YOUR-TOKEN: il valore access_token che generi per l'autenticazione a Looker.

Connetti a un'origine dati

I seguenti esempi di codice Python mostrano come definire l'origine dati Looker, BigQuery o Looker Studio da utilizzare per l'agente.

Connettersi ai dati di Looker

Il seguente codice campione definisce una connessione a un'esplorazione di Looker. Per stabilire una connessione con un'istanza di Looker, verifica di aver generato le chiavi API di Looker, come descritto in Autenticare e connettersi a un'origine dati con l'API Conversational Analytics.

looker_data_source = {
    "looker": {
        "explore_references": {
            "looker_instance_uri": "https://your_company.looker.com",
            "lookml_model": "your_model",
            "explore": "your_explore",
       },
    }
}

Sostituisci i valori di esempio come segue:

  • https://your_company.looker.com: l'URL completo dell'istanza di Looker.
  • your_model: il nome del modello LookML che include l'Explore a cui vuoi connetterti.
  • your_explore: il nome dell'esplorazione di Looker che vuoi che l'agente dati interroghi.

Connettersi ai dati BigQuery

Con l'API Conversational Analytics, puoi connetterti ed eseguire query su un massimo di 10 tabelle BigQuery alla volta.

Il seguente codice campione definisce una connessione a una tabella BigQuery.

bigquery_data_sources = {
    "bq": {
        "tableReferences": [
            {
                "projectId": "bigquery-public-data",
                "datasetId": "san_francisco",
                "tableId": "street_trees",
            }
        ]
    }
}

Sostituisci i valori di esempio come segue:

  • bigquery-public-data: l'ID del progetto Google Cloud che contiene il set di dati e la tabella BigQuery a cui vuoi connetterti. Per connetterti a un set di dati pubblici, specifica bigquery-public-data.
  • san_francisco: l'ID del set di dati BigQuery.
  • street_trees: l'ID della tabella BigQuery.

Connettersi ai dati di Looker Studio

Il seguente codice campione definisce una connessione a un'origine dati di Looker Studio.

looker_studio_data_source = {
    "studio":{
        "studio_references": [
            {
              "studio_datasource_id": "studio_datasource_id"
            }
        ]
    }
}

Sostituisci studio_datasource_id con l'ID origine dati.

Crea un agente dati

Il seguente codice campione mostra come creare l'agente dati inviando una richiesta HTTP POST all'endpoint di creazione dell'agente dati. Il payload della richiesta include i seguenti dettagli:

Se vuoi, puoi anche attivare l'analisi avanzata con Python includendo il parametro options nel payload della richiesta.

data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_id = "data_agent_1"

data_agent_payload = {
      "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
      "description": "This is the description of data_agent_1.", # Optional

      "data_analytics_agent": {
          "published_context": {
              "datasource_references": bigquery_data_sources,
              "system_instruction": system_instruction,
              # Optional: To enable advanced analysis with Python, include the following options block:
              "options": {
                  "analysis": {
                      "python": {
                          "enabled": True
                      }
                  }
              }
          }
      }
  }

params = {"data_agent_id": data_agent_id} # Optional

data_agent_response = requests.post(
    data_agent_url, params=params, json=data_agent_payload, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent created successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error creating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

Sostituisci i valori di esempio come segue:

  • data_agent_1: un identificatore univoco per l'agente dei dati. Questo valore viene utilizzato nel nome della risorsa dell'agente e come parametro di query URL data_agent_id.
  • This is the description of data_agent_1.: una descrizione dell'agente di dati.

Creare una conversazione

Il seguente codice campione mostra come creare una conversazione con l'agente di dati.

conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation_payload = {
    "agents": [
        f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
    ],
    "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
}
params = {
    "conversation_id": conversation_id
}

conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

if conversation_response.status_code == 200:
    print("Conversation created successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error creating Conversation: {conversation_response.status_code}")
    print(conversation_response.text)

Sostituisci i valori di esempio come segue:

  • data_agent_1: l'ID dell'agente dati, come definito nel blocco di codice campione in Creare un agente dati.
  • conversation_1: un identificatore univoco per la conversazione.

Gestire gli agenti di dati e le conversazioni

I seguenti esempi di codice mostrano come gestire gli agenti di dati e le conversazioni utilizzando l'API Conversational Analytics. Puoi eseguire le seguenti operazioni:

Recuperare un agente dati

Il seguente codice campione mostra come recuperare un data agent esistente inviando una richiesta HTTP GET all'URL della risorsa data agent.

data_agent_id = "data_agent_1"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Fetched Data Agent successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error: {data_agent_response.status_code}")
    print(data_agent_response.text)

Nell'esempio precedente, sostituisci data_agent_1 con l'ID dell'agente dati che vuoi recuperare.

Elenco degli agenti dati

Il seguente codice mostra come elencare tutti gli agenti di dati per un determinato progetto inviando una richiesta GET HTTP all'endpoint dataAgents.

Per elencare tutti gli agenti, devi disporre dell'autorizzazione geminidataanalytics.dataAgents.list per il progetto. Per saperne di più sui ruoli IAM che includono questa autorizzazione, consulta l'elenco dei ruoli predefiniti.

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"

data_agent_response = requests.get(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent Listed successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Listing Data Agent: {data_agent_response.status_code}")

Sostituisci YOUR-BILLING-PROJECT con l'ID del tuo progetto di fatturazione.

Aggiorna un agente dati

Il seguente codice campione mostra come aggiornare un agente dati inviando una richiesta HTTP PATCH all'URL della risorsa dell'agente dati. Il payload della richiesta include i nuovi valori per i campi che vuoi modificare e i parametri della richiesta includono un parametro updateMask, che specifica i campi da aggiornare.

data_agent_id = "data_agent_1"
billing_project = "YOUR-BILLING-PROJECT"
location = "global"

data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

payload = {
    "description": "Updated description of the data agent.",
    "data_analytics_agent": {
        "published_context": {
            "datasource_references": bigquery_data_sources,
            "system_instruction": system_instruction
        }
    },
}

fields = ["description", "data_analytics_agent"]
params = {
    "updateMask": ",".join(fields)
}

data_agent_response = requests.patch(
    data_agent_url, headers=headers, params=params, json=payload
)

if data_agent_response.status_code == 200:
    print("Data Agent updated successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Updating Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

Sostituisci i valori di esempio come segue:

  • data_agent_1: l'ID dell'agente dati che vuoi aggiornare.
  • YOUR-BILLING-PROJECT: l'ID del tuo progetto di fatturazione.
  • Updated description of the data agent.: una nuova descrizione per l'agente dati.

Impostare il criterio IAM per un agente dati

Per condividere un agente, puoi utilizzare il metodo setIamPolicy per assegnare ruoli IAM agli utenti di un agente specifico. Il seguente codice campione mostra come effettuare una chiamata POST all'URL dell'agente dati con un payload che include i binding. Il binding specifica quali ruoli devono essere assegnati a quali utenti.

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"
role = "roles/geminidataanalytics.dataAgentEditor"
users = "222larabrown@gmail.com, cloudysanfrancisco@gmail.com"

data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:setIamPolicy"

# Request body
payload = {
    "policy": {
        "bindings": [
            {
                "role": role,
                "members": [
                    f"user:{i.strip()}" for i in users.split(",")
                ]
            }
        ]
    }
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy set successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error setting IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

Sostituisci i valori di esempio come segue:

  • YOUR-BILLING-PROJECT: l'ID del tuo progetto di fatturazione.
  • data_agent_1: L'ID dell'agente dati per cui vuoi impostare la policy IAM.
  • 222larabrown@gmail.com, cloudysanfrancisco@gmail.com: un elenco separato da virgole di indirizzi email degli utenti a cui vuoi concedere il ruolo specificato.

Recupero del criterio IAM per un agente dati

Il seguente codice campione mostra come recuperare il criterio IAM per un agente dati inviando una richiesta HTTP POST all'URL dell'agente dati. Il payload della richiesta include il percorso dell'agente dati.

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}:getIamPolicy"

# Request body
payload = {
    "resource": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
}

data_agent_response = requests.post(
    data_agent_url, headers=headers, json=payload
)

if data_agent_response.status_code == 200:
    print("IAM Policy fetched successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error fetching IAM policy: {data_agent_response.status_code}")
    print(data_agent_response.text)

Sostituisci i valori di esempio come segue:

  • YOUR-BILLING-PROJECT: l'ID del tuo progetto di fatturazione.
  • data_agent_1: L'ID dell'agente dati per cui vuoi ottenere il criterio IAM.

Eliminare un agente di dati

Il seguente codice campione mostra come eliminare temporaneamente un agente dati inviando una richiesta HTTP DELETE all'URL della risorsa dell'agente dati. L'eliminazione temporanea significa che l'agente viene eliminato, ma può comunque essere recuperato entro 30 giorni.

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
data_agent_id = "data_agent_1"

data_agent_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"

data_agent_response = requests.delete(
    data_agent_url, headers=headers
)

if data_agent_response.status_code == 200:
    print("Data Agent deleted successfully!")
    print(json.dumps(data_agent_response.json(), indent=2))
else:
    print(f"Error Deleting Data Agent: {data_agent_response.status_code}")
    print(data_agent_response.text)

Sostituisci i valori di esempio come segue:

  • YOUR-BILLING-PROJECT: l'ID del tuo progetto di fatturazione.
  • data_agent_1: l'ID dell'agente dati che vuoi eliminare.

Recuperare una conversazione

Il seguente codice campione mostra come recuperare una conversazione esistente inviando una richiesta HTTP GET all'URL della risorsa conversazione.

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/conversations/{conversation_id}"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

Sostituisci i valori di esempio come segue:

  • YOUR-BILLING-PROJECT: l'ID del tuo progetto di fatturazione.
  • conversation_1: l'ID della conversazione che vuoi recuperare.

Elenca conversazioni

Il seguente codice campione mostra come elencare le conversazioni per un determinato progetto inviando una richiesta GET HTTP all'endpoint conversations.

Per impostazione predefinita, questo metodo restituisce le conversazioni che hai creato. Gli amministratori (utenti con il ruolo IAM cloudaicompanion.topicAdmin) possono visualizzare tutte le conversazioni all'interno del progetto.

billing_project = "YOUR-BILLING-PROJECT"
location = "global"
conversation_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/conversations"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

Sostituisci YOUR-BILLING-PROJECT con l'ID del progetto di fatturazione in cui hai abilitato le API richieste.

Elencare i messaggi in una conversazione

Il seguente codice campione mostra come elencare tutti i messaggi in una conversazione inviando una richiesta HTTP GET all'endpoint messages della conversazione.

Per elencare i messaggi, devi disporre dell'autorizzazione cloudaicompanion.topics.get sulla conversazione.

billing_project = "YOUR-BILLING-PROJECT"
location = "global"

conversation_id = "conversation_1"

conversation_url = f"{base_url}/v1alpha/projects/{billing_project}/locations/{location}/conversations/{conversation_id}/messages"

conversation_response = requests.get(conversation_url, headers=headers)

# Handle the response
if conversation_response.status_code == 200:
    print("Conversation fetched successfully!")
    print(json.dumps(conversation_response.json(), indent=2))
else:
    print(f"Error while fetching conversation: {conversation_response.status_code}")
    print(conversation_response.text)

Sostituisci i valori di esempio come segue:

  • YOUR-BILLING-PROJECT: l'ID del tuo progetto di fatturazione.
  • conversation_1: l'ID della conversazione per cui vuoi elencare i messaggi.

Utilizzare l'API per porre domande

Dopo aver creato un agente dati e una conversazione, puoi porre domande sui tuoi dati.

L'API Conversational Analytics supporta conversazioni a più turni, che consentono agli utenti di porre domande aggiuntive basate sul contesto precedente. L'API fornisce i seguenti metodi per gestire la cronologia delle conversazioni:

  • Chat con stato: Google Cloud memorizza e gestisce la cronologia della conversazione. La chat stateful è intrinsecamente a più turni, in quanto l'API conserva il contesto dei messaggi precedenti. Devi inviare solo il messaggio corrente per ogni turno.
  • Chat stateless: la tua applicazione gestisce la cronologia delle conversazioni. Devi includere i messaggi precedenti pertinenti in ogni nuovo messaggio. Per esempi dettagliati su come gestire le conversazioni multi-turn in modalità stateless, vedi Creare una conversazione multi-turn stateless.

Chat stateful

Invia una richiesta di chat stateful con un riferimento alla conversazione

Il seguente codice campione mostra come porre domande all'API utilizzando la conversazione che hai definito nei passaggi precedenti. Questo esempio utilizza una funzione helper get_stream per trasmettere in streaming la risposta.

chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "conversation_reference": {
        "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

Sostituisci i valori di esempio come segue:

  • data_agent_1: l'ID dell'agente dati, come definito nel blocco di codice campione in Creare un agente dati.
  • conversation_1: un identificatore univoco per la conversazione.
  • Make a bar graph for the top 5 states by the total number of airports è stato utilizzato come prompt di esempio.

Chat stateless

Inviare una richiesta di chat stateless con un riferimento all'agente dati

Il seguente codice campione mostra come porre all'API una domanda senza stato utilizzando l'agente dati che hai definito nei passaggi precedenti. Questo esempio utilizza una funzione helper get_stream per trasmettere in streaming la risposta.

chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"

data_agent_id = "data_agent_1"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "data_agent_context": {
        "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
        # "credentials": looker_credentials
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

Sostituisci i valori di esempio come segue:

  • data_agent_1: l'ID dell'agente dati, come definito nel blocco di codice campione in Creare un agente dati.
  • Make a bar graph for the top 5 states by the total number of airports è stato utilizzato come prompt di esempio.

Inviare una richiesta di chat stateless con contesto incorporato

Il seguente codice campione mostra come porre all'API una domanda senza stato utilizzando il contesto incorporato. Questo esempio utilizza una funzione helper get_stream per trasmettere in streaming la risposta e utilizza un'origine dati BigQuery come esempio.

Se vuoi, puoi anche attivare l'analisi avanzata con Python includendo il parametro options nel payload della richiesta.

chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"

# Construct the payload
chat_payload = {
    "parent": f"projects/{billing_project}/locations/global",
    "messages": [
        {
            "userMessage": {
                "text": "Make a bar graph for the top 5 states by the total number of airports"
            }
        }
    ],
    "inline_context": {
        "datasource_references": bigquery_data_sources,
          # Optional: To enable advanced analysis with Python, include the following options block:
          "options": {
              "analysis": {
                  "python": {
                      "enabled": True
                  }
              }
          }
    }
}

# Call the get_stream function to stream the response
get_stream(chat_url, chat_payload)

Creare una conversazione multi-turno stateless

Per fare altre domande in una conversazione stateless, la tua applicazione deve gestire il contesto della conversazione inviando l'intera cronologia dei messaggi a ogni nuova richiesta. Le sezioni seguenti mostrano come definire e chiamare le funzioni di assistenza per creare una conversazione in più turni:

Inviare richieste multi-turno

La seguente funzione helper multi_turn_Conversation gestisce il contesto della conversazione memorizzando i messaggi in un elenco. In questo modo puoi inviare domande aggiuntive che si basano sui turni precedenti. Nel payload della funzione, puoi fare riferimento a un agente dati o fornire l'origine dati direttamente utilizzando il contesto incorporato.

chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Helper function for calling the API
def multi_turn_Conversation(msg):

  userMessage = {
      "userMessage": {
          "text": msg
      }
  }

  # Send a multi-turn request by including previous turns and the new message
  conversation_messages.append(userMessage)

  # Construct the payload
  chat_payload = {
      "parent": f"projects/{billing_project}/locations/global",
      "messages": conversation_messages,
      # Use a data agent reference
      "data_agent_context": {
          "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
          # "credentials": looker_credentials
      },
      # Use inline context
      # "inline_context": {
      #     "datasource_references": bigquery_data_sources,
      # }
  }

  # Call the get_stream_multi_turn helper function to stream the response
  get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

Nell'esempio precedente, sostituisci data_agent_1 con l'ID dell'agente dati, come definito nel blocco di codice campione in Crea un agente dati.

Puoi chiamare la funzione di assistenza multi_turn_Conversation per ogni turno della conversazione. Il seguente codice campione mostra come inviare una richiesta iniziale e poi una richiesta di follow-up che si basa sulla risposta precedente.

# Send first-turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up-turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

Nell'esempio precedente, sostituisci i valori di esempio come segue:

  • Which species of tree is most prevalent?: Una domanda in linguaggio naturale da inviare all'agente per i dati.
  • Can you show me the results as a bar chart?: Una domanda aggiuntiva che si basa sulla domanda precedente o la perfeziona.

Elaborare le risposte

La seguente funzione get_stream_multi_turn elabora la risposta dell'API di streaming. Questa funzione è simile alla funzione helper get_stream, ma memorizza la risposta nell'elenco conversation_messages per salvare il contesto della conversazione per il turno successivo.

def get_stream_multi_turn(url, json, conversation_messages):
    s = requests.Session()

    acc = ''

    with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue

            decoded_line = str(line, encoding='utf-8')

            if decoded_line == '[{':
                acc = '{'
            elif decoded_line == '}]':
                acc += '}'
            elif decoded_line == ',':
                continue
            else:
                acc += decoded_line

            if not is_json(acc):
                continue

            data_json = json_lib.loads(acc)
            # Store the response that will be used in the next iteration
            conversation_messages.append(data_json)

            if not 'systemMessage' in data_json:
                if 'error' in data_json:
                    handle_error(data_json['error'])
                continue

            if 'text' in data_json['systemMessage']:
                handle_text_response(data_json['systemMessage']['text'])
            elif 'schema' in data_json['systemMessage']:
                handle_schema_response(data_json['systemMessage']['schema'])
            elif 'data' in data_json['systemMessage']:
                handle_data_response(data_json['systemMessage']['data'])
            elif 'chart' in data_json['systemMessage']:
                handle_chart_response(data_json['systemMessage']['chart'])
            else:
                colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                print(colored_json)
            print('\n')
            acc = ''

Esempio di codice end-to-end

Il seguente esempio di codice espandibile contiene tutte le attività trattate in questa guida.

Creare un agente dati utilizzando HTTP e Python

    from pygments import highlight, lexers, formatters
    import pandas as pd
    import json as json_lib
    import requests
    import json
    import altair as alt
    import IPython
    from IPython.display import display, HTML
    import requests
    import google.auth
    from google.auth.transport.requests import Request

    from google.colab import auth
    auth.authenticate_user()

    access_token = !gcloud auth application-default print-access-token
    headers = {
        "Authorization": f"Bearer {access_token[0]}",
        "Content-Type": "application/json",
    }

    ################### Data source details ###################

    billing_project = "your_billing_project"
    location = "global"
    system_instruction = "Help the user in analyzing their data"


    # BigQuery data source
    bigquery_data_sources = {
        "bq": {
        "tableReferences": [
            {
            "projectId": "bigquery-public-data",
            "datasetId": "san_francisco",
            "tableId": "street_trees"
            }
        ]
        }
    }

    # Looker data source
    looker_credentials = {
        "oauth": {
            "secret": {
            "client_id": "your_looker_client_id",
            "client_secret": "your_looker_client_secret",
            }
        }
    }
    # # To use access_token for authentication, uncomment the following looker_credentials code block and comment out the previous looker_credentials code block.
    # looker_credentials = {
    #     "oauth": {
    #         "token": {
    #           "access_token": "your_looker_access_token",
    #         }
    #     }
    # }
    looker_data_source = {
        "looker": {
        "explore_references": {
            "looker_instance_uri": "https://my_company.looker.com",
            "lookml_model": "my_model",
            "explore": "my_explore",
        },
        # "credentials": looker_credentials
    }

    # Looker Studio data source
    looker_studio_data_source = {
        "studio":{
            "studio_references":
            [
                {
                "datasource_id": "your_studio_datasource_id"
                }
            ]
        }
    }

    ################### Create data agent ###################
    data_agent_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/dataAgents"

    data_agent_id = "data_agent_1"

    data_agent_payload = {
        "name": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}", # Optional
        "description": "This is the description of data_agent.", # Optional

        "data_analytics_agent": {
            "published_context": {
                "datasource_references": bigquery_data_sources,
                "system_instruction": system_instruction,
                # Optional: To enable advanced analysis with Python, include the following options block:
                "options": {
                    "analysis": {
                        "python": {
                            "enabled": True
                        }
                    }
                }
            }
        }
    }

    params = {"data_agent_id": data_agent_id} # Optional

    data_agent_response = requests.post(
        data_agent_url, params=params, json=data_agent_payload, headers=headers
    )

    if data_agent_response.status_code == 200:
        print("Data Agent created successfully!")
        print(json.dumps(data_agent_response.json(), indent=2))
    else:
        print(f"Error creating Data Agent: {data_agent_response.status_code}")
        print(data_agent_response.text)


    ################### Create conversation ###################

    conversation_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}/conversations"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    conversation_payload = {
        "agents": [
            f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}"
        ],
        "name": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}"
    }
    params = {
        "conversation_id": conversation_id
    }

    conversation_response = requests.post(conversation_url, headers=headers, params=params, json=conversation_payload)

    if conversation_response.status_code == 200:
        print("Conversation created successfully!")
        print(json.dumps(conversation_response.json(), indent=2))
    else:
        print(f"Error creating Conversation: {conversation_response.status_code}")
        print(conversation_response.text)


    ################### Chat with the API by using conversation (stateful) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"
    conversation_id = "conversation _1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "conversation_reference": {
            "conversation": f"projects/{billing_project}/locations/{location}/conversations/{conversation_id}",
            "data_agent_context": {
                "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
                # "credentials": looker_credentials
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using dataAgents (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/{location}:chat"

    data_agent_id = "data_agent_1"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "data_agent_context": {
            "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
            # "credentials": looker_credentials
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Chat with the API by using inline context (stateless) ####################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"

    # Construct the payload
    chat_payload = {
        "parent": f"projects/{billing_project}/locations/global",
        "messages": [
            {
                "userMessage": {
                    "text": "Make a bar graph for the top 5 states by the total number of airports"
                }
            }
        ],
        "inline_context": {
            "datasource_references": bigquery_data_sources,
            # Optional - if wanting to use advanced analysis with python
            "options": {
                "analysis": {
                    "python": {
                        "enabled": True
                    }
                }
            }
        }
    }

    # Call the get_stream function to stream the response
    get_stream(chat_url, chat_payload)

    ################### Multi-turn conversation ###################

    chat_url = f"https://geminidataanalytics.googleapis.com/v1alpha/projects/{billing_project}/locations/global:chat"

    # List that is used to track previous turns and is reused across requests
    conversation_messages = []

    data_agent_id = "data_agent_1"

    # Helper function for calling the API
    def multi_turn_Conversation(msg):

      userMessage = {
          "userMessage": {
              "text": msg
          }
      }

      # Send a multi-turn request by including previous turns and the new message
      conversation_messages.append(userMessage)

      # Construct the payload
      chat_payload = {
          "parent": f"projects/{billing_project}/locations/global",
          "messages": conversation_messages,
          # Use a data agent reference
          "data_agent_context": {
              "data_agent": f"projects/{billing_project}/locations/{location}/dataAgents/{data_agent_id}",
              # "credentials": looker_credentials
          },
          # Use inline context
          # "inline_context": {
          #     "datasource_references": bigquery_data_sources,
          # }
      }

      # Call the get_stream_multi_turn helper function to stream the response
      get_stream_multi_turn(chat_url, chat_payload, conversation_messages)

    # Send first-turn request
    multi_turn_Conversation("Which species of tree is most prevalent?")

    # Send follow-up-turn request
    multi_turn_Conversation("Can you show me the results as a bar chart?")
    

Il seguente esempio di codice espandibile contiene le funzioni helper Python utilizzate per lo streaming delle risposte della chat.

Funzioni helper Python per lo streaming delle risposte della chat

    def is_json(str):
      try:
          json_object = json_lib.loads(str)
      except ValueError as e:
          return False
      return True

    def handle_text_response(resp):
      parts = resp['parts']
      print(''.join(parts))

    def get_property(data, field_name, default = ''):
      return data[field_name] if field_name in data else default

    def display_schema(data):
      fields = data['fields']
      df = pd.DataFrame({
        "Column": map(lambda field: get_property(field, 'name'), fields),
        "Type": map(lambda field: get_property(field, 'type'), fields),
        "Description": map(lambda field: get_property(field, 'description', '-'), fields),
        "Mode": map(lambda field: get_property(field, 'mode'), fields)
      })
      display(df)

    def display_section_title(text):
      display(HTML('<h2>{}</h2>'.format(text)))

    def format_bq_table_ref(table_ref):
      return '{}.{}.{}'.format(table_ref['projectId'], table_ref['datasetId'], table_ref['tableId'])

    def format_looker_table_ref(table_ref):
      return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref['lookmlModel'], table_ref['explore'], table_ref['lookerInstanceUri'])

    def display_datasource(datasource):
      source_name = ''

      if 'studioDatasourceId' in datasource:
        source_name = datasource['studioDatasourceId']
      elif 'lookerExploreReference' in datasource:
        source_name = format_looker_table_ref(datasource['lookerExploreReference'])
      else:
        source_name = format_bq_table_ref(datasource['bigqueryTableReference'])

      print(source_name)
      display_schema(datasource['schema'])

    def handle_schema_response(resp):
      if 'query' in resp:
        print(resp['query']['question'])
      elif 'result' in resp:
        display_section_title('Schema resolved')
        print('Data sources:')
        for datasource in resp['result']['datasources']:
          display_datasource(datasource)

    def handle_data_response(resp):
      if 'query' in resp:
        query = resp['query']
        display_section_title('Retrieval query')
        print('Query name: {}'.format(query['name']))
        print('Question: {}'.format(query['question']))
        print('Data sources:')
        for datasource in query['datasources']:
          display_datasource(datasource)
      elif 'generatedSql' in resp:
        display_section_title('SQL generated')
        print(resp['generatedSql'])
      elif 'result' in resp:
        display_section_title('Data retrieved')

        fields = map(lambda field: get_property(field, 'name'), resp['result']['schema']['fields'])
        dict = {}

        for field in fields:
          dict[field] = map(lambda el: get_property(el, field), resp['result']['data'])

        display(pd.DataFrame(dict))

    def handle_chart_response(resp):
      if 'query' in resp:
        print(resp['query']['instructions'])
      elif 'result' in resp:
        vegaConfig = resp['result']['vegaConfig']
        alt.Chart.from_json(json_lib.dumps(vegaConfig)).display();

    def handle_error(resp):
      display_section_title('Error')
      print('Code: {}'.format(resp['code']))
      print('Message: {}'.format(resp['message']))

    def get_stream(url, json):
      s = requests.Session()

      acc = ''

      with s.post(url, json=json, headers=headers, stream=True) as resp:
        for line in resp.iter_lines():
          if not line:
            continue

          decoded_line = str(line, encoding='utf-8')

          if decoded_line == '[{':
            acc = '{'
          elif decoded_line == '}]':
            acc += '}'
          elif decoded_line == ',':
            continue
          else:
            acc += decoded_line

          if not is_json(acc):
            continue

          data_json = json_lib.loads(acc)

          if not 'systemMessage' in data_json:
            if 'error' in data_json:
                handle_error(data_json['error'])
            continue

          if 'text' in data_json['systemMessage']:
            handle_text_response(data_json['systemMessage']['text'])
          elif 'schema' in data_json['systemMessage']:
            handle_schema_response(data_json['systemMessage']['schema'])
          elif 'data' in data_json['systemMessage']:
            handle_data_response(data_json['systemMessage']['data'])
          elif 'chart' in data_json['systemMessage']:
            handle_chart_response(data_json['systemMessage']['chart'])
          else:
            colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
            print(colored_json)
            print('\n')
            acc = ''

    def get_stream_multi_turn(url, json, conversation_messages):
        s = requests.Session()

        acc = ''

        with s.post(url, json=json, headers=headers, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue

                decoded_line = str(line, encoding='utf-8')

                if decoded_line == '[{':
                    acc = '{'
                elif decoded_line == '}]':
                    acc += '}'
                elif decoded_line == ',':
                    continue
                else:
                    acc += decoded_line

                if not is_json(acc):
                    continue

                data_json = json_lib.loads(acc)
                # Store the response that will be used in the next iteration
                conversation_messages.append(data_json)

                if not 'systemMessage' in data_json:
                    if 'error' in data_json:
                        handle_error(data_json['error'])
                    continue

                if 'text' in data_json['systemMessage']:
                    handle_text_response(data_json['systemMessage']['text'])
                elif 'schema' in data_json['systemMessage']:
                    handle_schema_response(data_json['systemMessage']['schema'])
                elif 'data' in data_json['systemMessage']:
                    handle_data_response(data_json['systemMessage']['data'])
                elif 'chart' in data_json['systemMessage']:
                    handle_chart_response(data_json['systemMessage']['chart'])
                else:
                    colored_json = highlight(acc, lexers.JsonLexer(), formatters.TerminalFormatter())
                    print(colored_json)
                print('\n')
                acc = ''