Python SDK를 사용하여 데이터 에이전트 빌드

이 페이지에서는 Python SDK를 사용하여 Conversational Analytics API에 요청하는 방법을 보여줍니다. 샘플 Python 코드는 다음 작업을 완료하는 방법을 보여줍니다.

환경 인증 및 설정

Conversational Analytics API에 Python SDK를 사용하려면 Conversational Analytics API SDK Colaboratory 노트북의 안내에 따라 SDK를 다운로드하고 설치하세요. 다운로드 방법과 SDK Colab의 콘텐츠는 변경될 수 있습니다.

노트북의 설정 안내를 완료한 후 다음 코드를 사용하여 필요한 SDK 라이브러리를 가져오고, Colaboratory 환경 내에서 Google 계정을 인증하고, API 요청을 실행하기 위해 클라이언트를 초기화할 수 있습니다.

from google.colab import auth
auth.authenticate_user()

from google.cloud import geminidataanalytics

data_agent_client = geminidataanalytics.DataAgentServiceClient()
data_chat_client = geminidataanalytics.DataChatServiceClient()

결제 프로젝트 및 시스템 안내 지정

다음 샘플 Python 코드는 스크립트 전체에서 사용되는 결제 프로젝트와 시스템 안내를 정의합니다.

# Billing project
billing_project = "my_project_name"

# System description
system_description = "Help the user analyze their data."

샘플 값을 다음과 같이 바꿉니다.

  • my_project_name: 필요한 API를 사용 설정한 결제 프로젝트의 ID입니다.
  • Help the user analyze their data.: 에이전트의 동작을 안내하고 필요에 따라 에이전트를 맞춤설정하는 시스템 안내입니다. 예를 들어 시스템 안내를 사용하여 비즈니스 용어(예: '충성도 높은 고객'의 정의)를 정의하거나, 응답 길이('20단어 미만으로 요약')를 제어하거나, 데이터 형식을 설정('회사 표준에 부합')할 수 있습니다.

데이터 소스에 연결

다음 Python 코드 샘플은 에이전트가 질문에 답변하기 위해 쿼리할 Looker, BigQuery 또는 Looker Studio 데이터 소스의 연결 세부정보를 정의하는 방법을 보여줍니다.

Looker 데이터에 연결

다음 코드 예시에서는 API 키 또는 액세스 토큰을 사용하여 Looker Explore에 대한 연결의 세부정보를 정의하는 방법을 보여줍니다.

API 키

Conversational Analytics API로 데이터 소스를 인증하고 연결에 설명된 대로 생성된 Looker API 키를 사용하여 Looker 인스턴스와의 연결을 설정할 수 있습니다.

looker_client_id = "my_looker_client_id"
looker_client_secret = "my_looker_client_secret"
looker_instance_uri = "https://my_company.looker.com"
lookml_model = "my_model"
explore = "my_explore"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = lookml_model
looker_explore_reference.explore = explore

credentials = geminidataanalytics.Credentials()
credentials.oauth.secret.client_id = looker_client_id
credentials.oauth.secret.client_secret = looker_client_secret

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference]

샘플 값을 다음과 같이 바꿉니다.

  • my_looker_client_id: 생성된 Looker API 키의 클라이언트 ID입니다.
  • my_looker_client_secret: 생성된 Looker API 키의 클라이언트 보안 비밀번호입니다.
  • https://my_company.looker.com: Looker 인스턴스의 전체 URL입니다.
  • my_model: 연결할 Explore가 포함된 LookML 모델의 이름입니다.
  • my_explore: 데이터 에이전트가 쿼리할 Looker Explore의 이름입니다.

액세스 토큰

Conversational Analytics API로 데이터 소스를 인증하고 연결에 설명된 대로 액세스 토큰을 사용하여 Looker 인스턴스와의 연결을 설정할 수 있습니다.

looker_access_token = "my_access_token"
looker_instance_uri = "https://my_company.looker.com"
lookml_model = "my_model"
explore = "my_explore"

looker_explore_reference = geminidataanalytics.LookerExploreReference()
looker_explore_reference.looker_instance_uri = looker_instance_uri
looker_explore_reference.lookml_model = lookml_model
looker_explore_reference.explore = explore

credentials = geminidataanalytics.Credentials()
credentials.oauth.token.access_token = looker_access_token

datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.looker.explore_references = [looker_explore_reference]

