Python SDK を使用してデータ エージェントを構築する

このページでは、Python SDK を使用して Conversational Analytics API にリクエストを送信する方法について説明します。Python コードサンプルは、次のタスクを行う方法を示しています。

環境の認証と設定を行う

会話分析 API 用の Python SDK を使用するには、会話分析 API SDK Colaboratory ノートブックの手順に沿って SDK をダウンロードしてインストールします。なお、ダウンロード方法と SDK Colab の内容は変更される可能性があります。

ノートブックの設定手順を完了したら、次のコードを使用して、必要な SDK ライブラリのインポート、Colaboratory 環境内での Google アカウントの認証を行い、API リクエストを行うクライアントを初期化できます。

from google.colab import auth
auth.authenticate_user()

from google.cloud import geminidataanalytics

data_agent_client = geminidataanalytics.DataAgentServiceClient()
data_chat_client = geminidataanalytics.DataChatServiceClient()

課金プロジェクトとシステム指示を指定する

次の Python サンプルコードは、スクリプト全体で使用される課金プロジェクトとシステム指示を定義します。

# Billing project
billing_project = "my_project_name"

# System instructions
system_instruction = "Help the user analyze their data."

サンプル値を次のように置き換えます。

  • my_project_name: 必要な API が有効になっている課金プロジェクトの ID。
  • Help the user analyze their data.: エージェントの動作をガイドし、データのニーズに合わせてカスタマイズするためのシステム指示。たとえば、システム指示を使用してビジネス用語を定義したり、回答の長さを制御したり、データの形式を設定できます。理想的には、効果的なシステム指示を記述するで推奨されている YAML 形式でシステム指示を定義し、構造化された詳細なガイダンスを提供します。

データソースに接続する

次の Python コードサンプルは、エージェントが質問に回答するためにクエリする LookerBigQueryLooker Studio データソースの接続情報を定義する方法を示しています。

Looker データに接続する

次のコード例は、API キーまたはアクセス トークンを使用して Looker Explore への接続の詳細を定義する方法を示しています。

API キー

会話分析 API を使用してデータソースの認証と接続を行うで説明されているように、生成された Looker API キーを使用して Looker インスタンスとの接続を確立できます。

looker_client_id = "my_looker_client_id"
looker_client_secret = "my_looker_client_secret"
looker_instance_uri = "https://my_company.looker.com"
lookml_model = "my_model"
explore = "my_explore"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = lookml_model
looker_explore_reference.explore = explore

credentials = geminidataanalytics.Credentials()
credentials.oauth.secret.client_id = looker_client_id
credentials.oauth.secret.client_secret = looker_client_secret

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference]

サンプル値を次のように置き換えます。

  • my_looker_client_id: 生成された Looker API キーのクライアント ID。
  • my_looker_client_secret: 生成された Looker API キーのクライアント シークレット。
  • https://my_company.looker.com: Looker インスタンスの完全な URL。
  • my_model: 接続する Explore を含む LookML モデルの名前。
  • my_explore: データ エージェントがクエリを実行する Looker Explore の名前。

アクセス トークン

会話分析 API を使用してデータソースの認証と接続を行うで説明されているように、アクセス トークンを使用して Looker インスタンスとの接続を確立できます。

looker_access_token = "my_access_token"
looker_instance_uri = "https://my_company.looker.com"
lookml_model = "my_model"
explore = "my_explore"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = lookml_model
looker_explore_reference.explore = explore

credentials = geminidataanalytics.Credentials()
credentials.oauth.token.access_token = looker_access_token

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference]

サンプル値を次のように置き換えます。

  • my_access_token: Looker の認証用に生成する access_token 値。
  • https://my_company.looker.com: Looker インスタンスの完全な URL。
  • my_model: 接続する Explore を含む LookML モデルの名前。
  • my_explore: データ エージェントがクエリを実行する Looker Explore の名前。

BigQuery データに接続する

会話分析 API を使用すると、一度に最大 10 個の BigQuery テーブルに接続してクエリを実行できます。

次のサンプルコードは、単一の BigQuery テーブルへの接続を定義します。

bq_project_id = "my_project_id"
bq_dataset_id = "my_dataset_id"
bq_table_id = "my_table_id"

bigquery_table_reference = geminidataanalytics.BigQueryTableReference()
bigquery_table_reference.project_id = bq_project_id
bigquery_table_reference.dataset_id = bq_dataset_id
bigquery_table_reference.table_id = bq_table_id