샘플 값을 다음과 같이 바꿉니다.

  • my_access_token: Looker에 인증하기 위해 생성하는 access_token 값입니다.
  • https://my_company.looker.com: Looker 인스턴스의 전체 URL입니다.
  • my_model: 연결할 Explore가 포함된 LookML 모델의 이름입니다.
  • my_explore: 데이터 에이전트가 쿼리할 Looker Explore의 이름입니다.

BigQuery 데이터에 연결

Conversational Analytics API를 사용하면 한 번에 최대 10개의 BigQuery 테이블에 연결하고 쿼리할 수 있습니다.

다음 샘플 코드는 단일 BigQuery 테이블에 대한 연결을 정의합니다.

bq_project_id = "my_project_id"
bq_dataset_id = "my_dataset_id"
bq_table_id = "my_table_id"

bigquery_table_reference = geminidataanalytics.BigQueryTableReference()
bigquery_table_reference.project_id = bq_project_id
bigquery_table_reference.dataset_id = bq_dataset_id
bigquery_table_reference.table_id = bq_table_id

# Connect to your data source
datasource_references = geminidataanalytics.DatasourceReferences()
datasource_references.bq.table_references = [bigquery_table_reference]

샘플 값을 다음과 같이 바꿉니다.

  • my_project_id: 연결하려는 BigQuery 데이터 세트 및 테이블이 포함된 Google Cloud 프로젝트의 ID입니다. 공개 데이터 세트에 연결하려면 bigquery-public-data를 지정합니다.
  • my_dataset_id: BigQuery 데이터 세트의 ID입니다. 예를 들면 san_francisco입니다.
  • my_table_id: BigQuery 테이블의 ID입니다. 예를 들면 street_trees입니다.

Looker Studio 데이터에 연결

다음 샘플 코드는 Looker Studio 데이터 소스에 대한 연결을 정의합니다.

studio_datasource_id = "my_datasource_id"

studio_references = geminidataanalytics.StudioDatasourceReference()
studio_references.datasource_id = studio_datasource_id

## Connect to your data source
datasource_references.studio.studio_references = [studio_references]

이전 예시에서 my_datasource_id를 데이터 소스 ID로 바꿉니다.

스테이트풀(Stateful) 또는 스테이트리스(Stateless) 채팅의 컨텍스트 설정하기

Conversational Analytics API는 사용자가 이전 컨텍스트를 기반으로 후속 질문을 할 수 있는 멀티턴 대화를 지원합니다. 다음 샘플 Python 코드는 스테이트풀(Stateful) 또는 스테이트리스(Stateless) 채팅의 컨텍스트를 설정하는 방법을 보여줍니다.

  • 스테이트풀(Stateful) 채팅: Google Cloud 에서 대화 기록을 저장하고 관리합니다. 스테이트풀(Stateful) 채팅은 API가 이전 메시지의 컨텍스트를 유지하므로 본질적으로 멀티턴입니다. 각 대화 차례에 현재 메시지만 전송하면 됩니다.
  • 스테이트리스(Stateless) 채팅: 애플리케이션에서 대화 기록을 관리합니다. 새 메시지마다 전체 대화 기록을 포함해야 합니다. 스테이트리스(Stateless) 모드에서 멀티턴 대화를 관리하는 방법에 대한 자세한 예는 스테이트리스(Stateless) 멀티턴 대화 만들기를 참조하세요.

스테이트풀(Stateful) 채팅

다음 코드 샘플은 Google Cloud 이 대화 기록을 저장하고 관리하는 스테이트풀(Stateful) 채팅의 컨텍스트를 설정합니다. 다음 샘플 코드에 published_context.options.analysis.python.enabled = True 줄을 포함하여 Python으로 고급 분석을 선택적으로 사용 설정할 수도 있습니다.

# Set up context for stateful chat
published_context = geminidataanalytics.Context()
published_context.system_instruction = system_instruction
published_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
published_context.options.analysis.python.enabled = True

스테이트리스(Stateless) 채팅

다음 샘플 코드는 각 메시지와 함께 전체 대화 기록을 전송해야 하는 스테이트리스(Stateless) 채팅의 컨텍스트를 설정합니다. 다음 샘플 코드에 inline_context.options.analysis.python.enabled = True 줄을 포함하여 Python으로 고급 분석을 선택적으로 사용 설정할 수도 있습니다.