# Connect to your data source
datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.bq.table_references = [bigquery_table_reference]

サンプル値を次のように置き換えます。

  • my_project_id: 接続する BigQuery データセットとテーブルを含む Google Cloud プロジェクトの ID。公開データセットに接続するには、bigquery-public-data を指定します。
  • my_dataset_id: BigQuery データセットの ID。例: san_francisco
  • my_table_id: BigQuery テーブルの ID。例: street_trees

Looker Studio データに接続する

次のサンプルコードは、Looker Studio データソースへの接続を定義します。

studio_datasource_id = "my_datasource_id"

studio_references = geminidataanalytics.StudioDatasourceReference()
studio_references.datasource_id = studio_datasource_id

## Connect to your data source
datasource_references.studio.studio_references = [studio_references]

前の例では、my_datasource_id をデータソース ID に置き換えます。

ステートフルまたはステートレス チャットのコンテキストを設定する

会話分析 API はマルチターンの会話をサポートしています。これにより、ユーザーは以前のコンテキストに基づいてフォローアップの質問を実施できます。次の Python サンプルコードは、ステートフル チャットまたはステートレス チャットのコンテキストを設定する方法を示しています。

  • ステートフル リクエスト: Google Cloud が会話履歴を保存して管理します。ステートフル チャットは、API が以前のメッセージのコンテキストを保持するため、本質的にマルチターンです。各ターンで現在のメッセージのみを送信します。
  • ステートレス チャット: アプリケーションが会話履歴を管理します。新しいメッセージには、会話履歴全体を含める必要があります。ステートレス モードでマルチターン会話を管理する方法の詳細な例については、ステートレス マルチターン会話を作成するをご覧ください。

ステートフル チャット

次のコードサンプルは、ステートフル チャットのコンテキストを設定します。ここで、 Google Cloud は会話履歴を保存して管理します。次のサンプルコードに published_context.options.analysis.python.enabled = True 行を含めることで、Python を使用した高度な分析を有効にすることもできます。

# Set up context for stateful chat
published_context = geminidataanalytics.Context()
published_context.system_instruction = system_instruction
published_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
published_context.options.analysis.python.enabled = True

ステートレス チャット

次のサンプルコードは、ステートレス チャットのコンテキストを設定します。ステートレス チャットでは、各メッセージで会話履歴全体を送信する必要があります。次のサンプルコードに inline_context.options.analysis.python.enabled = True 行を含めることで、Python を使用した高度な分析を有効にすることもできます。

# Set up context for stateless chat
# datasource_references.looker.credentials = credentials
inline_context = geminidataanalytics.Context()
inline_context.system_instruction = system_instruction
inline_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
inline_context.options.analysis.python.enabled = True

データ エージェントを作成する

次の Python コードのサンプルは、API リクエストを実行してデータ エージェントを作成します。このデータ エージェントを使用して、データに関する会話を行うことができます。データ エージェントは、指定されたデータソース、システム指示、コンテキストで構成されます。

data_agent_id = "data_agent_1"

data_agent = geminidataanalytics.DataAgent()
data_agent.data_analytics_agent.published_context = published_context
data_agent.name = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}" # Optional

request = geminidataanalytics.CreateDataAgentRequest(
    parent=f"projects/{billing_project}/locations/global",
    data_agent_id=data_agent_id, # Optional
    data_agent=data_agent,
)

try:
    data_agent_client.create_data_agent(request=request)
    print("Data Agent created")
except Exception as e:
    print(f"Error creating Data Agent: {e}")

前の例では、値 data_agent_1 をデータ エージェントの固有識別子に置き換えます。

データ エージェントを取得する

次の Python コードサンプルは、以前に作成したデータ エージェントを取得する API リクエストを行う方法を示しています。

# Initialize request arguments
data_agent_id = "data_agent_1"
request = geminidataanalytics.GetDataAgentRequest(
    name=f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
)

# Make the request
response = data_agent_client.get_data_agent(request=request)

# Handle the response
print(response)

前の例で、値 data_agent_1 は、取得するデータ エージェントの固有識別子に置き換えます。

会話を作成する

次の Python コードサンプルは、API リクエストを実行して会話を作成します。

# Initialize request arguments
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation = geminidataanalytics.Conversation()
conversation.agents = [f'projects/{billing_project}/locations/global/dataAgents/{data_agent_id}']
conversation.name = f"projects/{billing_project}/locations/global/conversations/{conversation_id}"