# Set up context for stateless chat
# datasource_references.looker.credentials = credentials
inline_context = geminidataanalytics.Context()
inline_context.system_instruction = system_instruction
inline_context.datasource_references = datasource_references
# Optional: To enable advanced analysis with Python, include the following line:
inline_context.options.analysis.python.enabled = True

데이터 에이전트 만들기

다음 샘플 Python 코드는 데이터 에이전트를 만들기 위한 API 요청을 실행합니다. 이 에이전트를 사용하여 데이터에 관한 대화를 나눌 수 있습니다. 데이터 에이전트는 지정된 데이터 소스, 시스템 안내, 컨텍스트로 구성됩니다.

data_agent_id = "data_agent_1"

data_agent = geminidataanalytics.DataAgent()
data_agent.data_analytics_agent.published_context = published_context
data_agent.name = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}" # Optional

request = geminidataanalytics.CreateDataAgentRequest(
    parent=f"projects/{billing_project}/locations/global",
    data_agent_id=data_agent_id, # Optional
    data_agent=data_agent,
)

try:
    data_agent_client.create_data_agent(request=request)
    print("Data Agent created")
except Exception as e:
    print(f"Error creating Data Agent: {e}")

이전 예시에서 data_agent_1 값을 데이터 에이전트의 고유 식별자로 바꿉니다.

데이터 에이전트 가져오기

다음 샘플 Python 코드는 이전에 만든 데이터 에이전트를 가져오기 위한 API 요청을 실행하는 방법을 보여줍니다.

# Initialize request arguments
data_agent_id = "data_agent_1"
request = geminidataanalytics.GetDataAgentRequest(
    name=f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}",
)

# Make the request
response = data_agent_client.get_data_agent(request=request)

# Handle the response
print(response)

이전 예시에서 data_agent_1 값을 가져오려는 데이터 에이전트의 고유 식별자로 바꿉니다.

대화 만들기

다음 샘플 Python 코드는 대화를 만들기 위한 API 요청을 실행합니다.

# Initialize request arguments
data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

conversation = geminidataanalytics.Conversation()
conversation.agents = [f'projects/{billing_project}/locations/global/dataAgents/{data_agent_id}']
conversation.name = f"projects/{billing_project}/locations/global/conversations/{conversation_id}"

request = geminidataanalytics.CreateConversationRequest(
    parent=f"projects/{billing_project}/locations/global",
    conversation_id=conversation_id,
    conversation=conversation,
)

# Make the request
response = data_chat_client.create_conversation(request=request)

# Handle the response
print(response)

샘플 값을 다음과 같이 바꿉니다.

  • data_agent_1: 데이터 에이전트의 ID로, 데이터 에이전트 만들기의 샘플 코드 블록에 정의되어 있습니다.
  • conversation_1: 대화의 고유 식별자입니다.

API를 사용하여 질문하기

데이터 에이전트대화를 만든 후 다음 샘플 Python 코드는 에이전트에 쿼리를 전송합니다. 이 코드는 스테이트풀(Stateful) 또는 스테이트리스(Stateless) 채팅을 위해 설정한 컨텍스트를 사용합니다. API는 에이전트가 쿼리에 답변하기 위해 취하는 단계를 나타내는 메시지 스트림을 반환합니다.

스테이트풀(Stateful) 채팅

Conversation 참조로 스테이트풀(Stateful) 채팅 요청 보내기

이전에 만든 Conversation 리소스를 참조하여 데이터 에이전트에 스테이트풀(Stateful) 채팅 요청을 보낼 수 있습니다.

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"
conversation_id = "conversation_1"

# Create a conversation_reference
conversation_reference = geminidataanalytics.ConversationReference()
conversation_reference.conversation = f"projects/{billing_project}/locations/global/conversations/{conversation_id}"
conversation_reference.data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# conversation_reference.data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent = f"projects/{billing_project}/locations/global",
    messages = messages,
    conversation_reference = conversation_reference
)

# Make the request
stream = data_chat_client.chat(request=request)

# Handle the response
for response in stream:
    show_message(response)

샘플 값을 다음과 같이 바꿉니다.

  • Which species of tree is most prevalent?: 데이터 에이전트에게 보낼 자연어 질문입니다.
  • data_agent_1: 데이터 에이전트의 고유 식별자로, 데이터 에이전트 만들기에 정의되어 있습니다.
  • conversation_1: 대화의 고유 식별자로, 대화 만들기에 정의되어 있습니다.