request = geminidataanalytics.CreateConversationRequest(
    parent=f"projects/{billing_project}/locations/global",
    conversation_id=conversation_id,
    conversation=conversation,
)

# Make the request
response = data_chat_client.create_conversation(request=request)

# Handle the response
print(response)

サンプル値を次のように置き換えます。

API を使用して質問する

データ エージェント会話を作成したら、次の Python コードのサンプルを使用してエージェントにクエリを送信します。このコードでは、ステートフル チャットまたはステートレス チャット用に設定したコンテキストを使用します。API は、エージェントがクエリに回答するために行う手順を表すメッセージ ストリームを返します。

ステートフル チャット

Conversation 参照を含むステートフル チャット リクエストを送信する

以前に作成した Conversation リソースを参照することで、ステートフル チャット リクエストをデータ エージェントに送信できます。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Create a conversation_reference
conversation_reference = geminidataanalytics.ConversationReference()
conversation_reference.conversation = f"projects/{billing_project}/locations/global/conversations/{conversation_id}"
conversation_reference.data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# conversation_reference.data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent = f"projects/{billing_project}/locations/global",
    messages = messages,
    conversation_reference = conversation_reference
)

# Make the request
stream = data_chat_client.chat(request=request)

# Handle the response
for response in stream:
    show_message(response)

サンプル値を次のように置き換えます。

  • Which species of tree is most prevalent?: データ エージェントに送信する自然言語の質問。
  • data_agent_1: データ エージェントを作成するで定義されている、データ エージェントの固有識別子。
  • conversation_1: 会話を作成するで定義されている、会話の固有識別子。

ステートレス チャット

次のコードサンプルは、ステートレス チャットのコンテキストを設定したときにデータ エージェントにクエリを送信する方法を示しています。ステートレス クエリを送信するには、以前に定義した DataAgent リソースを参照するか、リクエストでインライン コンテキストを使用します。

DataAgent 参照を使用してステートレス チャット リクエストを送信する

以前に作成した DataAgent リソースを参照して、データ エージェントにクエリを送信できます。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"

data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent=f"projects/{billing_project}/locations/global",
    messages=messages,
    data_agent_context = data_agent_context
)

# Make the request
stream = data_chat_client.chat(request=request)

# Handle the response
for response in stream:
    show_message(response)

サンプル値を次のように置き換えます。

  • Which species of tree is most prevalent?: データ エージェントに送信する自然言語の質問。
  • data_agent_1: データ エージェントを作成するで定義されている、データ エージェントの固有識別子。

インライン コンテキストを含むステートレス チャット リクエストを送信する

次のサンプルコードは、inline_context パラメータを使用して、ステートレス チャット リクエスト内でコンテキストを直接提供する方法を示しています。

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

request = geminidataanalytics.ChatRequest(
    inline_context=inline_context,
    parent=f"projects/{billing_project}/locations/global",
    messages=messages,
)

# Make the request
stream = data_chat_client.chat(request=request)

# Handle the response
for response in stream:
    show_message(response)

前の例では、Which species of tree is most prevalent? をデータ エージェントに送信する自然言語の質問に置き換えます。

ステートレス マルチターンの会話を作成する

ステートレスな会話でフォローアップの質問をするには、アプリケーションで新しいリクエストごとにメッセージ履歴全体を送信して、会話のコンテキストを管理する必要があります。次の例は、データ エージェントを参照するか、インライン コンテキストを使用してデータソースを直接指定して、マルチターン会話を作成する方法を示しています。

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Use data agent context
data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Helper function for calling the API
def multi_turn_Conversation(msg):

    message = geminidataanalytics.Message()
    message.user_message.text = msg

    # Send a multi-turn request by including previous turns and the new message
    conversation_messages.append(message)

    request = geminidataanalytics.ChatRequest(
        parent=f"projects/{billing_project}/locations/global",
        messages=conversation_messages,
        # Use data agent context
        data_agent_context=data_agent_context,
        # Use inline context
        # inline_context=inline_context,
    )

    # Make the request
    stream = data_chat_client.chat(request=request)

    # Handle the response
    for response in stream:
      show_message(response)
      conversation_messages.append(response)

# Send the first turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

前の例では、サンプル値を次のように置き換えます。

  • data_agent_1: データ エージェントを作成するのサンプルコード ブロックで定義されている、データ エージェントの固有識別子。
  • Which species of tree is most prevalent?: データ エージェントに送信する自然言語の質問。
  • Can you show me the results as a bar chart?: 前の質問を基にする、または前の質問を絞り込むフォローアップの質問。

ヘルパー関数を定義する

次のサンプルコードには、前のコードサンプルで使用されているヘルパー関数の定義が含まれています。これらの関数は、API からのレスポンスを解析して結果を表示するのに役立ちます。

from pygments import highlight, lexers, formatters
import pandas as pd
import requests
import json as json_lib
import altair as alt
import IPython
from IPython.display import display, HTML

import proto
from google.protobuf.json_format import MessageToDict, MessageToJson

def handle_text_response(resp):
  parts = getattr(resp, 'parts')
  print(''.join(parts))

def display_schema(data):
  fields = getattr(data, 'fields')
  df = pd.DataFrame({
    "Column": map(lambda field: getattr(field, 'name'), fields),
    "Type": map(lambda field: getattr(field, 'type'), fields),
    "Description": map(lambda field: getattr(field, 'description', '-'), fields),
    "Mode": map(lambda field: getattr(field, 'mode'), fields)
  })
  display(df)

def display_section_title(text):
  display(HTML('<h2>{}</h2>'.format(text)))

def format_looker_table_ref(table_ref):
 return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref.lookml_model, table_ref.explore, table_ref.looker_instance_uri)

def format_bq_table_ref(table_ref):
  return '{}.{}.{}'.format(table_ref.project_id, table_ref.dataset_id, table_ref.table_id)

def display_datasource(datasource):
  source_name = ''
  if 'studio_datasource_id' in datasource:
   source_name = getattr(datasource, 'studio_datasource_id')
  elif 'looker_explore_reference' in datasource:
   source_name = format_looker_table_ref(getattr(datasource, 'looker_explore_reference'))
  else:
    source_name = format_bq_table_ref(getattr(datasource, 'bigquery_table_reference'))

  print(source_name)
  display_schema(datasource.schema)

def handle_schema_response(resp):
  if 'query' in resp:
    print(resp.query.question)
  elif 'result' in resp:
    display_section_title('Schema resolved')
    print('Data sources:')
    for datasource in resp.result.datasources:
      display_datasource(datasource)

def handle_data_response(resp):
  if 'query' in resp:
    query = resp.query
    display_section_title('Retrieval query')
    print('Query name: {}'.format(query.name))
    print('Question: {}'.format(query.question))
    print('Data sources:')
    for datasource in query.datasources:
      display_datasource(datasource)
  elif 'generated_sql' in resp:
    display_section_title('SQL generated')
    print(resp.generated_sql)
  elif 'result' in resp:
    display_section_title('Data retrieved')

    fields = [field.name for field in resp.result.schema.fields]
    d = {}
    for el in resp.result.data:
      for field in fields:
        if field in d:
          d[field].append(el[field])
        else:
          d[field] = [el[field]]

    display(pd.DataFrame(d))

def handle_chart_response(resp):
  def _value_to_dict(v):
    if isinstance(v, proto.marshal.collections.maps.MapComposite):
      return _map_to_dict(v)
    elif isinstance(v, proto.marshal.collections.RepeatedComposite):
      return [_value_to_dict(el) for el in v]
    elif isinstance(v, (int, float, str, bool)):
      return v
    else:
      return MessageToDict(v)

  def _map_to_dict(d):
    out = {}
    for k in d:
      if isinstance(d[k], proto.marshal.collections.maps.MapComposite):
        out[k] = _map_to_dict(d[k])
      else:
        out[k] = _value_to_dict(d[k])
    return out

  if 'query' in resp:
    print(resp.query.instructions)
  elif 'result' in resp:
    vegaConfig = resp.result.vega_config
    vegaConfig_dict = _map_to_dict(vegaConfig)
    alt.Chart.from_json(json_lib.dumps(vegaConfig_dict)).display();

def show_message(msg):
  m = msg.system_message
  if 'text' in m:
    handle_text_response(getattr(m, 'text'))
  elif 'schema' in m:
    handle_schema_response(getattr(m, 'schema'))
  elif 'data' in m:
    handle_data_response(getattr(m, 'data'))
  elif 'chart' in m:
    handle_chart_response(getattr(m, 'chart'))
  print('\n')