스테이트리스(Stateless) 채팅

다음 코드 샘플은 스테이트리스(Stateless) 채팅의 컨텍스트를 설정한 경우 데이터 에이전트에 쿼리를 전송하는 방법을 보여줍니다. 이전에 정의한 DataAgent 리소스를 참조하거나 요청에서 인라인 컨텍스트를 사용하여 스테이트리스(Stateless) 쿼리를 보낼 수 있습니다.

DataAgent 참조로 스테이트리스(Stateless) 채팅 요청 보내기

이전에 만든 DataAgent 리소스를 참조하여 데이터 에이전트에 쿼리를 보낼 수 있습니다.

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

data_agent_id = "data_agent_1"

data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Form the request
request = geminidataanalytics.ChatRequest(
    parent=f"projects/{billing_project}/locations/global",
    messages=messages,
    data_agent_context = data_agent_context
)

# Make the request
stream = data_chat_client.chat(request=request)

# Handle the response
for response in stream:
    show_message(response)

샘플 값을 다음과 같이 바꿉니다.

  • Which species of tree is most prevalent?: 데이터 에이전트에게 보낼 자연어 질문입니다.
  • data_agent_1: 데이터 에이전트의 고유 식별자로, 데이터 에이전트 만들기에 정의되어 있습니다.

인라인 컨텍스트를 사용하여 스테이트리스(Stateless) 채팅 요청 보내기

다음 샘플 코드는 inline_context 파라미터를 사용하여 스테이트리스(Stateless) 채팅 요청 내에서 직접 컨텍스트를 제공하는 방법을 보여줍니다.

# Create a request that contains a single user message (your question)
question = "Which species of tree is most prevalent?"
messages = [geminidataanalytics.Message()]
messages[0].user_message.text = question

request = geminidataanalytics.ChatRequest(
    inline_context=inline_context,
    parent=f"projects/{billing_project}/locations/global",
    messages=messages,
)

# Make the request
stream = data_chat_client.chat(request=request)

# Handle the response
for response in stream:
    show_message(response)

이전 예시에서 Which species of tree is most prevalent?를 데이터 에이전트에 전송할 자연어 질문으로 바꿉니다.

스테이트리스(Stateless) 멀티턴 대화 만들기

스테이트리스(Stateless) 대화에서 후속 질문을 하려면 애플리케이션이 각 새 요청과 함께 전체 메시지 기록을 전송하여 대화의 컨텍스트를 관리해야 합니다. 다음 예시에서는 데이터 에이전트를 참조하거나 인라인 컨텍스트를 사용하여 데이터 소스를 직접 제공하여 멀티턴 대화를 만드는 방법을 보여줍니다.

# List that is used to track previous turns and is reused across requests
conversation_messages = []

data_agent_id = "data_agent_1"

# Use data agent context
data_agent_context = geminidataanalytics.DataAgentContext()
data_agent_context.data_agent = f"projects/{billing_project}/locations/global/dataAgents/{data_agent_id}"
# data_agent_context.credentials = credentials

# Helper function for calling the API
def multi_turn_Conversation(msg):

    message = geminidataanalytics.Message()
    message.user_message.text = msg

    # Send a multi-turn request by including previous turns and the new message
    conversation_messages.append(message)

    request = geminidataanalytics.ChatRequest(
        parent=f"projects/{billing_project}/locations/global",
        messages=conversation_messages,
        # Use data agent context
        data_agent_context=data_agent_context,
        # Use inline context
        # inline_context=inline_context,
    )

    # Make the request
    stream = data_chat_client.chat(request=request)

    # Handle the response
    for response in stream:
      show_message(response)
      conversation_messages.append(response)

# Send the first turn request
multi_turn_Conversation("Which species of tree is most prevalent?")

# Send follow-up turn request
multi_turn_Conversation("Can you show me the results as a bar chart?")

이전 예시에서 샘플 값을 다음과 같이 바꿉니다.

  • data_agent_1: 데이터 에이전트의 고유 식별자로, 데이터 에이전트 만들기의 샘플 코드 블록에 정의되어 있습니다.
  • Which species of tree is most prevalent?: 데이터 에이전트에게 보낼 자연어 질문입니다.
  • Can you show me the results as a bar chart?: 이전 질문을 기반으로 하거나 이전 질문을 구체화하는 후속 질문입니다.

도우미 함수 정의

다음 샘플 코드에는 이전 코드 샘플에 사용된 도우미 함수 정의가 포함되어 있습니다. 이러한 함수는 API의 응답을 파싱하고 결과를 표시하는 데 도움이 됩니다.

from pygments import highlight, lexers, formatters
import pandas as pd
import requests
import json as json_lib
import altair as alt
import IPython
from IPython.display import display, HTML

import proto
from google.protobuf.json_format import MessageToDict, MessageToJson

def handle_text_response(resp):
  parts = getattr(resp, 'parts')
  print(''.join(parts))

def display_schema(data):
  fields = getattr(data, 'fields')
  df = pd.DataFrame({
    "Column": map(lambda field: getattr(field, 'name'), fields),
    "Type": map(lambda field: getattr(field, 'type'), fields),
    "Description": map(lambda field: getattr(field, 'description', '-'), fields),
    "Mode": map(lambda field: getattr(field, 'mode'), fields)
  })
  display(df)

def display_section_title(text):
  display(HTML('<h2>{}</h2>'.format(text)))

def format_looker_table_ref(table_ref):
 return 'lookmlModel: {}, explore: {}, lookerInstanceUri: {}'.format(table_ref.lookml_model, table_ref.explore, table_ref.looker_instance_uri)

def format_bq_table_ref(table_ref):
  return '{}.{}.{}'.format(table_ref.project_id, table_ref.dataset_id, table_ref.table_id)

def display_datasource(datasource):
  source_name = ''
  if 'studio_datasource_id' in datasource:
   source_name = getattr(datasource, 'studio_datasource_id')
  elif 'looker_explore_reference' in datasource:
   source_name = format_looker_table_ref(getattr(datasource, 'looker_explore_reference'))
  else:
    source_name = format_bq_table_ref(getattr(datasource, 'bigquery_table_reference'))

  print(source_name)
  display_schema(datasource.schema)

def handle_schema_response(resp):
  if 'query' in resp:
    print(resp.query.question)
  elif 'result' in resp:
    display_section_title('Schema resolved')
    print('Data sources:')
    for datasource in resp.result.datasources:
      display_datasource(datasource)

def handle_data_response(resp):
  if 'query' in resp:
    query = resp.query
    display_section_title('Retrieval query')
    print('Query name: {}'.format(query.name))
    print('Question: {}'.format(query.question))
    print('Data sources:')
    for datasource in query.datasources:
      display_datasource(datasource)
  elif 'generated_sql' in resp:
    display_section_title('SQL generated')
    print(resp.generated_sql)
  elif 'result' in resp:
    display_section_title('Data retrieved')

    fields = [field.name for field in resp.result.schema.fields]
    d = {}
    for el in resp.result.data:
      for field in fields:
        if field in d:
          d[field].append(el[field])
        else:
          d[field] = [el[field]]

    display(pd.DataFrame(d))

def handle_chart_response(resp):
  def _value_to_dict(v):
    if isinstance(v, proto.marshal.collections.maps.MapComposite):
      return _map_to_dict(v)
    elif isinstance(v, proto.marshal.collections.RepeatedComposite):
      return [_value_to_dict(el) for el in v]
    elif isinstance(v, (int, float, str, bool)):
      return v
    else:
      return MessageToDict(v)

  def _map_to_dict(d):
    out = {}
    for k in d:
      if isinstance(d[k], proto.marshal.collections.maps.MapComposite):
        out[k] = _map_to_dict(d[k])
      else:
        out[k] = _value_to_dict(d[k])
    return out

  if 'query' in resp:
    print(resp.query.instructions)
  elif 'result' in resp:
    vegaConfig = resp.result.vega_config
    vegaConfig_dict = _map_to_dict(vegaConfig)
    alt.Chart.from_json(json_lib.dumps(vegaConfig_dict)).display();

def show_message(msg):
  m = msg.system_message
  if 'text' in m:
    handle_text_response(getattr(m, 'text'))
  elif 'schema' in m:
    handle_schema_response(getattr(m, 'schema'))
  elif 'data' in m:
    handle_data_response(getattr(m, 'data'))
  elif 'chart' in m:
    handle_chart_response(getattr(m, 'chart'))
  print('\n')